From 078145ed21da237e100c78274b4882d818561797 Mon Sep 17 00:00:00 2001
From: Matt Witherspoon <32485495+spoonincode@users.noreply.github.com>
Date: Sun, 2 Jun 2024 21:29:26 -0400
Subject: [PATCH] use random_access_file for ship log; refactor catalog

---
 libraries/state_history/CMakeLists.txt        |   1 -
 libraries/state_history/compression.cpp       |  34 -
 .../eosio/state_history/compression.hpp       |  14 -
 .../include/eosio/state_history/counter.hpp   |  46 +
 .../include/eosio/state_history/log.hpp       | 926 +++++------------
 .../eosio/state_history/log_catalog.hpp       | 261 +++++
 .../eosio/state_history/log_config.hpp        |  25 +
 .../eosio/state_history_plugin/session.hpp    |  79 +-
 .../state_history_plugin.cpp                  |  37 +-
 tests/ship_log.cpp                            | 795 +++++++++++++--
 unittests/state_history_tests.cpp             |  43 +-
 11 files changed, 1383 insertions(+), 878 deletions(-)
 delete mode 100644 libraries/state_history/compression.cpp
 delete mode 100644 libraries/state_history/include/eosio/state_history/compression.hpp
 create mode 100644 libraries/state_history/include/eosio/state_history/counter.hpp
 create mode 100644 libraries/state_history/include/eosio/state_history/log_catalog.hpp
 create mode 100644 libraries/state_history/include/eosio/state_history/log_config.hpp

diff --git a/libraries/state_history/CMakeLists.txt b/libraries/state_history/CMakeLists.txt
index 5a27b819f1..707d42c407 100644
--- a/libraries/state_history/CMakeLists.txt
+++ b/libraries/state_history/CMakeLists.txt
@@ -2,7 +2,6 @@ file(GLOB HEADERS "include/eosio/state-history/*.hpp")
 add_library( state_history
    abi.cpp
-   compression.cpp
    create_deltas.cpp
    trace_converter.cpp
    ${HEADERS}
diff --git a/libraries/state_history/compression.cpp b/libraries/state_history/compression.cpp
deleted file mode 100644
index 25fd3c96bd..0000000000
--- a/libraries/state_history/compression.cpp
+++ /dev/null
@@ -1,34 +0,0 @@
-#include
-
-#include
-#include
-#include
-
-namespace eosio {
-namespace state_history {
-
-namespace bio = boost::iostreams;
-bytes zlib_compress_bytes(const bytes& in) {
-   bytes out;
-   bio::filtering_ostream comp;
-   comp.push(bio::zlib_compressor(bio::zlib::default_compression));
-   comp.push(bio::back_inserter(out));
-   bio::write(comp, in.data(), in.size());
-   bio::close(comp);
-   return out;
-}
-
-bytes zlib_decompress(std::string_view data) {
-   bytes out;
-   bio::filtering_ostream decomp;
-   decomp.push(bio::zlib_decompressor());
-   decomp.push(bio::back_inserter(out));
-   bio::write(decomp, data.data(), data.size());
-   bio::close(decomp);
-   return out;
-}
-
-
-
-} // namespace state_history
-} // namespace eosio
diff --git a/libraries/state_history/include/eosio/state_history/compression.hpp b/libraries/state_history/include/eosio/state_history/compression.hpp
deleted file mode 100644
index cafddc9df8..0000000000
--- a/libraries/state_history/include/eosio/state_history/compression.hpp
+++ /dev/null
@@ -1,14 +0,0 @@
-#pragma once
-
-#include
-
-namespace eosio {
-namespace state_history {
-
-using chain::bytes;
-
-bytes zlib_compress_bytes(const bytes& in);
-bytes zlib_decompress(std::string_view);
-
-} // namespace state_history
-} // namespace eosio
diff --git a/libraries/state_history/include/eosio/state_history/counter.hpp b/libraries/state_history/include/eosio/state_history/counter.hpp
new file mode 100644
index 0000000000..c2d4520bc0
--- /dev/null
+++ b/libraries/state_history/include/eosio/state_history/counter.hpp
@@ -0,0 +1,46 @@
+#pragma once
+
+#include
+
+namespace eosio::detail {
+
+namespace bio = boost::iostreams;
+
+// directly adapt from
boost/iostreams/filter/counter.hpp and change the type of chars_ to uint64_t. +class counter { +public: + typedef char char_type; + struct category + : bio::dual_use, + bio::filter_tag, + bio::multichar_tag, + bio::optimally_buffered_tag + { }; + + uint64_t characters() const { return chars_; } + std::streamsize optimal_buffer_size() const { return 64*1024; } + + template + std::streamsize read(Source& src, char_type* s, std::streamsize n) + { + std::streamsize result = bio::read(src, s, n); + if (result == -1) + return -1; + chars_ += result; + return result; + } + + template + std::streamsize write(Sink& snk, const char_type* s, std::streamsize n) + { + std::streamsize result = bio::write(snk, s, n); + chars_ += result; + return result; + } + +private: + uint64_t chars_ = 0; +}; +BOOST_IOSTREAMS_PIPABLE(counter, 0) + +} \ No newline at end of file diff --git a/libraries/state_history/include/eosio/state_history/log.hpp b/libraries/state_history/include/eosio/state_history/log.hpp index 58d675bf28..ed9a10ea44 100644 --- a/libraries/state_history/include/eosio/state_history/log.hpp +++ b/libraries/state_history/include/eosio/state_history/log.hpp @@ -1,14 +1,13 @@ #pragma once -#include #include #include #include -#include -#include -#include +#include +#include #include +#include #include #include //set_thread_name #include @@ -19,14 +18,13 @@ #include #include #include +#include +#include #include #include - -struct state_history_test_fixture; - -namespace eosio { +namespace eosio::state_history { namespace bio = boost::iostreams; /* @@ -68,292 +66,94 @@ static const uint16_t ship_feature_pruned_log = 1; inline bool is_ship_log_pruned(uint64_t magic) { return get_ship_features(magic) & ship_feature_pruned_log; } inline uint64_t clear_ship_log_pruned_feature(uint64_t magic) { return ship_magic(get_ship_version(magic), get_ship_features(magic) & ~ship_feature_pruned_log); } -struct state_history_log_header { +struct log_header { uint64_t magic = ship_magic(ship_current_version); chain::block_id_type block_id = {}; uint64_t payload_size = 0; }; -static constexpr int state_history_log_header_serial_size = sizeof(state_history_log_header::magic) + - sizeof(state_history_log_header::block_id) + - sizeof(state_history_log_header::payload_size); -static_assert(sizeof(state_history_log_header) == state_history_log_header_serial_size); - -static constexpr unsigned ship_log_iostreams_buffer_size = 64*1024; - -namespace state_history { - struct prune_config { - uint32_t prune_blocks; //number of blocks to prune to when doing a prune - size_t prune_threshold = 4*1024*1024; //(approximately) how many bytes need to be added before a prune is performed - std::optional vacuum_on_close; //when set, a vacuum is performed on dtor if log contains less than this many bytes - }; - - struct partition_config { - std::filesystem::path retained_dir = "retained"; - std::filesystem::path archive_dir = "archive"; - uint32_t stride = 1000000; - uint32_t max_retained_files = UINT32_MAX; - }; -} // namespace state_history - -using state_history_log_config = std::variant; - -struct locked_decompress_stream { - std::unique_lock lock; // state_history_log mutex - std::variant, std::unique_ptr> buf; - - locked_decompress_stream() = delete; - locked_decompress_stream(locked_decompress_stream&&) = default; - - explicit locked_decompress_stream(std::unique_lock l) - : lock(std::move(l)) {}; - - template - void init(StateHistoryLog&& log, fc::cfile& stream, uint64_t compressed_size) { - auto istream = std::make_unique(); - 
istream->push(bio::zlib_decompressor(), ship_log_iostreams_buffer_size); - istream->push(bio::restrict(bio::file_source(stream.get_file_path().string()), stream.tellp(), compressed_size), ship_log_iostreams_buffer_size); - buf = std::move(istream); - } - - template - void init(LogData&& log, fc::datastream& stream, uint64_t compressed_size) { - auto istream = std::make_unique(); - istream->push(bio::zlib_decompressor(), ship_log_iostreams_buffer_size); - istream->push(bio::restrict(bio::file_source(log.filename), stream.pos() - log.data(), compressed_size), ship_log_iostreams_buffer_size); - buf = std::move(istream); - } - - size_t init(std::vector cbuf) { - buf.emplace>( std::move(cbuf) ); - return std::get>(buf).size(); - } +struct log_header_with_sizes : log_header { + uint32_t compressed_size = 0; + uint64_t uncompressed_size = 0; }; -namespace detail { - -inline std::vector zlib_decompress(fc::cfile& file, uint64_t compressed_size) { - if (compressed_size) { - std::vector compressed(compressed_size); - file.read(compressed.data(), compressed_size); - return state_history::zlib_decompress({compressed.data(), compressed_size}); - } - return {}; -} - -inline std::vector zlib_decompress(fc::datastream& strm, uint64_t compressed_size) { - if (compressed_size) { - return state_history::zlib_decompress({strm.pos(), compressed_size}); - } - return {}; -} - -template -uint64_t read_unpacked_entry(Log&& log, Stream& stream, uint64_t payload_size, locked_decompress_stream& result) { - // result has state_history_log mutex locked - - uint32_t s; - stream.read((char*)&s, sizeof(s)); - if (s == 1 && payload_size > (s + sizeof(uint32_t))) { - uint64_t compressed_size = payload_size - sizeof(uint32_t) - sizeof(uint64_t); - uint64_t decompressed_size; - stream.read((char*)&decompressed_size, sizeof(decompressed_size)); - result.init(log, stream, compressed_size); - return decompressed_size; - } else { - // Compressed deltas now exceeds 4GB on one of the public chains. This length prefix - // was intended to support adding additional fields in the future after the - // packed deltas or packed traces. For now we're going to ignore on read. 
- - uint64_t compressed_size = payload_size - sizeof(uint32_t); - return result.init( zlib_decompress(stream, compressed_size) ); - } -} - -class state_history_log_data : public chain::log_data_base { - uint32_t version_; - bool is_currently_pruned_; - uint64_t size_; - - public: - state_history_log_data() = default; - explicit state_history_log_data(const std::filesystem::path& path) { open(path); } - - void open(const std::filesystem::path& path) { - if (file.is_open()) - file.close(); - file.set_file_path(path); - file.open("rb"); - uint64_t v = chain::read_data_at(file, 0); - version_ = get_ship_version(v); - is_currently_pruned_ = is_ship_log_pruned(v); - file.seek_end(0); - size_ = file.tellp(); - } - - uint64_t size() const { return size_; } - uint32_t version() const { return version_; } - uint32_t first_block_num() { return block_num_at(0); } - uint32_t first_block_position() const { return 0; } - - bool is_currently_pruned() const { return is_currently_pruned_; } - - uint64_t ro_stream_at(uint64_t pos, locked_decompress_stream& result) { - uint64_t payload_size = payload_size_at(pos); - file.seek(pos + sizeof(state_history_log_header)); - // fc::datastream stream(file.const_data() + pos + sizeof(state_history_log_header), payload_size); - return read_unpacked_entry(*this, file, payload_size, result); - } - - uint32_t block_num_at(uint64_t position) { - return fc::endian_reverse_u32( - chain::read_data_at(file, position + offsetof(state_history_log_header, block_id))); - } - - chain::block_id_type block_id_at(uint64_t position) { - return chain::read_data_at(file, position + - offsetof(state_history_log_header, block_id)); +struct ship_log_entry { + uint64_t get_uncompressed_size() { + if(!uncompressed_size) { + bio::filtering_istreambuf buf(bio::zlib_decompressor() | bio::restrict(device, compressed_data_offset, compressed_data_size)); + uncompressed_size = bio::copy(buf, bio::null_sink()); + } + return *uncompressed_size; } - uint64_t payload_size_at(uint64_t pos) { - std::string filename = file.get_file_path().generic_string(); - EOS_ASSERT(size() >= pos + sizeof(state_history_log_header), chain::plugin_exception, - "corrupt ${name}: invalid entry size at at position ${pos}", ("name", filename)("pos", pos)); - - state_history_log_header header = chain::read_data_at(file, pos); - - EOS_ASSERT(is_ship(header.magic) && is_ship_supported_version(header.magic), chain::plugin_exception, - "corrupt ${name}: invalid header for entry at position ${pos}", ("name", filename)("pos", pos)); - - EOS_ASSERT(size() >= pos + sizeof(state_history_log_header) + header.payload_size, chain::plugin_exception, - "corrupt ${name}: invalid payload size for entry at position ${pos}", ("name", filename)("pos", pos)); - return header.payload_size; + bio::filtering_istreambuf get_stream() { + return bio::filtering_istreambuf(bio::zlib_decompressor() | bio::restrict(device, compressed_data_offset, compressed_data_size)); } - void construct_index(const std::filesystem::path& index_file_name) { - fc::cfile index_file; - index_file.set_file_path(index_file_name); - index_file.open("w+b"); - - uint64_t pos = 0; - while (pos < size()) { - uint64_t payload_size = payload_size_at(pos); - index_file.write(reinterpret_cast(&pos), sizeof(pos)); - pos += (sizeof(state_history_log_header) + payload_size + sizeof(uint64_t)); - } - } + fc::random_access_file::device device; + uint64_t compressed_data_offset; + uint64_t compressed_data_size; + std::optional uncompressed_size; }; -// directly adapt from 
boost/iostreams/filter/counter.hpp and change the type of chars_ to uint64_t. -class counter { +class state_history_log { public: - typedef char char_type; - struct category - : bio::dual_use, - bio::filter_tag, - bio::multichar_tag, - bio::optimally_buffered_tag - { }; - explicit counter(uint64_t first_char = 0) - : chars_(first_char) - { } - uint64_t characters() const { return chars_; } - std::streamsize optimal_buffer_size() const { return 0; } - - template - std::streamsize read(Source& src, char_type* s, std::streamsize n) - { - std::streamsize result = bio::read(src, s, n); - if (result == -1) - return -1; - chars_ += result; - return result; - } - - template - std::streamsize write(Sink& snk, const char_type* s, std::streamsize n) - { - std::streamsize result = bio::write(snk, s, n); - chars_ += result; - return result; - } -private: - uint64_t chars_; -}; - -} // namespace detail + using non_local_get_block_id_func = std::function(chain::block_num_type)>; -class state_history_log { - private: - const char* const name = ""; - state_history_log_config _config; + static std::optional no_non_local_get_block_id_func(chain::block_num_type) { + return std::nullopt; + } +private: + std::optional prune_config; + const non_local_get_block_id_func non_local_get_block_id; - // provide exclusive access to all data of this object since accessed from the main thread and the ship thread - mutable std::mutex _mx; - fc::cfile log; - fc::cfile index; - uint32_t _begin_block = 0; //always tracks the first block available even after pruning - uint32_t _index_begin_block = 0; //the first block of the file; even after pruning. it's what index 0 in the index file points to - uint32_t _end_block = 0; - chain::block_id_type last_block_id; + fc::random_access_file log; + fc::random_access_file index; + uint32_t _begin_block = 0; //always tracks the first block available even after pruning + uint32_t _index_begin_block = 0; //the first block of the file; even after pruning. 
it's what index 0 in the index file points to + uint32_t _end_block = 0; + chain::block_id_type last_block_id; - using catalog_t = chain::log_catalog>; - catalog_t catalog; + const unsigned packed_header_size = fc::raw::pack_size(log_header()); + const unsigned packed_header_with_sizes_size = fc::raw::pack_size(log_header_with_sizes()); public: - friend struct ::state_history_test_fixture; - - state_history_log( const state_history_log&) = delete; + state_history_log(const state_history_log&) = delete; + state_history_log& operator=(state_history_log&) = delete; - state_history_log(const char* name, const std::filesystem::path& log_dir, - state_history_log_config conf = {}) - : name(name) - , _config(std::move(conf)) { + state_history_log(const std::filesystem::path& log_dir_and_stem, + non_local_get_block_id_func non_local_get_block_id = no_non_local_get_block_id_func, + const std::optional& prune_conf = std::nullopt) : + prune_config(prune_conf), non_local_get_block_id(non_local_get_block_id), + log(std::filesystem::path(log_dir_and_stem).replace_extension("log")), + index(std::filesystem::path(log_dir_and_stem).replace_extension("index")) { + EOS_ASSERT(!!non_local_get_block_id, chain::plugin_exception, "misuse of get_block_id"); - log.set_file_path(log_dir/(std::string(name) + ".log")); - index.set_file_path(log_dir/(std::string(name) + ".index")); + if(prune_config) { + EOS_ASSERT(prune_config->prune_blocks, chain::plugin_exception, "state history log prune configuration requires at least one block"); + EOS_ASSERT(__builtin_popcount(prune_config->prune_threshold) == 1, chain::plugin_exception, "state history prune threshold must be power of 2"); + //switch this over to the mask that will be used + prune_config->prune_threshold = ~(prune_config->prune_threshold-1); + } open_log(); open_index(); - std::visit(eosio::chain::overloaded{ - [](std::monostate&) {}, - [](state_history::prune_config& conf) { - EOS_ASSERT(conf.prune_blocks, chain::plugin_exception, "state history log prune configuration requires at least one block"); - EOS_ASSERT(__builtin_popcount(conf.prune_threshold) == 1, chain::plugin_exception, "state history prune threshold must be power of 2"); - //switch this over to the mask that will be used - conf.prune_threshold = ~(conf.prune_threshold-1); - }, [name, log_dir, this](state_history::partition_config& conf) { - catalog.open(log_dir, conf.retained_dir, conf.archive_dir, name); - catalog.max_retained_files = conf.max_retained_files; - if (_end_block == 0) { - _index_begin_block = _begin_block = _end_block = catalog.last_block_num() +1; - } - } - }, _config); - //check for conversions to/from pruned log, as long as log contains something - if(_begin_block != _end_block) { - state_history_log_header first_header; - log.seek(0); - read_header(first_header); - - auto prune_config = std::get_if(&_config); + if(!empty()) { + log_header first_header = log.unpack_from(0); - if((is_ship_log_pruned(first_header.magic) == false) && prune_config) { - //need to convert non-pruned to pruned; first prune any ranges we can (might be none) - prune(fc::log_level::info); + if(!is_ship_log_pruned(first_header.magic) && prune_conf) { //non-pruned to pruned + //need to convert non-pruned to pruned; first prune any ranges we can up-front (might be none) + prune(); //update first header to indicate prune feature is enabled - log.seek(0); first_header.magic = ship_magic(get_ship_version(first_header.magic), ship_feature_pruned_log); - write_header(first_header); + log.pack_to(first_header, 0); 
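+         //note: the packed log_header (magic, block_id, payload_size) is fixed-size, so rewriting it in place cannot disturb the entry payload that follows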
//write trailer on log with num blocks - log.seek_end(0); - const uint32_t num_blocks_in_log = _end_block - _begin_block; - fc::raw::pack(log, num_blocks_in_log); + log.pack_to_end(_end_block - _begin_block); } - else if(is_ship_log_pruned(first_header.magic) && !prune_config) { + else if(is_ship_log_pruned(first_header.magic) && !prune_config) { //pruned to non-pruned vacuum(); } } @@ -361,510 +161,297 @@ class state_history_log { ~state_history_log() { //nothing to do if log is empty or we aren't pruning - if(_begin_block == _end_block) + if(empty()) return; - auto prune_config = std::get_if(&_config); if(!prune_config || !prune_config->vacuum_on_close) return; const size_t first_data_pos = get_pos(_begin_block); - const size_t last_data_pos = std::filesystem::file_size(log.get_file_path()); + const size_t last_data_pos = log.size(); if(last_data_pos - first_data_pos < *prune_config->vacuum_on_close) vacuum(); } - const state_history_log_config& config() const { - return _config; - } - // begin end std::pair block_range() const { - std::lock_guard g(_mx); - return { std::min(catalog.first_block_num(), _begin_block), _end_block }; + return {_begin_block, _end_block}; } bool empty() const { - auto r = block_range(); - return r.first == r.second; - } - - locked_decompress_stream create_locked_decompress_stream() { - return locked_decompress_stream{ std::unique_lock( _mx ) }; - } - - /// @return the decompressed entry size - uint64_t get_unpacked_entry(uint32_t block_num, locked_decompress_stream& result) { - - // result has mx locked - - auto opt_decompressed_size = catalog.ro_stream_for_block(block_num, result); - if (opt_decompressed_size) - return *opt_decompressed_size; - - if (block_num < _begin_block || block_num >= _end_block) - return 0; - - state_history_log_header header; - log.seek(get_pos(block_num)); - read_header(header); - - return detail::read_unpacked_entry(*this, log, header.payload_size, result); + const auto [first, second] = block_range(); + return first == second; + } + + std::optional get_entry(uint32_t block_num) { + if(block_num < _begin_block || block_num >= _end_block) + return std::nullopt; + + const uint64_t log_pos = get_pos(block_num); + log_header_with_sizes header = log.unpack_from(log_pos); + + //There are three types of "payload headers" that trail the magic/block_id/payload_size header: + // 1) up through and including EOSIO 2.0 would add an uint32_t indicating compressed message size + // 2) Leap 3.x would hardcode this uint32_t to 0 + // 3) Leap 4.0+ would hardcode this uint32_t to 1, and then add an uint64_t with the _uncompressed_ size + // (knowing the uncompressed size ahead of time makes it convenient to stream the data to the client which + // needs uncompressed size ahead of time) + // 1 & 2 are problematic for the current streaming of the logs to clients. There appears to be no option other + // then making two passes through the compressed data: once to figure out the uncompressed size to send up front + // to the client, then a second time to actually decompress the data to send to the client. But don't do the first + // pass here -- delay that until we're on the ship thread. + constexpr size_t prel4_head_size = sizeof(log_header_with_sizes::compressed_size); + constexpr size_t l4_head_size = sizeof(log_header_with_sizes::compressed_size) + sizeof(log_header_with_sizes::uncompressed_size); + return ship_log_entry{ + .device = log.seekable_device(), + .compressed_data_offset = log_pos + packed_header_size + (header.compressed_size == 1 ? 
l4_head_size : prel4_head_size), + .compressed_data_size = header.payload_size - (header.compressed_size == 1 ? l4_head_size : prel4_head_size), + .uncompressed_size = (header.compressed_size == 1 ? std::optional(header.uncompressed_size) : std::nullopt) + }; } template - void pack_and_write_entry(state_history_log_header header, const chain::block_id_type& prev_id, F&& pack_to) { - std::lock_guard g(_mx); - write_entry(header, prev_id, [&, pack_to = std::forward(pack_to)](auto& stream) { - size_t payload_pos = stream.tellp(); - - // In order to conserve memory usage for reading the chain state later, we need to - // encode the uncompressed data size to the disk so that the reader can send the - // decompressed data size before decompressing data. Here we use the number - // 1 indicates the format contains a 64 bits unsigned integer for decompressed data - // size and then the actually compressed data. The compressed data size can be - // computed from the payload size in the header minus sizeof(uint32_t) + sizeof(uint64_t). - - uint32_t s = 1; - stream.write((char*)&s, sizeof(s)); - uint64_t uncompressioned_size = 0; - stream.skip(sizeof(uncompressioned_size)); - - namespace bio = boost::iostreams; - - detail::counter cnt; - { - bio::filtering_ostreambuf buf; - buf.push(boost::ref(cnt), ship_log_iostreams_buffer_size); - buf.push(bio::zlib_compressor(bio::zlib::no_compression, ship_log_iostreams_buffer_size)); - buf.push(bio::file_descriptor_sink(stream.fileno(), bio::never_close_handle), ship_log_iostreams_buffer_size); - pack_to(buf); - } - - // calculate the payload size and rewind back to header to write the payload size - stream.seek_end(0); - size_t end_payload_pos = stream.tellp(); - uint64_t payload_size = end_payload_pos - payload_pos; - stream.seek(payload_pos - sizeof(uint64_t)); - stream.write((char*)&payload_size, sizeof(payload_size)); - - // write the uncompressed data size - stream.skip(sizeof(s)); - uncompressioned_size = cnt.characters(); - stream.write((char*)&uncompressioned_size, sizeof(uncompressioned_size)); - - // make sure we reset the file position to end_payload_pos to preserve API behavior - stream.seek(end_payload_pos); - }); - } - - std::optional get_block_id(uint32_t block_num) { - std::lock_guard g(_mx); - return get_block_id_i(block_num); - } - -#ifdef BOOST_TEST - fc::cfile& get_log_file() { return log;} -#endif - - private: - - void read_header(state_history_log_header& header, bool assert_version = true) { - char bytes[state_history_log_header_serial_size]; - log.read(bytes, sizeof(bytes)); - fc::datastream ds(bytes, sizeof(bytes)); - fc::raw::unpack(ds, header); - EOS_ASSERT(!ds.remaining(), chain::plugin_exception, "state_history_log_header_serial_size mismatch"); - if (assert_version) - EOS_ASSERT(is_ship(header.magic) && is_ship_supported_version(header.magic), chain::plugin_exception, - "corrupt ${name}.log (0)", ("name", name)); - } - - void write_header(const state_history_log_header& header) { - char bytes[state_history_log_header_serial_size]; - fc::datastream ds(bytes, sizeof(bytes)); - fc::raw::pack(ds, header); - EOS_ASSERT(!ds.remaining(), chain::plugin_exception, "state_history_log_header_serial_size mismatch"); - log.write(bytes, sizeof(bytes)); - } - - template - void write_entry(state_history_log_header header, const chain::block_id_type& prev_id, F write_payload) { - auto block_num = chain::block_header::num_from_id(header.block_id); - EOS_ASSERT(_begin_block == _end_block || block_num <= _end_block, chain::plugin_exception, - "missed 
a block in ${name}.log", ("name", name)); - - if (_begin_block != _end_block && block_num > _begin_block) { - if (block_num == _end_block) { - EOS_ASSERT(prev_id == last_block_id, chain::plugin_exception, "missed a fork change in ${name}.log", - ("name", name)); - } else { - state_history_log_header prev; - get_entry(block_num - 1, prev); - EOS_ASSERT(prev_id == prev.block_id, chain::plugin_exception, "missed a fork change in ${name}.log", - ("name", name)); - } + void pack_and_write_entry(const chain::block_id_type& id, const chain::block_id_type& prev_id, F&& pack_to) { + log_header_with_sizes header = {{ship_magic(ship_current_version, 0), id}, 1}; + const uint32_t block_num = chain::block_header::num_from_id(header.block_id); + + if(!empty()) { + EOS_ASSERT(block_num <= _end_block, chain::plugin_exception, "block ${b} skips over block ${e} in ${name}", ("b", block_num)("e", _end_block)("name", log.display_path())); + if(_end_block > 2u) + EOS_ASSERT(block_num > 2u, chain::plugin_exception, "existing ship log with ${eb} blocks when starting from genesis block ${b}", ("eb", _end_block-_begin_block)("b", block_num)); } - - auto prune_config = std::get_if(&_config); - if (block_num < _end_block) { - // This is typically because of a fork, and we need to truncate the log back to the beginning of the fork. - static uint32_t start_block_num = block_num; - // Guard agaisnt accidently starting a fresh chain with an existing ship log, require manual removal of ship logs. - EOS_ASSERT( block_num > 2, chain::plugin_exception, "Existing ship log with ${eb} blocks when starting from genesis block ${b}", - ("eb", _end_block)("b", block_num) ); - // block_num < _begin_block = pruned log, need to call truncate() to reset - // get_block_id_i check is an optimization to avoid writing a block that is already in the log (snapshot or replay) - if ( block_num < _begin_block || get_block_id_i(block_num) != header.block_id ) { - truncate(block_num); //truncate is expected to always leave file pointer at the end - } else { - if (start_block_num == block_num || block_num % 1000 == 0 ) - ilog("log ${name}.log already contains block ${b}, end block ${eb}", ("name", name)("b", block_num)("eb", _end_block)); + EOS_ASSERT(block_num >= _index_begin_block, chain::plugin_exception, "block ${b} is before start block ${s} of ${name}", ("b", block_num)("s", _begin_block)("name", log.display_path())); + if(block_num == _end_block) //appending at the end of known blocks; can shortcut some checks since we have last_block_id readily available + EOS_ASSERT(prev_id == last_block_id, chain::plugin_exception, "missed a fork change in ${name}", ("name", log.display_path())); + else { //seeing a block num we've seen before OR first block in the log; prepare some extra checks + //find the previous block id as a sanity check. This might not be in our log due to log splitting. It also might not be present at all if this is the first + // block written, so don't require this lookup to succeed, just require the id to match if the lookup succeeded. 
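+         //get_block_id() below only sees this log file; non_local_get_block_id is the caller-supplied fallback for blocks recorded elsewhere (e.g. an older retained log)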
+ if(std::optional local_id_found = get_block_id(block_num-1)) + EOS_ASSERT(local_id_found == prev_id, chain::plugin_exception, "missed a fork change in ${name}", ("name", log.display_path())); + else if(std::optional non_local_id_found = non_local_get_block_id(block_num-1)) + EOS_ASSERT(non_local_id_found == prev_id, chain::plugin_exception, "missed a fork change in ${name}", ("name", log.display_path())); + //we don't want to re-write blocks that we already have, so check if the existing block_id recorded in the log matches and if so, bail + if(get_block_id(block_num) == id) return; - } - } else if (!prune_config) { - log.seek_end(0); - } else if (prune_config && _begin_block != _end_block) { - log.seek_end(-sizeof(uint32_t)); //overwrite the trailing block count marker on this write } - //if we're operating on a pruned block log and this is the first entry in the log, make note of the feature in the header - if(prune_config && _begin_block == _end_block) - header.magic = ship_magic(get_ship_version(header.magic), ship_feature_pruned_log); + ssize_t log_insert_pos = log.size(); + if(prune_config) { + if(!empty()) //overwrite the prune trailer that is at the end of the log + log_insert_pos -= sizeof(uint32_t); + else //we're operating on a pruned block log and this is the first entry in the log, make note of the feature in the header + header.magic = ship_magic(get_ship_version(header.magic), ship_feature_pruned_log); + } - uint64_t pos = log.tellp(); + const ssize_t payload_insert_pos = log_insert_pos + packed_header_with_sizes_size; - write_header(header); - write_payload(log); + bio::filtering_ostreambuf buf(detail::counter() | bio::zlib_compressor(bio::zlib::no_compression) | detail::counter() | bio::restrict(log.seekable_device(), payload_insert_pos)); + pack_to(buf); + bio::close(buf); + header.uncompressed_size = buf.component(0)->characters(); + header.payload_size = buf.component(2)->characters() + sizeof(header.compressed_size) + sizeof(header.uncompressed_size); + log.pack_to(header, log_insert_pos); - if (header.payload_size != 0) - EOS_ASSERT(log.tellp() == pos + state_history_log_header_serial_size + header.payload_size, chain::plugin_exception, - "wrote payload with incorrect size to ${name}.log", ("name", name)); - fc::raw::pack(log, pos); + fc::random_access_file::write_datastream appender = log.append_ds(); + fc::raw::pack(appender, (uint64_t)log_insert_pos); - index.seek_end(0); - fc::raw::pack(index, pos); - if (_begin_block == _end_block) + const bool was_empty = empty(); + if(was_empty) _index_begin_block = _begin_block = block_num; - _end_block = block_num + 1; - last_block_id = header.block_id; + else if(block_num < _begin_block) //the log wasn't empty, but this block is before the first available block in a pruned log: reset the beginning + _begin_block = _end_block = block_num; + if(was_empty || block_num == _end_block || block_num == _end_block-1) //update last_block_id for appending or overwriting last block + last_block_id = header.block_id; + if(was_empty || block_num == _end_block) + _end_block = block_num + 1; + + index.pack_to((uint64_t)log_insert_pos, (block_num-_index_begin_block)*sizeof(uint64_t)); if(prune_config) { - if((pos&prune_config->prune_threshold) != (log.tellp()&prune_config->prune_threshold)) - prune(fc::log_level::debug); + if((log_insert_pos&prune_config->prune_threshold) != (log.size()&prune_config->prune_threshold)) + prune(); const uint32_t num_blocks_in_log = _end_block - _begin_block; - fc::raw::pack(log, num_blocks_in_log); - } - - 
log.flush(); - index.flush(); - - auto partition_config = std::get_if(&_config); - if (partition_config && block_num % partition_config->stride == 0) { - split_log(); + fc::raw::pack(appender, num_blocks_in_log); } } - fc::cfile& get_entry(uint32_t block_num, state_history_log_header& header) { - EOS_ASSERT(block_num >= _begin_block && block_num < _end_block, chain::plugin_exception, - "read non-existing block in ${name}.log", ("name", name)); - log.seek(get_pos(block_num)); - read_header(header); - return log; - } - - std::optional get_block_id_i(uint32_t block_num) { - auto result = catalog.id_for_block(block_num); - if (!result) { - if (block_num >= _begin_block && block_num < _end_block) { - state_history_log_header header; - get_entry(block_num, header); - EOS_ASSERT(chain::block_header::num_from_id(header.block_id) == block_num, chain::plugin_exception, - "header id does not match requested ${a} != ${b}", ("a", chain::block_header::num_from_id(header.block_id))("b", block_num)); - return header.block_id; - } - return {}; - } - EOS_ASSERT(chain::block_header::num_from_id(*result) == block_num, chain::plugin_exception, - "catalog id does not match requested ${a} != ${b}", ("a", chain::block_header::num_from_id(*result))("b", block_num)); - return result; - } - - //file position must be at start of last block's suffix (back pointer) - //called from open_log / ctor - bool get_last_block() { - state_history_log_header header; - uint64_t suffix; - - fc::raw::unpack(log, suffix); - const size_t after_suffix_pos = log.tellp(); - if (suffix > after_suffix_pos || suffix + state_history_log_header_serial_size > after_suffix_pos) { - elog("corrupt ${name}.log (2)", ("name", name)); - return false; - } - log.seek(suffix); - read_header(header, false); - if (!is_ship(header.magic) || !is_ship_supported_version(header.magic) || - suffix + state_history_log_header_serial_size + header.payload_size + sizeof(suffix) != after_suffix_pos) { - elog("corrupt ${name}.log (3)", ("name", name)); - return false; - } - _end_block = chain::block_header::num_from_id(header.block_id) + 1; - last_block_id = header.block_id; - if (_begin_block >= _end_block) { - elog("corrupt ${name}.log (4)", ("name", name)); - return false; - } - return true; + std::optional get_block_id(uint32_t block_num) { + if(block_num >= _begin_block && block_num < _end_block) + return log.unpack_from(get_pos(block_num)).block_id; + return std::nullopt; } - void prune(const fc::log_level& loglevel) { - auto prune_config = std::get_if(&_config); - + private: + void prune() { if(!prune_config) return; if(_end_block - _begin_block <= prune_config->prune_blocks) return; const uint32_t prune_to_num = _end_block - prune_config->prune_blocks; + ///TODO: we need to cap this to the lowest position there are any active entries reading from uint64_t prune_to_pos = get_pos(prune_to_num); - - log.punch_hole(state_history_log_header_serial_size, prune_to_pos); + log.punch_hole(fc::raw::pack_size(log_header()), prune_to_pos); _begin_block = prune_to_num; - log.flush(); + ilog("${name} pruned to blocks ${b}-${e}", ("name", log.display_path())("b", _begin_block)("e", _end_block - 1)); + } - if(auto l = fc::logger::get(); l.is_enabled(loglevel)) - l.log(fc::log_message(fc::log_context(loglevel, __FILE__, __LINE__, __func__), - "${name}.log pruned to blocks ${b}-${e}", fc::mutable_variant_object()("name", name)("b", _begin_block)("e", _end_block - 1))); + bool discover_and_check_last_block_ok(bool is_pruned) { + try { + //fetch the last block header from the 
log solely using the log (i.e. not the index: so don't use get_pos()). This is a sanity check. + log_header last_header = log.unpack_from(log.unpack_from(log.size() - sizeof(uint64_t) - (is_pruned ? sizeof(uint32_t) : 0))); + FC_ASSERT(is_ship(last_header.magic) && is_ship_supported_version(last_header.magic), "Unexpected header magic on last block"); + _end_block = chain::block_header::num_from_id(last_header.block_id) + 1; + last_block_id = last_header.block_id; + FC_ASSERT(_begin_block < _end_block, "Block numbers from head and tail of log are not expected"); + } + catch(const std::bad_alloc&) { + throw; + } + catch(const std::exception& e) { + ilog("Failure while checking ${name}: ${m}", ("name", log.display_path())("m", e.what())); + return false; + } + return true; } - //only works on non-pruned logs + //only works on non-pruned logs since it has to work tail to head void recover_blocks() { - ilog("recover ${name}.log", ("name", name)); - uint64_t pos = 0; - uint32_t num_found = 0; - log.seek_end(0); - const size_t size = log.tellp(); + const size_t size = log.size(); + const size_t header_size = fc::raw::pack_size(log_header()); + size_t pos = 0; + uint32_t num_found = 0; while (true) { - state_history_log_header header; - if (pos + state_history_log_header_serial_size > size) + if(pos + header_size > size) break; - log.seek(pos); - read_header(header, false); + log_header header = log.unpack_from(pos); + uint64_t suffix; - if (!is_ship(header.magic) || !is_ship_supported_version(header.magic) || header.payload_size > size || - pos + state_history_log_header_serial_size + header.payload_size + sizeof(suffix) > size) { + if(!is_ship(header.magic) || !is_ship_supported_version(header.magic) || header.payload_size > size || + pos + header_size + header.payload_size + sizeof(suffix) > size) { EOS_ASSERT(!is_ship(header.magic) || is_ship_supported_version(header.magic), chain::plugin_exception, - "${name}.log has an unsupported version", ("name", name)); + "${name} has an unsupported version", ("name", log.display_path())); break; } - log.seek(pos + state_history_log_header_serial_size + header.payload_size); - log.read((char*)&suffix, sizeof(suffix)); - if (suffix != pos) + suffix = log.unpack_from(pos + header_size + header.payload_size); + if(suffix != pos) break; - pos = pos + state_history_log_header_serial_size + header.payload_size + sizeof(suffix); - if (!(++num_found % 10000)) { + pos += header_size + header.payload_size + sizeof(suffix); + if(!(++num_found % 10000)) { ilog("${num_found} blocks found, log pos = ${pos}", ("num_found", num_found)("pos", pos)); } } - log.flush(); - std::filesystem::resize_file(log.get_file_path().string(), pos); - log.flush(); - - log.seek_end(-sizeof(pos)); - EOS_ASSERT(get_last_block(), chain::plugin_exception, "recover ${name}.log failed", ("name", name)); + log.resize(pos); } // only called from constructor void open_log() { - log.open(fc::cfile::create_or_update_rw_mode); - log.seek_end(0); - uint64_t size = log.tellp(); - log.close(); - - log.open(fc::cfile::update_rw_mode); - if (size >= state_history_log_header_serial_size) { - state_history_log_header header; - log.seek(0); - read_header(header, false); - EOS_ASSERT(is_ship(header.magic) && is_ship_supported_version(header.magic) && - state_history_log_header_serial_size + header.payload_size + sizeof(uint64_t) <= size, - chain::plugin_exception, "corrupt ${name}.log (1)", ("name", name)); - - log.seek_end(0); + if(log.size() == 0) + return; + + try { + log_header first_header = 
log.unpack_from(0); + FC_ASSERT(is_ship(first_header.magic) && is_ship_supported_version(first_header.magic), "Unexpected header magic"); std::optional pruned_count; - if(is_ship_log_pruned(header.magic)) { - //the existing log is a prune'ed log. find the count of blocks at the end - log.skip(-sizeof(uint32_t)); - uint32_t count; - fc::raw::unpack(log, count); - pruned_count = count; - log.skip(-sizeof(uint32_t)); - } + if(is_ship_log_pruned(first_header.magic)) + pruned_count = log.unpack_from(log.size() - sizeof(uint32_t)); - _index_begin_block = _begin_block = chain::block_header::num_from_id(header.block_id); - last_block_id = header.block_id; - log.skip(-sizeof(uint64_t)); - if(!get_last_block()) { - EOS_ASSERT(!is_ship_log_pruned(header.magic), chain::plugin_exception, "${name}.log is pruned and cannot have recovery attempted", ("name", name)); + _index_begin_block = _begin_block = chain::block_header::num_from_id(first_header.block_id); + last_block_id = first_header.block_id; + + if(!discover_and_check_last_block_ok(!!pruned_count)) { + FC_ASSERT(!is_ship_log_pruned(first_header.magic), "Pruned log is corrupted"); + ilog("Attempting to recover ${n}", ("n", log.display_path())); recover_blocks(); + FC_ASSERT(discover_and_check_last_block_ok(!!pruned_count), "Failed to recover blocks"); } if(pruned_count) _begin_block = _end_block - *pruned_count; - - ilog("${name}.log has blocks ${b}-${e}", ("name", name)("b", _begin_block)("e", _end_block - 1)); - } else { - EOS_ASSERT(!size, chain::plugin_exception, "corrupt ${name}.log (5)", ("name", name)); - ilog("${name}.log is empty", ("name", name)); - } + } EOS_RETHROW_EXCEPTIONS(chain::plugin_exception, "${name} is corrupted and cannot be repaired", ("name", log.display_path())); } // only called from constructor void open_index() { - index.open(fc::cfile::create_or_update_rw_mode); - index.seek_end(0); - if (index.tellp() == (static_cast(_end_block) - _index_begin_block) * sizeof(uint64_t)) + const uint64_t expected_index_size = (_end_block - _index_begin_block) * sizeof(uint64_t); + if(index.size() == expected_index_size) return; - ilog("Regenerate ${name}.index", ("name", name)); - index.close(); - - index.open("wb"); - log.seek_end(0); - if(log.tellp()) { - uint32_t remaining = _end_block - _begin_block; - index.seek((_end_block - _index_begin_block)*sizeof(uint64_t)); //this can make the index sparse for a pruned log; but that's okay - - log.seek(0); - state_history_log_header first_entry_header; - read_header(first_entry_header); - log.seek_end(0); - if(is_ship_log_pruned(first_entry_header.magic)) - log.skip(-sizeof(uint32_t)); - - while(remaining--) { - uint64_t pos = 0; - state_history_log_header header; - log.skip(-sizeof(pos)); - fc::raw::unpack(log, pos); - log.seek(pos); - read_header(header, false); - log.seek(pos); - EOS_ASSERT(is_ship(header.magic) && is_ship_supported_version(header.magic), chain::plugin_exception, "corrupt ${name}.log (6)", ("name", name)); - - index.skip(-sizeof(uint64_t)); - fc::raw::pack(index, pos); - index.skip(-sizeof(uint64_t)); - - if (!(remaining % 10000)) - ilog("${r} blocks remaining, log pos = ${pos}", ("r", remaining)("pos", pos)); - } - } - index.close(); - index.open(fc::cfile::create_or_update_rw_mode); - } + ilog("Regenerate ${name}", ("name", index.display_path())); + index.resize(0); - uint64_t get_pos(uint32_t block_num) { - uint64_t pos; - index.seek((block_num - _index_begin_block) * sizeof(pos)); - index.read((char*)&pos, sizeof(pos)); - return pos; - } + if(log.size()) { + ssize_t 
next_logpos = log.size() - sizeof(uint64_t); + index.resize(expected_index_size); - void truncate(uint32_t block_num) { - log.close(); - index.close(); + log_header header = log.unpack_from(0); + if(is_ship_log_pruned(header.magic)) + next_logpos -= sizeof(uint32_t); - auto first_block_num = catalog.empty() ? _begin_block : catalog.first_block_num(); - auto new_begin_block_num = catalog.truncate(block_num, log.get_file_path()); + do { + const uint64_t logpos = log.unpack_from(next_logpos); + header = log.unpack_from(logpos); + EOS_ASSERT(is_ship(header.magic) && is_ship_supported_version(header.magic), chain::plugin_exception, "corrupt ${name}, unknown header magic", ("name", log.display_path())); - // notice that catalog.truncate() can replace existing log and index files, so we have to - // close the files and reopen them again; otherwise we might operate on the obsolete files instead. + const uint64_t index_offset_for_bnum = (chain::block_header::num_from_id(header.block_id) - _index_begin_block)*sizeof(uint64_t); + if(index.unpack_from(index_offset_for_bnum) == 0) //don't overwrite newer blocks for a given blocknum + index.pack_to(logpos, index_offset_for_bnum); - if (new_begin_block_num > 0) { - _begin_block = new_begin_block_num; - _index_begin_block = new_begin_block_num; + next_logpos = logpos - sizeof(uint64_t); + if (!(chain::block_header::num_from_id(header.block_id) % 10000)) + ilog("${r} blocks remaining, log pos = ${pos}", ("r", chain::block_header::num_from_id(header.block_id) - _begin_block)("pos", logpos)); + } while(chain::block_header::num_from_id(header.block_id) != _begin_block); } + } - uint32_t num_removed; - - if (block_num <= _begin_block) { - num_removed = _end_block - first_block_num; - std::filesystem::resize_file(log.get_file_path().string(), 0); - std::filesystem::resize_file(index.get_file_path().string(), 0); - _begin_block = _end_block = block_num; - } else { - num_removed = _end_block - block_num; - - index.open("rb"); - uint64_t pos = get_pos(block_num); - index.close(); - - auto path = log.get_file_path().string(); - - std::filesystem::resize_file(log.get_file_path().string(), pos); - std::filesystem::resize_file(index.get_file_path().string(), (block_num - _index_begin_block) * sizeof(uint64_t)); - _end_block = block_num; - //this will leave the end of the log with the last block's suffix no matter if the log is operating in pruned - // mode or not. The assumption is truncate() is always immediately followed up with an append to the log thus - // restoring the prune trailer if required - } - - log.open(fc::cfile::update_rw_mode); - log.seek_end(0); - index.open(fc::cfile::create_or_update_rw_mode); - - ilog("fork or replay: removed ${n} blocks from ${name}.log", ("n", num_removed)("name", name)); + uint64_t get_pos(uint32_t block_num) { + return index.unpack_from((block_num - _index_begin_block) * sizeof(uint64_t)); } + void vacuum() { //a completely empty log should have nothing on disk; don't touch anything - if(_begin_block == _end_block) + if(empty()) return; - log.seek(0); - uint64_t magic; - fc::raw::unpack(log, magic); - EOS_ASSERT(is_ship_log_pruned(magic), chain::plugin_exception, "vacuum can only be performed on pruned logs"); + log_header first_header = log.unpack_from(0); + EOS_ASSERT(is_ship_log_pruned(first_header.magic), chain::plugin_exception, "vacuum can only be performed on pruned logs"); //may happen if _begin_block is still first block on-disk of log. clear the pruned feature flag & erase // the 4 byte trailer. 
The pruned flag is only set on the first header in the log, so it does not need // to be touched up if we actually vacuum up any other blocks to the front. if(_begin_block == _index_begin_block) { - log.seek(0); - fc::raw::pack(log, clear_ship_log_pruned_feature(magic)); - log.flush(); - std::filesystem::resize_file(log.get_file_path(), std::filesystem::file_size(log.get_file_path()) - sizeof(uint32_t)); + log.pack_to(clear_ship_log_pruned_feature(first_header.magic), 0); + log.resize(log.size() - sizeof(uint32_t)); return; } - ilog("Vacuuming pruned log ${n}", ("n", name)); + ilog("Vacuuming pruned log ${n}", ("n", log.display_path())); size_t copy_from_pos = get_pos(_begin_block); size_t copy_to_pos = 0; const size_t offset_bytes = copy_from_pos - copy_to_pos; const size_t offset_blocks = _begin_block - _index_begin_block; - log.seek_end(0); - size_t copy_sz = log.tellp() - copy_from_pos - sizeof(uint32_t); //don't copy trailer in to new unpruned log + size_t copy_sz = log.size() - copy_from_pos - sizeof(uint32_t); //don't copy trailer in to new unpruned log const uint32_t num_blocks_in_log = _end_block - _begin_block; std::vector buff; buff.resize(4*1024*1024); + fc::random_access_file::device log_device = log.seekable_device(); auto tick = std::chrono::time_point_cast(std::chrono::system_clock::now()); while(copy_sz) { const size_t copy_this_round = std::min(buff.size(), copy_sz); - log.seek(copy_from_pos); - log.read(buff.data(), copy_this_round); + log_device.seek(copy_from_pos, std::ios_base::beg); + log_device.read(buff.data(), copy_this_round); //iostreams Blocking concept requires reading all log.punch_hole(copy_to_pos, copy_from_pos+copy_this_round); - log.seek(copy_to_pos); - log.write(buff.data(), copy_this_round); + log_device.seek(copy_to_pos, std::ios_base::beg); + log_device.write(buff.data(), copy_this_round); copy_from_pos += copy_this_round; copy_to_pos += copy_this_round; @@ -872,14 +459,12 @@ class state_history_log { const auto tock = std::chrono::time_point_cast(std::chrono::system_clock::now()); if(tick < tock - std::chrono::seconds(5)) { - ilog("Vacuuming pruned log ${n}, ${b} bytes remaining", ("b", copy_sz)("n", name)); + ilog("Vacuuming pruned log ${n}, ${b} bytes remaining", ("b", copy_sz)("n", log.display_path())); tick = tock; } } - log.flush(); - std::filesystem::resize_file(log.get_file_path(), log.tellp()); + log.resize(copy_to_pos); - index.flush(); { boost::interprocess::mapped_region index_mapped(index, boost::interprocess::read_write); uint64_t* index_ptr = (uint64_t*)index_mapped.get_address(); @@ -889,32 +474,19 @@ class state_history_log { index_ptr[new_block_num] = new_pos; if(new_block_num + 1 != num_blocks_in_log) - log.seek(index_ptr[new_block_num + offset_blocks + 1] - offset_bytes - sizeof(uint64_t)); + log.pack_to(new_pos, index_ptr[new_block_num + offset_blocks + 1] - offset_bytes - sizeof(uint64_t)); else - log.seek_end(-sizeof(uint64_t)); - log.write((char*)&new_pos, sizeof(new_pos)); + log.pack_to(new_pos, log.size()-sizeof(uint64_t)); } } - std::filesystem::resize_file(index.get_file_path(), num_blocks_in_log*sizeof(uint64_t)); + index.resize(num_blocks_in_log*sizeof(uint64_t)); _index_begin_block = _begin_block; - ilog("Vacuum of pruned log ${n} complete",("n", name)); - } - - void split_log() { - index.close(); - log.close(); - - catalog.add(_begin_block, _end_block - 1, log.get_file_path().parent_path(), name); - - _index_begin_block = _begin_block = _end_block; - - log.open(fc::cfile::truncate_rw_mode); - log.seek_end(0); - 
index.open(fc::cfile::truncate_rw_mode); + ilog("Vacuum of pruned log ${n} complete",("n", log.display_path())); } -}; // state_history_log +}; -} // namespace eosio +} -FC_REFLECT(eosio::state_history_log_header, (magic)(block_id)(payload_size)) +FC_REFLECT(eosio::state_history::log_header, (magic)(block_id)(payload_size)) +FC_REFLECT_DERIVED(eosio::state_history::log_header_with_sizes, (eosio::state_history::log_header), (compressed_size)(uncompressed_size)); \ No newline at end of file diff --git a/libraries/state_history/include/eosio/state_history/log_catalog.hpp b/libraries/state_history/include/eosio/state_history/log_catalog.hpp new file mode 100644 index 0000000000..7b11623f97 --- /dev/null +++ b/libraries/state_history/include/eosio/state_history/log_catalog.hpp @@ -0,0 +1,261 @@ +#pragma once + +#include +#include + +#include +#include +#include + +#include + +#include +#include + +#include + +namespace eosio::state_history { + +using namespace boost::multi_index; + +struct catalogued_log_file { + chain::block_num_type begin_block_num = 0; + chain::block_num_type end_block_num = 0; + std::filesystem::path path_and_basename; //example: /some/dir/trace-history-50-59 i.e. does NOT include .log nor .index + std::optional log; + + size_t last_used_counter = 0; + + size_t effective_last_used_counter() const { + if(!log) + return 0; + return last_used_counter; + } + + catalogued_log_file(const catalogued_log_file&) = delete; + catalogued_log_file& operator=(catalogued_log_file&) = delete; + catalogued_log_file(chain::block_num_type begin_block_num, chain::block_num_type end_block_num, std::filesystem::path path_and_basename) : + begin_block_num(begin_block_num), end_block_num(end_block_num), path_and_basename(path_and_basename) {} +}; + +class log_catalog { + std::filesystem::path retained_dir; + std::filesystem::path archive_dir; + uint32_t max_retained_files = std::numeric_limits::max(); + uint32_t log_rotation_stride = std::numeric_limits::max(); + + const state_history_log::non_local_get_block_id_func non_local_get_block_id; + + //cache not just an optimization: when a log file is opened the last block in its log file is used as a determination of the log's end block, + // so we don't want to close an old log file while it's being written to during a fork event otherwise we'd effectively corrupt the catalog state + struct by_mru {}; + typedef multi_index_container< + catalogued_log_file, + indexed_by< + ordered_unique>, + ordered_non_unique,key<&catalogued_log_file::effective_last_used_counter>, std::greater> + > + > catalog_t; + catalog_t retained_log_files; + std::optional head_log; + const std::filesystem::path head_log_path_and_basename; //example: /some/dir/trace-history i.e. 
does NOT include .log nor .index + + size_t global_used_counter = 0; + +public: + log_catalog(const log_catalog&) = delete; + log_catalog& operator=(log_catalog&) = delete; + + log_catalog(const std::filesystem::path& log_dir, const state_history::state_history_log_config& config, const std::string& log_name, + state_history_log::non_local_get_block_id_func non_local_get_block_id = state_history_log::no_non_local_get_block_id_func) : + non_local_get_block_id(non_local_get_block_id), head_log_path_and_basename(log_dir / log_name) { + std::visit(chain::overloaded { + [this](const std::monostate&) { + open_head_log(); + }, //nothing needed + [this](const state_history::prune_config& prune) { + open_head_log(prune); + }, + [this, &log_dir, &log_name](const state_history::partition_config& partition_config) { + retained_dir = make_absolute_dir(log_dir, partition_config.retained_dir.empty() ? log_dir : partition_config.retained_dir); + if(!partition_config.archive_dir.empty()) + archive_dir = make_absolute_dir(log_dir, partition_config.archive_dir); + max_retained_files = partition_config.max_retained_files; + log_rotation_stride = partition_config.stride; + + const std::regex retained_logfile_regex("^" + log_name + R"(-\d+-\d+\.log$)"); + + for(const std::filesystem::directory_entry& dir_entry : std::filesystem::directory_iterator(retained_dir)) { + if(!dir_entry.is_regular_file()) + continue; + if(!std::regex_search(dir_entry.path().filename().string(), retained_logfile_regex)) + continue; + + const std::filesystem::path path_and_basename = dir_entry.path().parent_path() / dir_entry.path().stem(); + + state_history_log log(path_and_basename, [](chain::block_num_type) {return std::nullopt;}); + if(log.empty()) + continue; + const auto [begin_bnum, end_bnum] = log.block_range(); + retained_log_files.emplace(begin_bnum, end_bnum, path_and_basename); + } + + if(retained_log_files.size() > 1) + for(decltype(retained_log_files)::iterator it = retained_log_files.begin(); it != std::prev(retained_log_files.end()); ++it) + EOS_ASSERT(it->end_block_num == std::next(it)->begin_block_num, chain::plugin_exception, + "retained log file ${sf}.log has block range ${sb}-${se} but ${ef}.log has range ${eb}-${ee} which results in a hole", + ("sf", it->path_and_basename.native())("sb", it->begin_block_num)("se", it->end_block_num-1) + ("ef", std::next(it)->path_and_basename.native())("eb", std::next(it)->begin_block_num)("ee", std::next(it)->end_block_num-1)); + + + open_head_log(); + if(!retained_log_files.empty() && !head_log->empty()) + EOS_ASSERT(retained_log_files.rbegin()->end_block_num == head_log->block_range().first, chain::plugin_exception, + "retained log file ${sf}.log has block range ${sb}-${se} but head log has range ${eb}-${ee} which results in a hole", + ("sf", retained_log_files.rbegin()->path_and_basename.native())("sb", retained_log_files.rbegin()->begin_block_num)("se", retained_log_files.rbegin()->end_block_num-1) + ("eb", head_log->block_range().first)("ee", head_log->block_range().second-1)); + } + }, config); + + assert(!!head_log); + } + + template + void pack_and_write_entry(const chain::block_id_type& id, const chain::block_id_type& prev_id, F&& pack_to) { + const uint32_t block_num = chain::block_header::num_from_id(id); + + //we need this check for the case where the retained catalog has, say, 1000-4999, the head log is empty, and block 70 is to be written. 
+ // call_for_log() will refer us to the empty head log since 50 is not in the retained catalog range but we don't want to add there, obviously. + // whereas for block 5000 in this case we do want to act on the empty head log. + if(!retained_log_files.empty()) + EOS_ASSERT(block_num >= retained_log_files.begin()->begin_block_num, chain::plugin_exception, + "block ${b} is before first block ${s} of ${name}.log", + ("b", block_num)("s", retained_log_files.begin()->begin_block_num)("name", retained_log_files.begin()->path_and_basename.string())); + + call_for_log(block_num, [&](state_history_log&& l) { + l.pack_and_write_entry(id, prev_id, pack_to); + }); + + //don't look at the just written block_num here since we might have not written to the head log, just consider the state of the head log + if(!head_log->empty() && (head_log->block_range().second-1) % log_rotation_stride == 0) + rotate_logs(); + } + + std::optional get_entry(uint32_t block_num) { + return call_for_log(block_num, [&](state_history_log&& l) { + return l.get_entry(block_num); + }); + } + + std::optional get_block_id(uint32_t block_num) { + return call_for_log(block_num, [&](state_history_log&& l) { + return l.get_block_id(block_num); + }); + } + + std::pair block_range() const { + uint32_t begin = 0; + uint32_t end = 0; + + if(!retained_log_files.empty()) { + begin = retained_log_files.begin()->begin_block_num; + end = retained_log_files.rbegin()->end_block_num; + } + if(!head_log->empty()) { + if(begin == 0) + begin = head_log->block_range().first; + end = head_log->block_range().second; + } + + return {begin, end}; + } + + bool empty() const { + const auto [first, second] = block_range(); + return first == second; + } + +private: + template + typename std::invoke_result_t call_for_log(const uint32_t block_num, F&& f) { + //watch out that this check will send any requests for block nums *less than* first retained block to head log too + if(catalog_t::iterator it = retained_log_files.upper_bound(block_num); + !retained_log_files.empty() && it != retained_log_files.begin() && block_num < std::prev(it)->end_block_num) { + catalog_t::iterator log_it = std::prev(it); + retained_log_files.modify(log_it, [&](catalogued_log_file& clf) { + if(!clf.log) + clf.log.emplace(clf.path_and_basename, non_local_get_block_id); + clf.last_used_counter = ++global_used_counter; + }); + + const unsigned num_log_files_to_keep_open = 5; + if(retained_log_files.size() >= num_log_files_to_keep_open+1) + retained_log_files.get().modify(std::next(retained_log_files.get().begin(), num_log_files_to_keep_open), [](catalogued_log_file& clf) { + clf.log.reset(); + }); + + return f(std::forward(const_cast(*log_it->log))); + } + else + return f(std::forward(*head_log)); + } + + void rotate_logs() { + const auto [begin, end] = head_log->block_range(); + std::filesystem::path new_log_basenamepath = retained_dir / head_log_path_and_basename.stem(); + new_log_basenamepath += "-" + std::to_string(begin) + "-" + std::to_string(end-1); + head_log.reset(); + //try and make sure we don't leave head_log unset if something throws below. 
any throw below should cause the node to + // stop, but during teardown of everything something might access head_log which is assumed to always be set elsewhere + // TODO: only close the old log once the new log is opened to avoid problems when opening throws too + auto reopen_head_log = fc::make_scoped_exit([this]() { + open_head_log(); + }); + + rename_bundle(head_log_path_and_basename, new_log_basenamepath); + retained_log_files.emplace(begin, end, new_log_basenamepath); + + while(retained_log_files.size() > max_retained_files) { + const catalog_t::iterator it = retained_log_files.begin(); + std::filesystem::path oldest_log_path_and_basename = it->path_and_basename; + if(archive_dir.empty()) { + std::filesystem::remove(oldest_log_path_and_basename.replace_extension("log")); + std::filesystem::remove(oldest_log_path_and_basename.replace_extension("index")); + } else { + rename_bundle(oldest_log_path_and_basename, archive_dir / oldest_log_path_and_basename.filename()); + } + retained_log_files.erase(it); + } + } + + void open_head_log(std::optional prune_config = std::nullopt) { + head_log.emplace(head_log_path_and_basename, non_local_get_block_id, prune_config); + } + + static std::filesystem::path make_absolute_dir(const std::filesystem::path& base_dir, std::filesystem::path new_dir) { + if(new_dir.is_relative()) + new_dir = base_dir / new_dir; + + if(!std::filesystem::is_directory(new_dir)) + std::filesystem::create_directories(new_dir); + + return new_dir; + } + + static void rename_if_not_exists(std::filesystem::path old_name, std::filesystem::path new_name) { + if(!std::filesystem::exists(new_name)) { + std::filesystem::rename(old_name, new_name); + } else { + std::filesystem::remove(old_name); + wlog("${new_name} already exists, just removing ${old_name}", ("old_name", old_name.string())("new_name", new_name.string())); + } + } + + static void rename_bundle(std::filesystem::path orig_path, std::filesystem::path new_path) { + rename_if_not_exists(orig_path.replace_extension(".log"), new_path.replace_extension(".log")); + rename_if_not_exists(orig_path.replace_extension(".index"), new_path.replace_extension(".index")); + } + +}; + +} \ No newline at end of file diff --git a/libraries/state_history/include/eosio/state_history/log_config.hpp b/libraries/state_history/include/eosio/state_history/log_config.hpp new file mode 100644 index 0000000000..b2cef8d82a --- /dev/null +++ b/libraries/state_history/include/eosio/state_history/log_config.hpp @@ -0,0 +1,25 @@ +#pragma once + +#include +#include +#include +#include + +namespace eosio::state_history { + +struct prune_config { + uint32_t prune_blocks; //number of blocks to prune to when doing a prune + size_t prune_threshold = 4*1024*1024; //(approximately) how many bytes need to be added before a prune is performed + std::optional vacuum_on_close; //when set, a vacuum is performed on dtor if log contains less than this many bytes +}; + +struct partition_config { + std::filesystem::path retained_dir = "retained"; + std::filesystem::path archive_dir = "archive"; + uint32_t stride = 1000000; + uint32_t max_retained_files = UINT32_MAX; +}; + +using state_history_log_config = std::variant; + +} \ No newline at end of file diff --git a/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp b/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp index 0b53d4b897..e1da7ee4d5 100644 --- a/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp +++ 
b/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp @@ -1,5 +1,4 @@ #pragma once -#include #include #include #include @@ -14,12 +13,9 @@ #include #include - extern const char* const state_history_plugin_abi; -namespace eosio { - -using namespace state_history; +namespace eosio::state_history { class session_base { public: @@ -39,7 +35,7 @@ class session final : public session_base { public: session(SocketType&& s, Executor&& st, chain::controller& controller, - std::optional& trace_log, std::optional& chain_state_log, std::optional& finality_data_log, + std::optional& trace_log, std::optional& chain_state_log, std::optional& finality_data_log, GetBlockID&& get_block_id, GetBlock&& get_block, OnDone&& on_done, fc::logger& logger) : strand(std::move(st)), stream(std::move(s)), wake_timer(strand), controller(controller), trace_log(trace_log), chain_state_log(chain_state_log), finality_data_log(finality_data_log), @@ -190,30 +186,24 @@ class session final : public session_base { return ret; } - boost::asio::awaitable write_log_entry(std::optional& log_stream, std::optional& log, chain::block_num_type block_num) { - uint64_t unpacked_size = 0; - - if(log_stream) //will be unset if either request did not ask for this log entry, or the log isn't enabled - unpacked_size = log->get_unpacked_entry(block_num, *log_stream); //will return 0 if log does not include the block num asked for - - if(unpacked_size) { - char buff[1024*1024]; - fc::datastream ds(buff, sizeof(buff)); - fc::raw::pack(ds, true); - history_pack_varuint64(ds, unpacked_size); - co_await stream.async_write_some(false, boost::asio::buffer(buff, ds.tellp())); - - ///TODO: why is there an uncompressed option in the variant?! Shouldn't it always be compressed? was this for old unit tests? 
- bio::filtering_istreambuf& decompression_stream = *std::get>(log_stream->buf); - std::streamsize red = 0; - while((red = bio::read(decompression_stream, buff, sizeof(buff))) != -1) { - if(red == 0) - continue; - co_await stream.async_write_some(false, boost::asio::buffer(buff, red)); - } - } - else { + boost::asio::awaitable write_log_entry(std::optional& log_stream) { + if(!log_stream) { //will be unset if either request did not ask for this log entry, or the log isn't enabled co_await stream.async_write_some(false, boost::asio::buffer(fc::raw::pack(false))); + co_return; + } + + char buff[1024*1024]; + fc::datastream ds(buff, sizeof(buff)); + fc::raw::pack(ds, true); + history_pack_varuint64(ds, log_stream->get_uncompressed_size()); + co_await stream.async_write_some(false, boost::asio::buffer(buff, ds.tellp())); + + bio::filtering_istreambuf decompression_stream = log_stream->get_stream(); + std::streamsize red = 0; + while((red = bio::read(decompression_stream, buff, sizeof(buff))) != -1) { + if(red == 0) + continue; + co_await stream.async_write_some(false, boost::asio::buffer(buff, red)); } } @@ -223,10 +213,9 @@ class session final : public session_base { struct block_package { get_blocks_result_base blocks_result_base; bool is_v1_request = false; - chain::block_num_type this_block_num = 0; //this shouldn't be needed post log de-mutexing - std::optional trace_stream; - std::optional state_stream; - std::optional finality_stream; + std::optional trace_entry; + std::optional state_entry; + std::optional finality_entry; }; while(true) { @@ -250,8 +239,7 @@ class session final : public session_base { .head = {self.controller.head_block_num(), self.controller.head_block_id()}, .last_irreversible = {self.controller.last_irreversible_block_num(), self.controller.last_irreversible_block_id()} }, - .is_v1_request = self.current_blocks_request_v1_finality.has_value(), - .this_block_num = self.next_block_cursor + .is_v1_request = self.current_blocks_request_v1_finality.has_value() }); if(const std::optional this_block_id = self.get_block_id(self.next_block_cursor)) { block_to_send->blocks_result_base.this_block = {self.current_blocks_request.start_block_num, *this_block_id}; @@ -260,11 +248,11 @@ class session final : public session_base { if(chain::signed_block_ptr sbp = get_block(self.next_block_cursor); sbp && self.current_blocks_request.fetch_block) block_to_send->blocks_result_base.block = fc::raw::pack(*sbp); if(self.current_blocks_request.fetch_traces && self.trace_log) - block_to_send->trace_stream.emplace(self.trace_log->create_locked_decompress_stream()); + block_to_send->trace_entry = self.trace_log->get_entry(self.next_block_cursor); if(self.current_blocks_request.fetch_deltas && self.chain_state_log) - block_to_send->state_stream.emplace(self.chain_state_log->create_locked_decompress_stream()); + block_to_send->state_entry = self.chain_state_log->get_entry(self.next_block_cursor); if(block_to_send->is_v1_request && *self.current_blocks_request_v1_finality && self.finality_data_log) - block_to_send->finality_stream.emplace(self.finality_data_log->create_locked_decompress_stream()); + block_to_send->finality_entry = self.finality_data_log->get_entry(self.next_block_cursor); } ++self.next_block_cursor; --self.send_credits; @@ -297,13 +285,10 @@ class session final : public session_base { co_await stream.async_write_some(false, boost::asio::buffer(fc::raw::pack(get_blocks_result_variant_index))); co_await stream.async_write_some(false, 
boost::asio::buffer(fc::raw::pack(block_to_send->blocks_result_base))); - //accessing the _logs here violates the rule that those should only be accessed on the main thread. However, we're - // only calling get_unpacked_entry() on it which assumes the mutex is held by the locked_decompress_stream. So this is - // "safe" in some aspects but can deadlock - co_await write_log_entry(block_to_send->trace_stream, trace_log, block_to_send->this_block_num); - co_await write_log_entry(block_to_send->state_stream, chain_state_log, block_to_send->this_block_num); + co_await write_log_entry(block_to_send->trace_entry); + co_await write_log_entry(block_to_send->state_entry); if(block_to_send->is_v1_request) - co_await write_log_entry(block_to_send->finality_stream, finality_data_log, block_to_send->this_block_num); + co_await write_log_entry(block_to_send->finality_entry); co_await stream.async_write_some(true, boost::asio::const_buffer()); } @@ -328,9 +313,9 @@ class session final : public session_base { chain::block_num_type& next_block_cursor = current_blocks_request.start_block_num; chain::controller& controller; - std::optional& trace_log; - std::optional& chain_state_log; - std::optional& finality_data_log; + std::optional& trace_log; + std::optional& chain_state_log; + std::optional& finality_data_log; GetBlockID get_block_id; GetBlock get_block; diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index 249d9c5098..94804d4eb6 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -2,9 +2,9 @@ #include #include #include -#include #include -#include +#include +#include #include #include #include @@ -48,9 +48,9 @@ auto catch_and_log(F f) { struct state_history_plugin_impl { private: chain_plugin* chain_plug = nullptr; - std::optional trace_log; - std::optional chain_state_log; - std::optional finality_data_log; + std::optional trace_log; + std::optional chain_state_log; + std::optional finality_data_log; uint32_t first_available_block = 0; bool trace_debug_mode = false; std::optional applied_transaction_connection; @@ -182,10 +182,7 @@ struct state_history_plugin_impl { if(!trace_log) return; - state_history_log_header header{.magic = ship_magic(ship_current_version, 0), - .block_id = id, - .payload_size = 0}; - trace_log->pack_and_write_entry(header, block->previous, [this, &block](auto&& buf) { + trace_log->pack_and_write_entry(id, block->previous, [this, &block](bio::filtering_ostreambuf& buf) { trace_converter.pack(buf, trace_debug_mode, block); }); } @@ -197,9 +194,7 @@ struct state_history_plugin_impl { if(fresh) fc_ilog(_log, "Placing initial state in block ${n}", ("n", block_num)); - state_history_log_header header{ - .magic = ship_magic(ship_current_version, 0), .block_id = id, .payload_size = 0}; - chain_state_log->pack_and_write_entry(header, previous_id, [this, fresh](auto&& buf) { + chain_state_log->pack_and_write_entry(id, previous_id, [this, fresh](bio::filtering_ostreambuf& buf) { pack_deltas(buf, chain_plug->chain().db(), fresh); }); } // store_chain_state @@ -212,9 +207,7 @@ struct state_history_plugin_impl { if(!finality_data.has_value()) return; - state_history_log_header header{ - .magic = ship_magic(ship_current_version, 0), .block_id = id, .payload_size = 0}; - finality_data_log->pack_and_write_entry(header, previous_id, [finality_data](auto&& buf) { + finality_data_log->pack_and_write_entry(id, previous_id, 
[finality_data](bio::filtering_ostreambuf& buf) { fc::datastream ds{buf}; fc::raw::pack(ds, *finality_data); }); @@ -255,9 +248,7 @@ void state_history_plugin::set_program_options(options_description& cli, options options("state-history-unix-socket-path", bpo::value(), "the path (relative to data-dir) to create a unix socket upon which to listen for incoming connections."); options("trace-history-debug-mode", bpo::bool_switch()->default_value(false), "enable debug mode for trace history"); - - if(cfile::supports_hole_punching()) - options("state-history-log-retain-blocks", bpo::value(), "if set, periodically prune the state history files to store only configured number of most recent blocks"); + options("state-history-log-retain-blocks", bpo::value(), "if set, periodically prune the state history files to store only configured number of most recent blocks"); } void state_history_plugin_impl::plugin_initialize(const variables_map& options) { @@ -317,7 +308,7 @@ void state_history_plugin_impl::plugin_initialize(const variables_map& options) state_history_log_config ship_log_conf; if(options.count("state-history-log-retain-blocks")) { - auto& ship_log_prune_conf = ship_log_conf.emplace(); + state_history::prune_config& ship_log_prune_conf = ship_log_conf.emplace(); ship_log_prune_conf.prune_blocks = options.at("state-history-log-retain-blocks").as(); //the arbitrary limit of 1000 here is mainly so that there is enough buffer for newly applied forks to be delivered to clients // before getting pruned out. ideally pruning would have been smart enough to know not to prune reversible blocks @@ -325,7 +316,7 @@ void state_history_plugin_impl::plugin_initialize(const variables_map& options) EOS_ASSERT(!has_state_history_partition_options, plugin_exception, "state-history-log-retain-blocks cannot be used together with state-history-retained-dir," " state-history-archive-dir, state-history-stride or max-retained-history-files"); } else if(has_state_history_partition_options){ - auto& config = ship_log_conf.emplace(); + state_history::partition_config& config = ship_log_conf.emplace(); if(options.count("state-history-retained-dir")) config.retained_dir = options.at("state-history-retained-dir").as(); if(options.count("state-history-archive-dir")) @@ -337,11 +328,11 @@ void state_history_plugin_impl::plugin_initialize(const variables_map& options) } if(options.at("trace-history").as()) - trace_log.emplace("trace_history", state_history_dir , ship_log_conf); + trace_log.emplace(state_history_dir, ship_log_conf, "trace_history", [this](chain::block_num_type bn) {return get_block_id(bn);}); if(options.at("chain-state-history").as()) - chain_state_log.emplace("chain_state_history", state_history_dir, ship_log_conf); + chain_state_log.emplace(state_history_dir, ship_log_conf, "chain_state_history", [this](chain::block_num_type bn) {return get_block_id(bn);}); if(options.at("finality-data-history").as()) - finality_data_log.emplace("finality_data_history", state_history_dir, ship_log_conf); + finality_data_log.emplace(state_history_dir, ship_log_conf, "finality_data_history", [this](chain::block_num_type bn) {return get_block_id(bn);}); } FC_LOG_AND_RETHROW() } // state_history_plugin::plugin_initialize diff --git a/tests/ship_log.cpp b/tests/ship_log.cpp index 319ab8c766..7f3336d0c6 100644 --- a/tests/ship_log.cpp +++ b/tests/ship_log.cpp @@ -5,13 +5,76 @@ #include #include +#include +#include +#include -#include +#include #include #include namespace bdata = boost::unit_test::data; +namespace bio = 
boost::iostreams; +using namespace eosio; +using namespace eosio::chain; +using namespace fc; +using namespace std::literals; + +static block_id_type fake_blockid_for_num(const block_num_type block_num, const uint64_t salt = 0u) { + block_id_type ret; + ret._hash[0] = ret._hash[1] = ret._hash[2] = ret._hash[3] = salt; + ret._hash[0] &= 0xffffffff00000000; + ret._hash[0] += fc::endian_reverse_u32(block_num); + return ret; +} + +struct random_source { + typedef char char_type; + struct category : bio::seekable_device_tag {}; + std::streamsize read(char* s, std::streamsize n) { + if(n == 0) //why doesn't restrict() do this for us? + return -1; + rand_bytes(s, n); + return n; + } + std::streamsize write(const char_type* s, std::streamsize n) { + FC_ASSERT(false, "not supported"); + } + //this isn't valid but Device needs to be Seekable for restrict() + std::streampos seek(bio::stream_offset off, std::ios_base::seekdir) { + return off; + } +}; +BOOST_IOSTREAMS_PIPABLE(random_source, 0) + +struct sha256_filter { + typedef char char_type; + struct category : bio::dual_use, bio::filter_tag, bio::multichar_tag, bio::optimally_buffered_tag {}; + + std::streamsize optimal_buffer_size() const {return 4*1024;} + + template + std::streamsize read(Source& src, char_type* s, std::streamsize n) { + std::streamsize result = bio::read(src, s, n); + if(result == -1) + return -1; + enc->write(s, result); + return result; + } + + template + std::streamsize write(Sink& snk, const char_type* s, std::streamsize n) { + std::streamsize result = bio::write(snk, s, n); + enc->write(s, result); + return result; + } + + //sha256::encoder is not copyable which is a requirement for a Filter when used in a pipeline like done below. for this trivial + // non-production use case let's just hack the limitation by stuffing it in shared_ptr so it becomes "copyable" + std::shared_ptr enc = std::make_shared(); +}; +BOOST_IOSTREAMS_PIPABLE(sha256_filter, 0) struct ship_log_fixture { ship_log_fixture(bool enable_read, bool reopen_on_mark, bool remove_index_on_reopen, bool vacuum_on_exit_if_small, std::optional prune_blocks) : @@ -32,12 +95,8 @@ struct ship_log_fixture { return m; }; - eosio::state_history_log_header header; - header.block_id = block_for_id(index, fillchar); - header.payload_size = 0; - - log->pack_and_write_entry(header, block_for_id(index-1, prevchar), [&](auto& f) { - boost::iostreams::write(f, a.data(), a.size()); + log->pack_and_write_entry(block_for_id(index, fillchar), block_for_id(index-1, prevchar), [&](auto& f) { + bio::write(f, a.data(), a.size()); }); if(index + 1 > written_data.size()) @@ -46,28 +105,23 @@ struct ship_log_fixture { } void check_range_present(uint32_t first, uint32_t last) { - namespace bio = boost::iostreams; auto r = log->block_range(); BOOST_REQUIRE_EQUAL(r.first, first); BOOST_REQUIRE_EQUAL(r.second-1, last); if(enable_read) { for(auto i = first; i <= last; i++) { - auto result = log->create_locked_decompress_stream(); - log->get_unpacked_entry(i, result); - std::visit(eosio::chain::overloaded{ - [&](std::vector& buff) { BOOST_REQUIRE(buff == written_data.at(i)); }, - [&](std::unique_ptr& strm) { - std::vector buff; - boost::iostreams::copy(*strm, boost::iostreams::back_inserter(buff)); - BOOST_REQUIRE(buff == written_data.at(i)); - }} , result.buf); + std::optional entry = log->get_entry(i); + BOOST_REQUIRE(!!entry); + bio::filtering_istreambuf istream = entry->get_stream(); + std::vector buff; + bio::copy(istream, bio::back_inserter(buff)); + BOOST_REQUIRE(buff == 
written_data.at(i));
+         }
       }
    }
 
    void check_not_present(uint32_t index) {
-      auto result = log->create_locked_decompress_stream();
-      BOOST_REQUIRE_EQUAL(log->get_unpacked_entry(index, result), 0u);
+      BOOST_REQUIRE(!log->get_entry(index));
    }
 
    void check_empty() {
@@ -85,10 +139,10 @@ struct ship_log_fixture {
    }
 
    bool enable_read, reopen_on_mark, remove_index_on_reopen, vacuum_on_exit_if_small;
-   eosio::state_history_log_config conf;
+   eosio::state_history::state_history_log_config conf;
    fc::temp_directory log_dir;
-   std::optional log;
+   std::optional log;
 
    std::vector> written_data;
 
@@ -103,11 +157,11 @@ struct ship_log_fixture {
         if(vacuum_on_exit_if_small)
            prune_conf->vacuum_on_close = 1024*1024*1024; //something large: always vacuum on close for these tests
      }
-      log.emplace("shipit", log_dir.path(), conf);
+      log.emplace(log_dir.path(), conf, "shipit");
    }
 };
 
-//can only punch holes on filesystem block boundaries. let's make sure the entries we add are larger than that
+//historically holes could only be punched on filesystem block boundaries. that is no longer true, but let's make sure the entries we add are larger than that anyways
 static size_t larger_than_tmpfile_blocksize() {
    fc::temp_cfile tf;
    auto& cf = tf.file();
@@ -152,15 +206,8 @@ BOOST_DATA_TEST_CASE(basic_prune_test, bdata::xrange(2) * bdata::xrange(2) * bda
       t.check_range_present(4, 7);
    });
 
-   //undo 6 & 7 and reapply 6
+   //undo 6 & 7 and reapply new 6 & 7
    t.add(6, payload_size, 'G', 'D');
-   t.check_n_bounce([&]() {
-      t.check_not_present(2);
-      t.check_not_present(3);
-      t.check_not_present(7);
-      t.check_range_present(4, 6);
-   });
-
    t.add(7, payload_size, 'H', 'G');
    t.check_n_bounce([&]() {
      t.check_not_present(2);
@@ -175,14 +222,8 @@ BOOST_DATA_TEST_CASE(basic_prune_test, bdata::xrange(2) * bdata::xrange(2) * bda
      t.check_range_present(7, 10);
    });
 
-   //undo back to the first stored block
+   //undo back to the first stored block and then add back a new fork
    t.add(7, payload_size, 'L', 'G');
-   t.check_n_bounce([&]() {
-      t.check_range_present(7, 7);
-      t.check_not_present(6);
-      t.check_not_present(8);
-   });
-
    t.add(8, payload_size, 'M', 'L');
    t.add(9, payload_size, 'N', 'M');
    t.add(10, payload_size, 'O', 'N');
@@ -193,18 +234,8 @@ BOOST_DATA_TEST_CASE(basic_prune_test, bdata::xrange(2) * bdata::xrange(2) * bda
      t.check_not_present(7);
    });
 
-   //undo past the first stored
-   t.add(6, payload_size, 'Q', 'D');
-   t.check_n_bounce([&]() {
-      t.check_range_present(6, 6);
-      t.check_not_present(7);
-      t.check_not_present(8);
-   });
-
    //pile up a lot
-   t.add(7, payload_size, 'R', 'Q');
-   t.add(8, payload_size, 'S', 'R');
-   t.add(9, payload_size, 'T', 'S');
+   t.add(9, payload_size, 'T', 'M');
    t.add(10, payload_size, 'U', 'T');
    t.add(11, payload_size, 'V', 'U');
    t.add(12, payload_size, 'W', 'V');
@@ -226,7 +257,7 @@ BOOST_DATA_TEST_CASE(basic_prune_test, bdata::xrange(2) * bdata::xrange(2) * bda
   //start from genesis not allowed
   BOOST_REQUIRE_EXCEPTION(t.add(2, payload_size, 'A', 'A');, eosio::chain::plugin_exception, [](const eosio::chain::plugin_exception& e) {
      std::string err = e.to_detail_string();
-      return err.find("Existing ship log") != std::string::npos && err.find("when starting from genesis block") != std::string::npos;
+      return err.find("existing ship log") != std::string::npos && err.find("when starting from genesis block") != std::string::npos;
   });
 } FC_LOG_AND_RETHROW() }
 
@@ -274,12 +305,12 @@ BOOST_AUTO_TEST_CASE(empty) { try {
   fc::temp_directory log_dir;
 
   {
-      eosio::state_history_log log("empty", log_dir.path());
+      eosio::state_history::state_history_log log(log_dir.path() / "empty");
      BOOST_REQUIRE(log.empty());
   }
   //reopen
   {
-      eosio::state_history_log log("empty", log_dir.path());
+      eosio::state_history::state_history_log log(log_dir.path() / "empty");
      BOOST_REQUIRE(log.empty());
   }
   //reopen but with prune config set
   eosio::state_history::prune_config simple_prune_conf = {
      .prune_blocks = 4
   };
   {
-      eosio::state_history_log log("empty", log_dir.path(), simple_prune_conf);
+      eosio::state_history::state_history_log log(log_dir.path() / "empty", state_history::state_history_log::no_non_local_get_block_id_func, simple_prune_conf);
      BOOST_REQUIRE(log.empty());
   }
   {
-      eosio::state_history_log log("empty", log_dir.path(), simple_prune_conf);
+      eosio::state_history::state_history_log log(log_dir.path() / "empty", state_history::state_history_log::no_non_local_get_block_id_func, simple_prune_conf);
      BOOST_REQUIRE(log.empty());
   }
   //back to non-pruned
   {
-      eosio::state_history_log log("empty", log_dir.path());
+      eosio::state_history::state_history_log log(log_dir.path() / "empty");
      BOOST_REQUIRE(log.empty());
   }
   {
-      eosio::state_history_log log("empty", log_dir.path());
+      eosio::state_history::state_history_log log(log_dir.path() / "empty");
      BOOST_REQUIRE(log.empty());
   }
@@ -312,7 +343,7 @@ BOOST_AUTO_TEST_CASE(empty) { try {
 
   //one more time to pruned, just to make sure
   {
-      eosio::state_history_log log("empty", log_dir.path(), simple_prune_conf);
+      eosio::state_history::state_history_log log(log_dir.path() / "empty", state_history::state_history_log::no_non_local_get_block_id_func, simple_prune_conf);
      BOOST_REQUIRE(log.empty());
   }
   BOOST_REQUIRE(std::filesystem::file_size(log_file.c_str()) == 0);
@@ -431,5 +462,655 @@ BOOST_DATA_TEST_CASE(prune_to_partitioned, bdata::xrange(2) * bdata::xrange(2),
 } FC_LOG_AND_RETHROW() }
 
+BOOST_DATA_TEST_CASE(basic, bdata::make({2u, 333u, 578'000u, 3'123'456'789u}) ^ bdata::make({102u, 400u, 578'111u, 3'123'456'900u}), start, end) try {
+   const fc::temp_directory tmpdir;
+
+   eosio::state_history::log_catalog lc(tmpdir.path(), std::monostate(), "testlog");
+   BOOST_REQUIRE(lc.empty());
+
+   std::map wrote_data_for_blocknum;
+   std::mt19937 mt_random(0xbeefbeefu * start);
+
+   //write some blocks in order
+   for(unsigned i = start; i < end; ++i)
+      lc.pack_and_write_entry(fake_blockid_for_num(i), fake_blockid_for_num(i-1), [&](bio::filtering_ostreambuf& obuf) {
+         bio::filtering_istreambuf hashed_randomness(sha256_filter() | bio::restrict(random_source(), 0, mt_random()%16*1024*1024));
+         bio::copy(hashed_randomness, obuf);
+         wrote_data_for_blocknum[i] = hashed_randomness.component(0)->enc->result();
+      });
+
+   BOOST_REQUIRE_EQUAL(lc.block_range().first, start);
+   BOOST_REQUIRE_EQUAL(lc.block_range().second, end);
+
+   //pick some random blocks and read their content back; make sure it matches.
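+   //(a hedged warm-up sketch of the read-back pattern before the random sampling below: fetch one entry,
+   // stream it through the same sha256_filter, and compare digests; component<sha256_filter>(0) reaches the
+   // first element of the filter chain to grab its encoder, just as the checks below do)
+   {
+      auto entry = lc.get_entry(start);
+      BOOST_REQUIRE(!!entry);
+      bio::filtering_ostreambuf hashed_null(sha256_filter() | bio::null_sink());
+      bio::filtering_istreambuf log_stream = entry->get_stream();
+      bio::copy(log_stream, hashed_null);
+      BOOST_REQUIRE_EQUAL(hashed_null.component<sha256_filter>(0)->enc->result(), wrote_data_for_blocknum[start]);
+   }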
+ for(unsigned i = start; i < end; i+=mt_random()%10) { + std::optional entry = lc.get_entry(i); + BOOST_REQUIRE(!!entry); + + std::optional bid = lc.get_block_id(i); + BOOST_REQUIRE(!!bid); + BOOST_REQUIRE_EQUAL(*bid, fake_blockid_for_num(i)); + + bio::filtering_ostreambuf hashed_null(sha256_filter() | bio::null_sink()); + bio::filtering_istreambuf log_stream = entry->get_stream(); + bio::copy(log_stream, hashed_null); + BOOST_REQUIRE_EQUAL(hashed_null.component(0)->enc->result(), wrote_data_for_blocknum[i]); + } + + //pick some blocks outside the range of blocks we wrote and make sure we cannot read them + for(const unsigned i : {1u, start-34, start-1, end, end+56, end+1004}) { //start-34 might roll over; no big deal + std::optional bid = lc.get_block_id(i); + std::optional entry = lc.get_entry(i); + BOOST_REQUIRE(!bid); + BOOST_REQUIRE(!entry); + } + + //"end" would be the next block to be appended + //attempt to "skip" a block + { + unsigned skipped_block = end + 1; + BOOST_REQUIRE_EXCEPTION(lc.pack_and_write_entry(fake_blockid_for_num(skipped_block), fake_blockid_for_num(skipped_block-1), [&](bio::filtering_ostreambuf& obuf) { + FC_ASSERT(false, "should not reach here"); + }), + plugin_exception, + [](const plugin_exception& e) {return e.to_detail_string().find("skips over block") != std::string::npos;}); + } + + //now let's try appending a block that doesn't have the right previous block id + { + BOOST_REQUIRE_EXCEPTION(lc.pack_and_write_entry(fake_blockid_for_num(end), fake_blockid_for_num(end-1, 0xbeefUL), [&](bio::filtering_ostreambuf& obuf) { + FC_ASSERT(false, "should not reach here"); + }), + plugin_exception, + [](const plugin_exception& e) {return e.to_detail_string().find("missed a fork change") != std::string::npos;}); + } + + //now we're going to try writing identical blockids to the log. These should be silently swallowed as no-ops + for(unsigned i : {2u, start, start+6, end-5, end-1}) { + //but block 2 is special. If writing block 2 on a non empty log we fail as a safety precaution + if(i == 2u) + BOOST_REQUIRE_EXCEPTION(lc.pack_and_write_entry(fake_blockid_for_num(i), fake_blockid_for_num(i-1), [&](bio::filtering_ostreambuf& obuf) { + FC_ASSERT(false, "should not reach here"); + }), + plugin_exception, + [](const plugin_exception& e) {return e.to_detail_string().find("when starting from genesis block 2") != std::string::npos;}); + else + lc.pack_and_write_entry(fake_blockid_for_num(i), fake_blockid_for_num(i-1), [&](bio::filtering_ostreambuf& obuf) { + FC_ASSERT(false, "should not reach here"); + }); + } + + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + + //time for a "fork": we're going to rewrite the last 4 blocks and add 2 new ones as well. 
But we're going to ensure that old data remains intact during this + //"overwrite" (as long as the ship_log_entry is alive) + std::array, sha256>, 4> pre_fork_entries_and_expected_hashes = { + std::make_pair(lc.get_entry(end-1), wrote_data_for_blocknum[end-1]), + std::make_pair(lc.get_entry(end-3), wrote_data_for_blocknum[end-3]), //out of order for fun + std::make_pair(lc.get_entry(end-2), wrote_data_for_blocknum[end-2]), + std::make_pair(lc.get_entry(end-4), wrote_data_for_blocknum[end-4]), + }; + + lc.pack_and_write_entry(fake_blockid_for_num(end-4, 0xdeadUL), fake_blockid_for_num(end-4-1), [&](bio::filtering_ostreambuf& obuf) { + bio::filtering_istreambuf hashed_randomness(sha256_filter() | bio::restrict(random_source(), 0, mt_random()%16*1024*1024)); + bio::copy(hashed_randomness, obuf); + wrote_data_for_blocknum[end-4] = hashed_randomness.component(0)->enc->result(); + }); + for(const unsigned i : {end-3, end-2, end-1, end, end+1}) + lc.pack_and_write_entry(fake_blockid_for_num(i, 0xdeadUL), fake_blockid_for_num(i-1, 0xdeadUL), [&](bio::filtering_ostreambuf& obuf) { + bio::filtering_istreambuf hashed_randomness(sha256_filter() | bio::restrict(random_source(), 0, mt_random()%16*1024*1024)); + bio::copy(hashed_randomness, obuf); + wrote_data_for_blocknum[i] = hashed_randomness.component(0)->enc->result(); + }); + + //first, check that the pre-fork entries still read their pre-fork data + ///XXX can we const this please? + for(std::pair, sha256>& prefork_entry : pre_fork_entries_and_expected_hashes) { + BOOST_REQUIRE(!!prefork_entry.first); + bio::filtering_ostreambuf hashed_null(sha256_filter() | bio::null_sink()); + bio::filtering_istreambuf log_stream = prefork_entry.first->get_stream(); + bio::copy(log_stream, hashed_null); + BOOST_REQUIRE_EQUAL(hashed_null.component(0)->enc->result(), prefork_entry.second); + } + //now let's check all of the just added blocks; and a couple earlier ones + for(unsigned i : {end-6, end-5, /*"new fork" blocks:*/end-4, end-3, end-2, end-1, end, end+1}) { + std::optional entry = lc.get_entry(i); + BOOST_REQUIRE(!!entry); + + bio::filtering_ostreambuf hashed_null(sha256_filter() | bio::null_sink()); + bio::filtering_istreambuf log_stream = entry->get_stream(); + bio::copy(log_stream, hashed_null); + BOOST_REQUIRE_EQUAL(hashed_null.component(0)->enc->result(), wrote_data_for_blocknum[i]); + } + + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end+2); + +} FC_LOG_AND_RETHROW(); + + +BOOST_AUTO_TEST_CASE(regen_index) try { + const fc::temp_directory tmpdir; + + //try recreating the index for an empty log + { + eosio::state_history::log_catalog lc(tmpdir.path(), std::monostate(), "empty"); + BOOST_REQUIRE(lc.empty()); + } + BOOST_REQUIRE(std::filesystem::exists(tmpdir.path() / "empty.index")); + std::filesystem::remove(tmpdir.path() / "empty.index"); + BOOST_REQUIRE(!std::filesystem::exists(tmpdir.path() / "empty.index")); + { + eosio::state_history::log_catalog lc(tmpdir.path(), std::monostate(), "empty"); + BOOST_REQUIRE(lc.empty()); + } + + //fill up a log with a handful of blocks + { + eosio::state_history::log_catalog lc(tmpdir.path(), std::monostate(), "newlog"); + BOOST_REQUIRE(lc.empty()); + + for(unsigned i = 2; i < 34; ++i) + lc.pack_and_write_entry(fake_blockid_for_num(i), fake_blockid_for_num(i-1), [&](bio::filtering_ostreambuf& obuf) { + { + fc::datastream ds(obuf); + fc::raw::pack(ds, i); + } + bio::copy(bio::restrict(random_source(), 0, 77777), obuf); + }); + + 
BOOST_REQUIRE_EQUAL(lc.block_range().first, 2u); + BOOST_REQUIRE_EQUAL(lc.block_range().second, 34u); + } + BOOST_REQUIRE(std::filesystem::exists(tmpdir.path() / "newlog.index")); + const uintmax_t prev_index_size = std::filesystem::file_size(tmpdir.path() / "newlog.index"); + std::string old_index_contents; + read_file_contents(tmpdir.path() / "newlog.index", old_index_contents); + BOOST_REQUIRE_EQUAL(prev_index_size, old_index_contents.size()); + + //now remove the index and make sure the recreated index works + std::filesystem::remove(tmpdir.path() / "newlog.index"); + BOOST_REQUIRE(!std::filesystem::exists(tmpdir.path() / "newlog.index")); + { + eosio::state_history::log_catalog lc(tmpdir.path(), std::monostate(), "newlog"); + BOOST_REQUIRE_EQUAL(lc.block_range().first, 2u); + BOOST_REQUIRE_EQUAL(lc.block_range().second, 34u); + + //read some blocks back + for(const unsigned i : {2u, 10u, 22u, 33u}) { + std::optional entry = lc.get_entry(i); + BOOST_REQUIRE(!!entry); + bio::filtering_istreambuf log_stream = entry->get_stream(); + fc::datastream ds(log_stream); + unsigned red; + fc::raw::unpack(ds, red); + BOOST_REQUIRE_EQUAL(red, i); + } + } + + //also compare the index contents; should be exactly the same + std::string new_index_contents; + read_file_contents(tmpdir.path() / "newlog.index", new_index_contents); + BOOST_REQUIRE_EQUAL(new_index_contents.size(), old_index_contents.size()); + BOOST_REQUIRE_EQUAL(new_index_contents, old_index_contents); +} FC_LOG_AND_RETHROW(); + +BOOST_AUTO_TEST_CASE(empty_empty_empty) try { + //just opens and closes an empty log a few times + const fc::temp_directory tmpdir; + + for(unsigned i = 0; i < 4; ++i) { + eosio::state_history::log_catalog lc(tmpdir.path(), std::monostate(), "empty"); + BOOST_REQUIRE(lc.empty()); + } + BOOST_REQUIRE(std::filesystem::exists(tmpdir.path() / "empty.log")); + BOOST_REQUIRE(std::filesystem::exists(tmpdir.path() / "empty.index")); + BOOST_REQUIRE_EQUAL(std::filesystem::file_size(tmpdir.path() / "empty.log"), 0u); + BOOST_REQUIRE_EQUAL(std::filesystem::file_size(tmpdir.path() / "empty.index"), 0u); +} FC_LOG_AND_RETHROW(); + +BOOST_DATA_TEST_CASE(basic_split, boost::unit_test::data::make({5u, 6u, 7u, 8u, 9u, 10u, 578'000u, 3'123'456'789u}) * + boost::unit_test::data::make({5u, 10u}) * + boost::unit_test::data::make({"保留", ""}) + , start, stride, retained_dir) try { + const fc::temp_directory tmpdir; + + state_history::partition_config conf = { + .retained_dir = retained_dir, + .archive_dir = "档案", + .stride = stride, + .max_retained_files = UINT32_MAX + }; + + const unsigned initial_blocks_to_append = 50; + unsigned end = start+initial_blocks_to_append+1; + std::map wrote_data_for_blocknum; + + { + eosio::state_history::log_catalog lc(tmpdir.path(), conf, "splitit"); + BOOST_REQUIRE(lc.empty()); + + std::mt19937 mt_random(0xbeefbeefu * start); + + for(unsigned i = start; i < end; ++i) + lc.pack_and_write_entry(fake_blockid_for_num(i), fake_blockid_for_num(i-1), [&](bio::filtering_ostreambuf& obuf) { + bio::filtering_istreambuf hashed_randomness(sha256_filter() | bio::restrict(random_source(), 0, mt_random()%1024*1024)); + bio::copy(hashed_randomness, obuf); + wrote_data_for_blocknum[i] = hashed_randomness.component(0)->enc->result(); + }); + + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + } + + const unsigned expected_log_parts = initial_blocks_to_append/stride + (start%stride == 0); + + for(const std::string& suffix : {"log"s, "index"s}) { + const std::regex 
retained_logfile_regex(R"(^splitit-\d+-\d+\.)" + suffix + "$"); + + unsigned found = 0; + for(const std::filesystem::directory_entry& dir_entry : std::filesystem::directory_iterator(tmpdir.path() / conf.retained_dir)) + found += std::regex_search(dir_entry.path().filename().string(), retained_logfile_regex); + BOOST_REQUIRE_EQUAL(found, expected_log_parts); + BOOST_REQUIRE(std::filesystem::exists(tmpdir.path() / ("splitit."+suffix))); + } + + //load the catalog back up and read through all the blocks + { + eosio::state_history::log_catalog lc(tmpdir.path(), conf, "splitit"); + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + + for(unsigned i = start; i < end; i++) { + std::optional entry = lc.get_entry(i); + BOOST_REQUIRE(!!entry); + + std::optional bid = lc.get_block_id(i); + BOOST_REQUIRE(!!bid); + BOOST_REQUIRE_EQUAL(*bid, fake_blockid_for_num(i)); + + bio::filtering_ostreambuf hashed_null(sha256_filter() | bio::null_sink()); + bio::filtering_istreambuf log_stream = entry->get_stream(); + bio::copy(log_stream, hashed_null); + BOOST_REQUIRE_EQUAL(hashed_null.component(0)->enc->result(), wrote_data_for_blocknum[i]); + } + } + + //find a log & index file and copy it to a name that does not match expected; it should be silently ignored + { + const std::regex regex(R"(^splitit-\d+-\d+\.log$)"); + + for(const std::filesystem::directory_entry& dir_entry : std::filesystem::directory_iterator(tmpdir.path() / conf.retained_dir)) + if(std::regex_search(dir_entry.path().filename().string(), regex)) { + std::filesystem::copy_file(dir_entry.path(), std::filesystem::path(dir_entry.path()).replace_filename("yeeeeehaw-1234.log")); + std::filesystem::copy_file(std::filesystem::path(dir_entry.path()).replace_extension("index"), std::filesystem::path(dir_entry.path()).replace_filename("yeeeeehaw-1234.index")); + break; + } + } + { + eosio::state_history::log_catalog lc(tmpdir.path(), conf, "splitit"); + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + } + + //delete every other .index file. indexes will be recreated for all log parts on construction of the ship_log_catalog + { + const std::regex regex(R"(^splitit-\d+-\d+\.index)"); + + bool do_this_one = false; + for(const std::filesystem::directory_entry& dir_entry : std::filesystem::directory_iterator(tmpdir.path() / conf.retained_dir)) + if(std::regex_search(dir_entry.path().filename().string(), regex)) { + if(do_this_one) + std::filesystem::remove(dir_entry.path()); + do_this_one = !do_this_one; + } + } + //and we'll go through the process of reading all blocks after the indexes have been recreated + { + eosio::state_history::log_catalog lc(tmpdir.path(), conf, "splitit"); + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + + for(unsigned i = start; i < end; i++) { + std::optional entry = lc.get_entry(i); + BOOST_REQUIRE(!!entry); + + std::optional bid = lc.get_block_id(i); + BOOST_REQUIRE(!!bid); + BOOST_REQUIRE_EQUAL(*bid, fake_blockid_for_num(i)); + + bio::filtering_ostreambuf hashed_null(sha256_filter() | bio::null_sink()); + bio::filtering_istreambuf log_stream = entry->get_stream(); + bio::copy(log_stream, hashed_null); + BOOST_REQUIRE_EQUAL(hashed_null.component(0)->enc->result(), wrote_data_for_blocknum[i]); + } + } + + //now switch over to no splitting. 
this is allowed but old split logs will not be "visible" when configured this way + { + eosio::state_history::log_catalog lc(tmpdir.path(), std::monostate(), "splitit"); + if(start % conf.stride == 0) { //"head log" will be empty in this case + BOOST_REQUIRE(lc.empty()); + } + else { + BOOST_REQUIRE(lc.block_range().first % conf.stride == 1); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + } + + //let's go create another 100 blocks too! + std::mt19937 mt_random(0xbeefbeefu * end); + const unsigned new_end = end + 100; + + for(unsigned i = end; i < new_end; ++i) + lc.pack_and_write_entry(fake_blockid_for_num(i), fake_blockid_for_num(i-1), [&](bio::filtering_ostreambuf& obuf) { + bio::filtering_istreambuf hashed_randomness(sha256_filter() | bio::restrict(random_source(), 0, mt_random()%1024*1024)); + bio::copy(hashed_randomness, obuf); + wrote_data_for_blocknum[i] = hashed_randomness.component(0)->enc->result(); + }); + + end = new_end; + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + } + + //and back to split log mode. all those retained logs will be visible again + { + eosio::state_history::log_catalog lc(tmpdir.path(), conf, "splitit"); + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + + //but now let's add enough blocks to trigger a rotation again. This will give us a retained log that is a different span + // size than all the previous spans + std::mt19937 mt_random(0xbeefbeefu * end); + for(unsigned i = end; i < end+conf.stride; ++i) + lc.pack_and_write_entry(fake_blockid_for_num(i), fake_blockid_for_num(i-1), [&](bio::filtering_ostreambuf& obuf) { + bio::filtering_istreambuf hashed_randomness(sha256_filter() | bio::restrict(random_source(), 0, mt_random()%1024*1024)); + bio::copy(hashed_randomness, obuf); + wrote_data_for_blocknum[i] = hashed_randomness.component(0)->enc->result(); + }); + end += conf.stride; + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + } + + //one more time where we read through everything + { + eosio::state_history::log_catalog lc(tmpdir.path(), conf, "splitit"); + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + + for(unsigned i = start; i < end; i++) { + std::optional entry = lc.get_entry(i); + BOOST_REQUIRE(!!entry); + + std::optional bid = lc.get_block_id(i); + BOOST_REQUIRE(!!bid); + BOOST_REQUIRE_EQUAL(*bid, fake_blockid_for_num(i)); + + bio::filtering_ostreambuf hashed_null(sha256_filter() | bio::null_sink()); + bio::filtering_istreambuf log_stream = entry->get_stream(); + bio::copy(log_stream, hashed_null); + BOOST_REQUIRE_EQUAL(hashed_null.component(0)->enc->result(), wrote_data_for_blocknum[i]); + } + } + + //set the number of retained logs to 4 + conf.max_retained_files = 4u; + //and go generate enough blocks to cause a rotation which will move old logs to the archive directory + { + eosio::state_history::log_catalog lc(tmpdir.path(), conf, "splitit"); + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + + //but now let's add enough blocks to trigger a rotation again. 
This will give us a retained log that is a different span + // size than all the previous spans + std::mt19937 mt_random(0xbeefbeefu * end); + for(unsigned i = end; i < end+conf.stride; ++i) + lc.pack_and_write_entry(fake_blockid_for_num(i), fake_blockid_for_num(i-1), [&](bio::filtering_ostreambuf& obuf) { + bio::filtering_istreambuf hashed_randomness(sha256_filter() | bio::restrict(random_source(), 0, mt_random()%1024*1024)); + bio::copy(hashed_randomness, obuf); + wrote_data_for_blocknum[i] = hashed_randomness.component(0)->enc->result(); + }); + end += conf.stride; + + BOOST_REQUIRE_NE(lc.block_range().first, 2u); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + } + + //make sure we have only 4 retained logs in the retained dir; and note how many in archived dir + std::unordered_map last_archive_dir_count; + { + for(const std::string& suffix : {"log"s, "index"s}) { + const std::regex retained_logfile_regex(R"(^splitit-\d+-\d+\.)" + suffix + "$"); + + const unsigned found = std::ranges::count_if(std::filesystem::directory_iterator(tmpdir.path() / conf.retained_dir), [&](const std::filesystem::directory_entry& dir_entry) { + return std::regex_search(dir_entry.path().filename().string(), retained_logfile_regex); + }); + BOOST_REQUIRE_EQUAL(found, 4u); + } + for(const std::string& suffix : {"log"s, "index"s}) { + const std::regex retained_logfile_regex(R"(^splitit-\d+-\d+\.)" + suffix + "$"); + + last_archive_dir_count[suffix] = std::ranges::count_if(std::filesystem::directory_iterator(tmpdir.path() / conf.archive_dir), [&](const std::filesystem::directory_entry& dir_entry) { + return std::regex_search(dir_entry.path().filename().string(), retained_logfile_regex); + }); + BOOST_REQUIRE_NE(last_archive_dir_count[suffix], 0u); + } + BOOST_REQUIRE_EQUAL(last_archive_dir_count["log"], last_archive_dir_count["index"]); + } + + //clear the archive directory. This will cause logs to be removed; reduce max_retain to 3 to make it easier to spot proper behavior + const std::filesystem::path previous_archive_dir = conf.archive_dir; + conf.archive_dir.clear(); + conf.max_retained_files = 3u; + //generate enough blocks for a rotation... + { + eosio::state_history::log_catalog lc(tmpdir.path(), conf, "splitit"); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + + std::mt19937 mt_random(0xbeefbeefu * end); + for(unsigned i = end; i < end+conf.stride; ++i) + lc.pack_and_write_entry(fake_blockid_for_num(i), fake_blockid_for_num(i-1), [&](bio::filtering_ostreambuf& obuf) { + bio::filtering_istreambuf hashed_randomness(sha256_filter() | bio::restrict(random_source(), 0, mt_random()%1024*1024)); + bio::copy(hashed_randomness, obuf); + wrote_data_for_blocknum[i] = hashed_randomness.component(0)->enc->result(); + }); + end += conf.stride; + + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + } + + //check filesystem.. 
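+   // (expectation: exactly 3 retained parts remain, the rotated-out overflow part was deleted outright since
+   //  archive_dir is now empty, and the parts archived earlier are left untouched)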
+   {
+      //should only find 3 logs in the retained dir
+      for(const std::string& suffix : {"log"s, "index"s}) {
+         const std::regex retained_logfile_regex(R"(^splitit-\d+-\d+\.)" + suffix + "$");
+
+         const unsigned found = std::ranges::count_if(std::filesystem::directory_iterator(tmpdir.path() / conf.retained_dir), [&](const std::filesystem::directory_entry& dir_entry) {
+            return std::regex_search(dir_entry.path().filename().string(), retained_logfile_regex);
+         });
+         BOOST_REQUIRE_EQUAL(found, 3u);
+      }
+      //archive dir should still have same number of files
+      for(const std::string& suffix : {"log"s, "index"s}) {
+         const std::regex retained_logfile_regex(R"(^splitit-\d+-\d+\.)" + suffix + "$");
+
+         const unsigned found = std::ranges::count_if(std::filesystem::directory_iterator(tmpdir.path() / previous_archive_dir), [&](const std::filesystem::directory_entry& dir_entry) {
+            return std::regex_search(dir_entry.path().filename().string(), retained_logfile_regex);
+         });
+         BOOST_REQUIRE_EQUAL(found, last_archive_dir_count[suffix]);
+      }
+   }
+
+   //one more pass through all the blocks
+   {
+      eosio::state_history::log_catalog lc(tmpdir.path(), conf, "splitit");
+      BOOST_REQUIRE_EQUAL(lc.block_range().second, end);
+
+      for(unsigned i = lc.block_range().first; i < end; i++) {
+         std::optional entry = lc.get_entry(i);
+         BOOST_REQUIRE(!!entry);
+
+         std::optional bid = lc.get_block_id(i);
+         BOOST_REQUIRE(!!bid);
+         BOOST_REQUIRE_EQUAL(*bid, fake_blockid_for_num(i));
+
+         bio::filtering_ostreambuf hashed_null(sha256_filter() | bio::null_sink());
+         bio::filtering_istreambuf log_stream = entry->get_stream();
+         bio::copy(log_stream, hashed_null);
+         BOOST_REQUIRE_EQUAL(hashed_null.component(0)->enc->result(), wrote_data_for_blocknum[i]);
+      }
+   }
+
+   //remove one of the retained logs, causing a "hole", which is disallowed. to do this reliably and for full coverage, we'll first delete the
+   // second newest retained log, and then delete the newest retained log
+   std::map found;
+   {
+      const std::regex retained_logfile_regex(R"(^splitit-\d+-\d+\.log$)");
+
+      for(const std::filesystem::directory_entry& dir_entry : std::filesystem::directory_iterator(tmpdir.path() / conf.retained_dir)) {
+         if(!std::regex_search(dir_entry.path().filename().string(), retained_logfile_regex))
+            continue;
+         found[state_history::state_history_log(std::filesystem::path(dir_entry.path()).replace_extension("")).block_range().second] = dir_entry.path();
+      }
+      BOOST_REQUIRE_GT(found.size(), 1u);
+   }
+   std::filesystem::remove(std::next(found.rbegin())->second);
+   BOOST_REQUIRE_EXCEPTION(eosio::state_history::log_catalog(tmpdir.path(), conf, "splitit"),
+                           plugin_exception,
+                           [](const plugin_exception& e) {return e.to_detail_string().find("which results in a hole") != std::string::npos;});
+   std::filesystem::remove(found.rbegin()->second);
+
+   //only perform this check if we expect the "head log" to be non-empty
+   if(start % conf.stride)
+      BOOST_REQUIRE_EXCEPTION(eosio::state_history::log_catalog(tmpdir.path(), conf, "splitit"),
+                              plugin_exception,
+                              [](const plugin_exception& e) {return e.to_detail_string().find("which results in a hole") != std::string::npos;});
+   //unfortunately if the "head log" _is_ empty we're in quite a problem since we won't be able to detect the hole until a block is appended
+   //TODO: *is* the above checked? 
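+   //(hedged editorial sketch of the empty-head-log branch the TODO above asks about: per the comment, the hole
+   // cannot be detected at open time when the head log is empty, so constructing the catalog here is expected
+   // to succeed rather than throw)
+   if(start % conf.stride == 0)
+      BOOST_REQUIRE_NO_THROW(eosio::state_history::log_catalog(tmpdir.path(), conf, "splitit"));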
+} FC_LOG_AND_RETHROW(); + +BOOST_AUTO_TEST_CASE(split_forks) try { + const fc::temp_directory tmpdir; + + state_history::partition_config conf = { + .retained_dir = "retained", + .archive_dir = {}, + .stride = 10, + .max_retained_files = UINT32_MAX + }; + + //fill up 50 blocks + const unsigned start = 2; + const unsigned end = 53; + std::map wrote_data_for_blocknum; + std::mt19937 mt_random(0xbeefbeefu * start); + + { + eosio::state_history::log_catalog lc(tmpdir.path(), conf, "logz"); + BOOST_REQUIRE(lc.empty()); + + for(unsigned i = start; i < end; ++i) + lc.pack_and_write_entry(fake_blockid_for_num(i), fake_blockid_for_num(i-1), [&](bio::filtering_ostreambuf& obuf) { + bio::filtering_istreambuf hashed_randomness(sha256_filter() | bio::restrict(random_source(), 0, mt_random()%1024*1024)); + bio::copy(hashed_randomness, obuf); + wrote_data_for_blocknum[i] = hashed_randomness.component(0)->enc->result(); + }); + + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + } + + //discover the filenames for: + // head + // 41-50 + // 31-40 + const std::filesystem::path head_log_path = tmpdir.path() / "logz"; + std::filesystem::path path_31to40; + std::filesystem::path path_41to50; + { + const std::regex retained_logfile_regex(R"(^logz-\d+-\d+\.log$)"); + + for(const std::filesystem::directory_entry& dir_entry : std::filesystem::directory_iterator(tmpdir.path() / conf.retained_dir)) { + if(!std::regex_search(dir_entry.path().filename().string(), retained_logfile_regex)) + continue; + const std::filesystem::path path_no_ext = std::filesystem::path(dir_entry.path()).replace_extension(""); + const unsigned start_block = state_history::state_history_log(path_no_ext).block_range().first; + if(start_block == 31) + path_31to40 = dir_entry.path(); + else if(start_block == 41) + path_41to50 = dir_entry.path(); + } + BOOST_REQUIRE(!path_31to40.empty() && !path_41to50.empty()); + } + + const size_t before_head_log_size = std::filesystem::file_size(std::filesystem::path(head_log_path).replace_extension("log")); + const size_t before_head_index_size = std::filesystem::file_size(std::filesystem::path(head_log_path).replace_extension("index")); + const size_t before_31to40_log_size = std::filesystem::file_size(std::filesystem::path(path_31to40).replace_extension("log")); + const size_t before_31to40_index_size = std::filesystem::file_size(std::filesystem::path(path_31to40).replace_extension("index")); + const size_t before_41to50_log_size = std::filesystem::file_size(std::filesystem::path(path_41to50).replace_extension("log")); + const size_t before_41to50_index_size = std::filesystem::file_size(std::filesystem::path(path_41to50).replace_extension("index")); + + { + eosio::state_history::log_catalog lc(tmpdir.path(), conf, "logz"); + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + + //start a fork at block 37 + const unsigned bnum = 37; + lc.pack_and_write_entry(fake_blockid_for_num(bnum, 0xdeadUL), fake_blockid_for_num(bnum-1), [&](bio::filtering_ostreambuf& obuf) { + bio::filtering_istreambuf hashed_randomness(sha256_filter() | bio::restrict(random_source(), 0, mt_random()%1024*1024)); + bio::copy(hashed_randomness, obuf); + wrote_data_for_blocknum[bnum] = hashed_randomness.component(0)->enc->result(); + }); + + //check sizes of everything: all index sizes should remain equal; 31to40 log must have grown + BOOST_REQUIRE_EQUAL(before_head_index_size, 
std::filesystem::file_size(std::filesystem::path(head_log_path).replace_extension("index"))); + BOOST_REQUIRE_EQUAL(before_31to40_index_size, std::filesystem::file_size(std::filesystem::path(path_31to40).replace_extension("index"))); + BOOST_REQUIRE_EQUAL(before_41to50_index_size, std::filesystem::file_size(std::filesystem::path(path_41to50).replace_extension("index"))); + BOOST_REQUIRE_EQUAL(before_head_log_size, std::filesystem::file_size(std::filesystem::path(head_log_path).replace_extension("log"))); + BOOST_REQUIRE_EQUAL(before_41to50_log_size, std::filesystem::file_size(std::filesystem::path(path_41to50).replace_extension("log"))); + BOOST_REQUIRE_LT(before_31to40_log_size, std::filesystem::file_size(std::filesystem::path(path_31to40).replace_extension("log"))); + + //continue on with blocks 38 through 52 + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + + for(unsigned i = 38; i < 52+1; ++i) + lc.pack_and_write_entry(fake_blockid_for_num(i, 0xdeadUL), fake_blockid_for_num(i-1, 0xdeadUL), [&](bio::filtering_ostreambuf& obuf) { + bio::filtering_istreambuf hashed_randomness(sha256_filter() | bio::restrict(random_source(), 0, mt_random()%1024*1024)); + bio::copy(hashed_randomness, obuf); + wrote_data_for_blocknum[i] = hashed_randomness.component(0)->enc->result(); + }); + + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + } + + //check sizes of everything: all index sizes should remain equal; logs would have grown + BOOST_REQUIRE_EQUAL(before_head_index_size, std::filesystem::file_size(std::filesystem::path(head_log_path).replace_extension("index"))); + BOOST_REQUIRE_EQUAL(before_31to40_index_size, std::filesystem::file_size(std::filesystem::path(path_31to40).replace_extension("index"))); + BOOST_REQUIRE_EQUAL(before_41to50_index_size, std::filesystem::file_size(std::filesystem::path(path_41to50).replace_extension("index"))); + BOOST_REQUIRE_LT(before_head_log_size, std::filesystem::file_size(std::filesystem::path(head_log_path).replace_extension("log"))); + BOOST_REQUIRE_LT(before_41to50_log_size, std::filesystem::file_size(std::filesystem::path(path_41to50).replace_extension("log"))); + BOOST_REQUIRE_LT(before_31to40_log_size, std::filesystem::file_size(std::filesystem::path(path_31to40).replace_extension("log"))); + + //read through all the blocks and validate contents + { + eosio::state_history::log_catalog lc(tmpdir.path(), conf, "logz"); + BOOST_REQUIRE_EQUAL(lc.block_range().first, start); + BOOST_REQUIRE_EQUAL(lc.block_range().second, end); + + for(unsigned i = start; i < end; ++i) { + std::optional entry = lc.get_entry(i); + BOOST_REQUIRE(!!entry); + + bio::filtering_ostreambuf hashed_null(sha256_filter() | bio::null_sink()); + bio::filtering_istreambuf log_stream = entry->get_stream(); + bio::copy(log_stream, hashed_null); + BOOST_REQUIRE_EQUAL(hashed_null.component(0)->enc->result(), wrote_data_for_blocknum[i]); + } + } +} FC_LOG_AND_RETHROW(); BOOST_AUTO_TEST_SUITE_END() diff --git a/unittests/state_history_tests.cpp b/unittests/state_history_tests.cpp index 847d06fe79..8b85cab670 100644 --- a/unittests/state_history_tests.cpp +++ b/unittests/state_history_tests.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include #include @@ -613,20 +613,19 @@ BOOST_AUTO_TEST_CASE(test_deltas_resources_history) { BOOST_CHECK(std::any_of(partial_txns.begin(), partial_txns.end(), contains_transaction_extensions)); } - struct 
state_history_tester_logs { - state_history_tester_logs(const std::filesystem::path& dir, const eosio::state_history_log_config& config) - : traces_log("trace_history",dir, config) , chain_state_log("chain_state_history", dir, config) {} + state_history_tester_logs(const std::filesystem::path& dir, const eosio::state_history::state_history_log_config& config) + : traces_log(dir, config, "trace_history") , chain_state_log(dir, config, "chain_state_history") {} - eosio::state_history_log traces_log; - eosio::state_history_log chain_state_log; + eosio::state_history::log_catalog traces_log; + eosio::state_history::log_catalog chain_state_log; eosio::state_history::trace_converter trace_converter; }; struct state_history_tester : state_history_tester_logs, legacy_tester { - state_history_tester(const std::filesystem::path& dir, const eosio::state_history_log_config& config) + state_history_tester(const std::filesystem::path& dir, const eosio::state_history::state_history_log_config& config) : state_history_tester_logs(dir, config), legacy_tester ([this](eosio::chain::controller& control) { control.applied_transaction().connect( [&](std::tuple t) { @@ -635,15 +634,12 @@ struct state_history_tester : state_history_tester_logs, legacy_tester { control.accepted_block().connect([&](block_signal_params t) { const auto& [ block, id ] = t; - eosio::state_history_log_header header{.magic = eosio::ship_magic(eosio::ship_current_version, 0), - .block_id = id, - .payload_size = 0}; - traces_log.pack_and_write_entry(header, block->previous, [this, &block](auto&& buf) { + traces_log.pack_and_write_entry(id, block->previous, [this, &block](auto&& buf) { trace_converter.pack(buf, false, block); }); - chain_state_log.pack_and_write_entry(header, block->previous, [&control](auto&& buf) { + chain_state_log.pack_and_write_entry(id, block->previous, [&control](auto&& buf) { eosio::state_history::pack_deltas(buf, control.db(), true); }); }); @@ -654,22 +650,19 @@ struct state_history_tester : state_history_tester_logs, legacy_tester { }) {} }; -static std::vector get_decompressed_entry(eosio::state_history_log& log, block_num_type block_num) { - auto result = log.create_locked_decompress_stream(); - log.get_unpacked_entry(block_num, result); +static std::vector get_decompressed_entry(eosio::state_history::log_catalog& log, block_num_type block_num) { + std::optional entry = log.get_entry(block_num); + if(!entry) //existing tests expect failure to find a block returns an empty vector here + return {}; + namespace bio = boost::iostreams; - return std::visit(eosio::chain::overloaded{ [](std::vector& bytes) { - return bytes; - }, - [](std::unique_ptr& strm) { - std::vector bytes; - bio::copy(*strm, bio::back_inserter(bytes)); - return bytes; - } }, - result.buf); + bio::filtering_istreambuf istream = entry->get_stream(); + std::vector bytes; + bio::copy(istream, bio::back_inserter(bytes)); + return bytes; } -static std::vector get_traces(eosio::state_history_log& log, +static std::vector get_traces(eosio::state_history::log_catalog& log, block_num_type block_num) { auto entry = get_decompressed_entry(log, block_num); std::vector traces;