diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index efd241bbaa..d2bc1645a8 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -1703,8 +1703,6 @@ struct controller_impl { } void replay(startup_t startup) { - replaying = true; - bool replay_block_log_needed = should_replay_block_log(); auto blog_head = blog.head(); @@ -1830,8 +1828,9 @@ struct controller_impl { // loading from snapshot without a block log so fork_db can't be considered valid fork_db_reset_root_to_chain_head(); } else if( !except_ptr && !check_shutdown() && !irreversible_mode() && fork_db.head()) { - // applies all blocks up to fork_db head from fork_db - maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{}); + // applies all blocks up to fork_db head from fork_db, shouldn't return incomplete, but if it does loop until complete + while (maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{}) == controller::apply_blocks_result::incomplete) + ; auto head = fork_db.head(); ilog( "reversible blocks replayed to ${bn} : ${id}", ("bn", head->block_num())("id", head->id()) ); } @@ -1842,8 +1841,6 @@ struct controller_impl { }; fork_db_.apply(replay_fork_db); - replaying = false; - if( except_ptr ) { std::rethrow_exception( except_ptr ); } @@ -1992,6 +1989,7 @@ struct controller_impl { ilog( "chain database started with hash: ${hash}", ("hash", calculate_integrity_hash()) ); okay_to_print_integrity_hash_on_stop = true; + fc::scoped_set_value r(replaying, true); replay( startup ); // replay any irreversible and reversible blocks ahead of current head if( check_shutdown() ) return; @@ -2013,7 +2011,9 @@ struct controller_impl { // See comment below about pause-at-block for why `|| conf.num_configured_p2p_peers > 0` if (chain_head_is_root || conf.num_configured_p2p_peers > 0) { ilog("applying branch from fork database ending with block: ${id}", ("id", pending_head->id())); - maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{}); + // applies all blocks up to forkdb head from forkdb, shouldn't return incomplete, but if it does loop until complete + while (maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{}) == controller::apply_blocks_result::incomplete) + ; } } } else { @@ -4301,23 +4301,25 @@ struct controller_impl { } FC_LOG_AND_RETHROW( ) } - void apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) { + controller::apply_blocks_result apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) { try { if( !irreversible_mode() ) { - maybe_apply_blocks( cb, trx_lookup ); - } else { - log_irreversible(); - transition_to_savanna_if_needed(); + return maybe_apply_blocks( cb, trx_lookup ); } + + log_irreversible(); + transition_to_savanna_if_needed(); + return controller::apply_blocks_result::complete; } FC_LOG_AND_RETHROW( ) } - void maybe_apply_blocks( const forked_callback_t& forked_cb, const trx_meta_cache_lookup& trx_lookup ) + controller::apply_blocks_result maybe_apply_blocks( const forked_callback_t& forked_cb, const trx_meta_cache_lookup& trx_lookup ) { + controller::apply_blocks_result result = controller::apply_blocks_result::complete; auto do_apply_blocks = [&](auto& fork_db) { auto new_head = fork_db.head(); // use best head if (!new_head) - return; // nothing to do, fork_db at root + return;// nothing to do, fork_db at root auto [new_head_branch, old_head_branch] = fork_db.fetch_branch_from( new_head->id(), chain_head.id() ); bool switch_fork = !old_head_branch.empty(); @@ -4356,15 +4358,24 @@ struct controller_impl { } } + auto start = fc::time_point::now(); for( auto ritr = new_head_branch.rbegin(); ritr != new_head_branch.rend(); ++ritr ) { auto except = std::exception_ptr{}; const auto& bsp = *ritr; try { bool applied = apply_block( bsp, bsp->is_valid() ? controller::block_status::validated : controller::block_status::complete, trx_lookup ); - if (!switch_fork && (!applied || check_shutdown())) { - shutdown(); - break; + if (!switch_fork) { // always complete a switch fork + if (!applied || check_shutdown()) { + shutdown(); + break; // result should be complete since we are shutting down + } + // Break every ~500ms to allow other tasks (e.g. get_info, SHiP) opportunity to run. User expected + // to call apply_blocks again if this returns incomplete. + if (!replaying && fc::time_point::now() - start > fc::milliseconds(500)) { + result = controller::apply_blocks_result::incomplete; + break; + } } } catch ( const std::bad_alloc& ) { throw; @@ -4421,6 +4432,8 @@ struct controller_impl { }; fork_db_.apply(do_apply_blocks); + + return result; } deque abort_block() { @@ -5183,9 +5196,9 @@ void controller::set_async_aggregation(async_t val) { my->async_aggregation = val; } -void controller::apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) { +controller::apply_blocks_result controller::apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) { validate_db_available_size(); - my->apply_blocks(cb, trx_lookup); + return my->apply_blocks(cb, trx_lookup); } diff --git a/libraries/chain/include/eosio/chain/controller.hpp b/libraries/chain/include/eosio/chain/controller.hpp index 28c2f1f422..378718558c 100644 --- a/libraries/chain/include/eosio/chain/controller.hpp +++ b/libraries/chain/include/eosio/chain/controller.hpp @@ -230,9 +230,6 @@ namespace eosio::chain { void set_async_voting(async_t val); void set_async_aggregation(async_t val); - /// Apply any blocks that are ready from the fork_db - void apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup); - struct accepted_block_result { const bool is_new_best_head = false; // true if new best head std::optional block; // empty optional if block is unlinkable @@ -240,6 +237,13 @@ namespace eosio::chain { // thread-safe accepted_block_result accept_block( const block_id_type& id, const signed_block_ptr& b ) const; + /// Apply any blocks that are ready from the fork_db + enum class apply_blocks_result { + complete, // all ready blocks in forkdb have been applied + incomplete // time limit reached, additional blocks may be available in forkdb to process + }; + apply_blocks_result apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup); + boost::asio::io_context& get_thread_pool(); const chainbase::database& db()const; diff --git a/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp b/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp index bf75866b87..a6da2398f6 100644 --- a/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp +++ b/plugins/chain_interface/include/eosio/chain/plugin_interface.hpp @@ -28,7 +28,7 @@ namespace eosio::chain::plugin_interface { namespace incoming { namespace methods { // synchronously push a block/trx to a single provider, block_state_legacy_ptr may be null - using block_sync = method_decl; + using block_sync = method_decl; using transaction_async = method_decl), first_provider_policy>; } } diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 974724fbb0..2cb613aa37 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -3776,11 +3776,21 @@ namespace eosio { if (best_head) { ++c->unique_blocks_rcvd_count; fc_dlog(logger, "posting incoming_block to app thread, block ${n}", ("n", ptr->block_num())); + + auto process_incoming_blocks = [](auto self) -> void { + try { + auto r = my_impl->producer_plug->on_incoming_block(); + if (r == controller::apply_blocks_result::incomplete) { + app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write, [self]() { + self(self); + }); + } + } catch (...) {} // errors on applied blocks logged in controller + }; + app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write, - []() { - try { - my_impl->producer_plug->on_incoming_block(); - } catch (...) {} // errors on applied blocks logged in controller + [process_incoming_blocks]() { + process_incoming_blocks(process_incoming_blocks); }); // ready to process immediately, so signal producer to interrupt start_block diff --git a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp index b85cb99c1d..fa4f235684 100644 --- a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp +++ b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp @@ -81,7 +81,7 @@ class producer_plugin : public appbase::plugin { virtual void plugin_shutdown(); void handle_sighup() override; - bool on_incoming_block(); + controller::apply_blocks_result on_incoming_block(); void pause(); void resume(); diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 14427df4b2..e0236fcc88 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -892,7 +892,7 @@ class producer_plugin_impl : public std::enable_shared_from_thison_incoming_block(); } @@ -1992,8 +1995,10 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { abort_block(); - chain.apply_blocks([this](const transaction_metadata_ptr& trx) { _unapplied_transactions.add_forked(trx); }, - [this](const transaction_id_type& id) { return _unapplied_transactions.get_trx(id); }); + auto r = chain.apply_blocks([this](const transaction_metadata_ptr& trx) { _unapplied_transactions.add_forked(trx); }, + [this](const transaction_id_type& id) { return _unapplied_transactions.get_trx(id); }); + if (r != controller::apply_blocks_result::complete) + return start_block_result::failed; if (chain.should_terminate()) { app().quit();