diff --git a/src/learn/gensfen.cpp b/src/learn/gensfen.cpp index 1dddac5a..4accb882 100644 --- a/src/learn/gensfen.cpp +++ b/src/learn/gensfen.cpp @@ -1,7 +1,7 @@ #include "gensfen.h" +#include "sfen_writer.h" #include "packed_sfen.h" -#include "sfen_stream.h" #include "misc.h" #include "position.h" @@ -16,6 +16,7 @@ #include "syzygy/tbprobe.h" +#include #include #include #include @@ -28,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -37,188 +37,6 @@ using namespace std; namespace Learner { - // Helper class for exporting Sfen - struct SfenWriter - { - // Amount of sfens required to flush the buffer. - static constexpr size_t SFEN_WRITE_SIZE = 5000; - - // File name to write and number of threads to create - SfenWriter(string filename_, int thread_num, uint64_t save_count, SfenOutputType sfen_output_type) - { - sfen_buffers_pool.reserve((size_t)thread_num * 10); - sfen_buffers.resize(thread_num); - - auto out = sync_region_cout.new_region(); - out << "INFO (sfen_writer): Creating new data file at " << filename_ << endl; - - sfen_format = sfen_output_type; - output_file_stream = create_new_sfen_output(filename_, sfen_format); - filename = filename_; - save_every = save_count; - - finished = false; - - file_worker_thread = std::thread([&] { this->file_write_worker(); }); - } - - ~SfenWriter() - { - flush(); - - finished = true; - file_worker_thread.join(); - output_file_stream.reset(); - -#if !defined(NDEBUG) - { - // All buffers should be empty since file_worker_thread - // should have written everything before exiting. - for (const auto& p : sfen_buffers) { assert(p == nullptr); (void)p ; } - assert(sfen_buffers_pool.empty()); - } -#endif - } - - void write(size_t thread_id, const PackedSfenValue& psv) - { - // We have a buffer for each thread and add it there. - // If the buffer overflows, write it to a file. - - // This buffer is prepared for each thread. - auto& buf = sfen_buffers[thread_id]; - - // Secure since there is no buf at the first time - // and immediately after writing the thread buffer. - if (!buf) - { - buf = std::make_unique(); - buf->reserve(SFEN_WRITE_SIZE); - } - - // Buffer is exclusive to this thread. - // There is no need for a critical section. - buf->push_back(psv); - - if (buf->size() >= SFEN_WRITE_SIZE) - { - // If you load it in sfen_buffers_pool, the worker will do the rest. - - // Critical section since sfen_buffers_pool is shared among threads. - std::unique_lock lk(mutex); - sfen_buffers_pool.emplace_back(std::move(buf)); - } - } - - void flush() - { - for (size_t i = 0; i < sfen_buffers.size(); ++i) - { - flush(i); - } - } - - // Move what remains in the buffer for your thread to a buffer for writing to a file. - void flush(size_t thread_id) - { - std::unique_lock lk(mutex); - - auto& buf = sfen_buffers[thread_id]; - - // There is a case that buf==nullptr, so that check is necessary. - if (buf && buf->size() != 0) - { - sfen_buffers_pool.emplace_back(std::move(buf)); - } - } - - // Dedicated thread to write to file - void file_write_worker() - { - while (!finished || sfen_buffers_pool.size()) - { - vector> buffers; - { - std::unique_lock lk(mutex); - - // Atomically swap take the filled buffers and - // create a new buffer pool for threads to fill. - buffers = std::move(sfen_buffers_pool); - sfen_buffers_pool = std::vector>(); - } - - if (!buffers.size()) - { - // Poor man's condition variable. - sleep(100); - } - else - { - for (auto& buf : buffers) - { - output_file_stream->write(*buf); - - sfen_write_count += buf->size(); - - // Add the processed number here, and if it exceeds save_every, - // change the file name and reset this counter. - sfen_write_count_current_file += buf->size(); - if (sfen_write_count_current_file >= save_every) - { - sfen_write_count_current_file = 0; - - // Sequential number attached to the file - int n = (int)(sfen_write_count / save_every); - - // Rename the file and open it again. - // Add ios::app in consideration of overwriting. - // (Depending on the operation, it may not be necessary.) - string new_filename = filename + "_" + std::to_string(n); - output_file_stream = create_new_sfen_output(new_filename, sfen_format); - - auto out = sync_region_cout.new_region(); - out << "INFO (sfen_writer): Creating new data file at " << new_filename << endl; - } - } - } - } - } - - private: - - std::unique_ptr output_file_stream; - - // A new net is saved after every save_every sfens are processed. - uint64_t save_every = std::numeric_limits::max(); - - // File name passed in the constructor - std::string filename; - - // Thread to write to the file - std::thread file_worker_thread; - - // Flag that all threads have finished - atomic finished; - - SfenOutputType sfen_format; - - // buffer before writing to file - // sfen_buffers is the buffer for each thread - // sfen_buffers_pool is a buffer for writing. - // After loading the phase in the former buffer by SFEN_WRITE_SIZE, - // transfer it to the latter. - std::vector> sfen_buffers; - std::vector> sfen_buffers_pool; - - // Mutex required to access sfen_buffers_pool - std::mutex mutex; - - // Number of sfens written in total, and the - // number of sfens written in the current file. - uint64_t sfen_write_count = 0; - uint64_t sfen_write_count_current_file = 0; - }; - // Class to generate sfen with multiple threads struct Gensfen { diff --git a/src/learn/sfen_writer.h b/src/learn/sfen_writer.h new file mode 100644 index 00000000..1bbd916c --- /dev/null +++ b/src/learn/sfen_writer.h @@ -0,0 +1,206 @@ +#include "packed_sfen.h" +#include "sfen_stream.h" + +#include "misc.h" + +#include "extra/nnue_data_binpack_format.h" + +#include "syzygy/tbprobe.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; + +namespace Learner { + + // Helper class for exporting Sfen + struct SfenWriter + { + // Amount of sfens required to flush the buffer. + static constexpr size_t SFEN_WRITE_SIZE = 5000; + + // File name to write and number of threads to create + SfenWriter(string filename_, int thread_num, uint64_t save_count, SfenOutputType sfen_output_type) + { + sfen_buffers_pool.reserve((size_t)thread_num * 10); + sfen_buffers.resize(thread_num); + + auto out = sync_region_cout.new_region(); + out << "INFO (sfen_writer): Creating new data file at " << filename_ << endl; + + sfen_format = sfen_output_type; + output_file_stream = create_new_sfen_output(filename_, sfen_format); + filename = filename_; + save_every = save_count; + + finished = false; + + file_worker_thread = std::thread([&] { this->file_write_worker(); }); + } + + ~SfenWriter() + { + flush(); + + finished = true; + file_worker_thread.join(); + output_file_stream.reset(); + +#if !defined(NDEBUG) + { + // All buffers should be empty since file_worker_thread + // should have written everything before exiting. + for (const auto& p : sfen_buffers) { assert(p == nullptr); (void)p ; } + assert(sfen_buffers_pool.empty()); + } +#endif + } + + void write(size_t thread_id, const PackedSfenValue& psv) + { + // We have a buffer for each thread and add it there. + // If the buffer overflows, write it to a file. + + // This buffer is prepared for each thread. + auto& buf = sfen_buffers[thread_id]; + + // Secure since there is no buf at the first time + // and immediately after writing the thread buffer. + if (!buf) + { + buf = std::make_unique(); + buf->reserve(SFEN_WRITE_SIZE); + } + + // Buffer is exclusive to this thread. + // There is no need for a critical section. + buf->push_back(psv); + + if (buf->size() >= SFEN_WRITE_SIZE) + { + // If you load it in sfen_buffers_pool, the worker will do the rest. + + // Critical section since sfen_buffers_pool is shared among threads. + std::unique_lock lk(mutex); + sfen_buffers_pool.emplace_back(std::move(buf)); + } + } + + void flush() + { + for (size_t i = 0; i < sfen_buffers.size(); ++i) + { + flush(i); + } + } + + // Move what remains in the buffer for your thread to a buffer for writing to a file. + void flush(size_t thread_id) + { + std::unique_lock lk(mutex); + + auto& buf = sfen_buffers[thread_id]; + + // There is a case that buf==nullptr, so that check is necessary. + if (buf && buf->size() != 0) + { + sfen_buffers_pool.emplace_back(std::move(buf)); + } + } + + // Dedicated thread to write to file + void file_write_worker() + { + while (!finished || sfen_buffers_pool.size()) + { + vector> buffers; + { + std::unique_lock lk(mutex); + + // Atomically swap take the filled buffers and + // create a new buffer pool for threads to fill. + buffers = std::move(sfen_buffers_pool); + sfen_buffers_pool = std::vector>(); + } + + if (!buffers.size()) + { + // Poor man's condition variable. + sleep(100); + } + else + { + for (auto& buf : buffers) + { + output_file_stream->write(*buf); + + sfen_write_count += buf->size(); + + // Add the processed number here, and if it exceeds save_every, + // change the file name and reset this counter. + sfen_write_count_current_file += buf->size(); + if (sfen_write_count_current_file >= save_every) + { + sfen_write_count_current_file = 0; + + // Sequential number attached to the file + int n = (int)(sfen_write_count / save_every); + + // Rename the file and open it again. + // Add ios::app in consideration of overwriting. + // (Depending on the operation, it may not be necessary.) + string new_filename = filename + "_" + std::to_string(n); + output_file_stream = create_new_sfen_output(new_filename, sfen_format); + + auto out = sync_region_cout.new_region(); + out << "INFO (sfen_writer): Creating new data file at " << new_filename << endl; + } + } + } + } + } + + private: + + std::unique_ptr output_file_stream; + + // A new net is saved after every save_every sfens are processed. + uint64_t save_every = std::numeric_limits::max(); + + // File name passed in the constructor + std::string filename; + + // Thread to write to the file + std::thread file_worker_thread; + + // Flag that all threads have finished + atomic finished; + + SfenOutputType sfen_format; + + // buffer before writing to file + // sfen_buffers is the buffer for each thread + // sfen_buffers_pool is a buffer for writing. + // After loading the phase in the former buffer by SFEN_WRITE_SIZE, + // transfer it to the latter. + std::vector> sfen_buffers; + std::vector> sfen_buffers_pool; + + // Mutex required to access sfen_buffers_pool + std::mutex mutex; + + // Number of sfens written in total, and the + // number of sfens written in the current file. + uint64_t sfen_write_count = 0; + uint64_t sfen_write_count_current_file = 0; + }; +}