Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit apply blocks #922

Merged
merged 4 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1703,8 +1703,6 @@ struct controller_impl {
}

void replay(startup_t startup) {
replaying = true;

bool replay_block_log_needed = should_replay_block_log();

auto blog_head = blog.head();
Expand Down Expand Up @@ -1830,8 +1828,9 @@ struct controller_impl {
// loading from snapshot without a block log so fork_db can't be considered valid
fork_db_reset_root_to_chain_head();
} else if( !except_ptr && !check_shutdown() && !irreversible_mode() && fork_db.head()) {
// applies all blocks up to fork_db head from fork_db
maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{});
// applies all blocks up to fork_db head from fork_db, shouldn't return incomplete, but if it does loop until complete
while (maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{}) == controller::apply_blocks_result::incomplete)
;
auto head = fork_db.head();
ilog( "reversible blocks replayed to ${bn} : ${id}", ("bn", head->block_num())("id", head->id()) );
}
Expand All @@ -1842,8 +1841,6 @@ struct controller_impl {
};
fork_db_.apply<void>(replay_fork_db);

replaying = false;

if( except_ptr ) {
std::rethrow_exception( except_ptr );
}
Expand Down Expand Up @@ -1992,6 +1989,7 @@ struct controller_impl {
ilog( "chain database started with hash: ${hash}", ("hash", calculate_integrity_hash()) );
okay_to_print_integrity_hash_on_stop = true;

fc::scoped_set_value r(replaying, true);
replay( startup ); // replay any irreversible and reversible blocks ahead of current head

if( check_shutdown() ) return;
Expand All @@ -2013,7 +2011,9 @@ struct controller_impl {
// See comment below about pause-at-block for why `|| conf.num_configured_p2p_peers > 0`
if (chain_head_is_root || conf.num_configured_p2p_peers > 0) {
ilog("applying branch from fork database ending with block: ${id}", ("id", pending_head->id()));
maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{});
// applies all blocks up to forkdb head from forkdb, shouldn't return incomplete, but if it does loop until complete
while (maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{}) == controller::apply_blocks_result::incomplete)
;
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
}
}
} else {
Expand Down Expand Up @@ -4301,23 +4301,25 @@ struct controller_impl {
} FC_LOG_AND_RETHROW( )
}

void apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) {
controller::apply_blocks_result apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) {
try {
if( !irreversible_mode() ) {
maybe_apply_blocks( cb, trx_lookup );
} else {
log_irreversible();
transition_to_savanna_if_needed();
return maybe_apply_blocks( cb, trx_lookup );
}

log_irreversible();
transition_to_savanna_if_needed();
return controller::apply_blocks_result::complete;
} FC_LOG_AND_RETHROW( )
}

void maybe_apply_blocks( const forked_callback_t& forked_cb, const trx_meta_cache_lookup& trx_lookup )
controller::apply_blocks_result maybe_apply_blocks( const forked_callback_t& forked_cb, const trx_meta_cache_lookup& trx_lookup )
{
controller::apply_blocks_result result = controller::apply_blocks_result::complete;
auto do_apply_blocks = [&](auto& fork_db) {
auto new_head = fork_db.head(); // use best head
if (!new_head)
return; // nothing to do, fork_db at root
return;// nothing to do, fork_db at root
auto [new_head_branch, old_head_branch] = fork_db.fetch_branch_from( new_head->id(), chain_head.id() );

bool switch_fork = !old_head_branch.empty();
Expand Down Expand Up @@ -4356,15 +4358,24 @@ struct controller_impl {
}
}

auto start = fc::time_point::now();
for( auto ritr = new_head_branch.rbegin(); ritr != new_head_branch.rend(); ++ritr ) {
auto except = std::exception_ptr{};
const auto& bsp = *ritr;
try {
bool applied = apply_block( bsp, bsp->is_valid() ? controller::block_status::validated
: controller::block_status::complete, trx_lookup );
if (!switch_fork && (!applied || check_shutdown())) {
shutdown();
break;
if (!switch_fork) { // always complete a switch fork
if (!applied || check_shutdown()) {
shutdown();
break; // result should be complete since we are shutting down
}
// Break every ~500ms to allow other tasks (e.g. get_info, SHiP) opportunity to run. User expected
// to call apply_blocks again if this returns incomplete.
if (!replaying && fc::time_point::now() - start > fc::milliseconds(500)) {
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
result = controller::apply_blocks_result::incomplete;
break;
}
}
} catch ( const std::bad_alloc& ) {
throw;
Expand Down Expand Up @@ -4421,6 +4432,8 @@ struct controller_impl {
};

fork_db_.apply<void>(do_apply_blocks);

return result;
}

deque<transaction_metadata_ptr> abort_block() {
Expand Down Expand Up @@ -5183,9 +5196,9 @@ void controller::set_async_aggregation(async_t val) {
my->async_aggregation = val;
}

void controller::apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) {
controller::apply_blocks_result controller::apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) {
validate_db_available_size();
my->apply_blocks(cb, trx_lookup);
return my->apply_blocks(cb, trx_lookup);
}


Expand Down
10 changes: 7 additions & 3 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,16 +230,20 @@ namespace eosio::chain {
void set_async_voting(async_t val);
void set_async_aggregation(async_t val);

/// Apply any blocks that are ready from the fork_db
void apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup);

struct accepted_block_result {
const bool is_new_best_head = false; // true if new best head
std::optional<block_handle> block; // empty optional if block is unlinkable
};
// thread-safe
accepted_block_result accept_block( const block_id_type& id, const signed_block_ptr& b ) const;

/// Apply any blocks that are ready from the fork_db
enum class apply_blocks_result {
complete, // all ready blocks in forkdb have been applied
incomplete // time limit reached, additional blocks may be available in forkdb to process
};
apply_blocks_result apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup);

boost::asio::io_context& get_thread_pool();

const chainbase::database& db()const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace eosio::chain::plugin_interface {
namespace incoming {
namespace methods {
// synchronously push a block/trx to a single provider, block_state_legacy_ptr may be null
using block_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&, const block_id_type&, const block_handle&), first_provider_policy>;
using block_sync = method_decl<chain_plugin_interface, controller::apply_blocks_result(const signed_block_ptr&, const block_id_type&, const block_handle&), first_provider_policy>;
using transaction_async = method_decl<chain_plugin_interface, void(const packed_transaction_ptr&, bool, transaction_metadata::trx_type, bool, next_function<transaction_trace_ptr>), first_provider_policy>;
}
}
Expand Down
18 changes: 14 additions & 4 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3776,11 +3776,21 @@ namespace eosio {
if (best_head) {
++c->unique_blocks_rcvd_count;
fc_dlog(logger, "posting incoming_block to app thread, block ${n}", ("n", ptr->block_num()));

auto process_incoming_blocks = [](auto self) -> void {
try {
auto r = my_impl->producer_plug->on_incoming_block();
if (r == controller::apply_blocks_result::incomplete) {
app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write, [self]() {
self(self);
});
}
} catch (...) {} // errors on applied blocks logged in controller
};

app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write,
[]() {
try {
my_impl->producer_plug->on_incoming_block();
} catch (...) {} // errors on applied blocks logged in controller
[process_incoming_blocks]() {
process_incoming_blocks(process_incoming_blocks);
});

// ready to process immediately, so signal producer to interrupt start_block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class producer_plugin : public appbase::plugin<producer_plugin> {
virtual void plugin_shutdown();
void handle_sighup() override;

bool on_incoming_block();
controller::apply_blocks_result on_incoming_block();

void pause();
void resume();
Expand Down
25 changes: 15 additions & 10 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin

// called on incoming blocks from net_plugin on the main thread. Will notify controller to process any
// blocks ready in the fork database.
bool on_incoming_block() {
controller::apply_blocks_result on_incoming_block() {
auto now = fc::time_point::now();
_time_tracker.add_idle_time(now);

Expand All @@ -905,12 +905,14 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
("num", fhead.block_num())("id", fhead.id()));
}
_time_tracker.add_other_time();
return true; // return true because block was accepted
// return complete as we are producing and don't want to be interrupted right now. Next start_block will
// give an opportunity for this incoming block to be processed.
return controller::apply_blocks_result::complete;
}

// no reason to abort_block if we have nothing ready to process
if (chain.head().id() == chain.fork_db_head().id()) {
return true; // return true as nothing failed
return controller::apply_blocks_result::complete; // nothing to do
}

// start a new speculative block, adds to time tracker which includes this method's time
Expand All @@ -919,21 +921,22 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// abort the pending block
abort_block();

controller::apply_blocks_result result = controller::apply_blocks_result::complete;
try {
chain.apply_blocks(
result = chain.apply_blocks(
[this](const transaction_metadata_ptr& trx) { _unapplied_transactions.add_forked(trx); },
[this](const transaction_id_type& id) { return _unapplied_transactions.get_trx(id); });
} catch (const guard_exception& e) {
chain_plugin::handle_guard_exception(e);
return false;
return controller::apply_blocks_result::complete; // shutting down
} catch (const std::bad_alloc&) {
chain_apis::api_base::handle_bad_alloc();
} catch (boost::interprocess::bad_alloc&) {
chain_apis::api_base::handle_db_exhaustion();
} catch (const fork_database_exception& e) {
fc_elog(_log, "Cannot recover from ${e}. Shutting down.", ("e", e.to_detail_string()));
appbase::app().quit();
return false;
return controller::apply_blocks_result::complete; // shutting down
} catch (const fc::exception& e) {
throw;
} catch (const std::exception& e) {
Expand All @@ -945,7 +948,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
_production_enabled = true;
}

return true;
return result;
}

void restart_speculative_block() {
Expand Down Expand Up @@ -1650,7 +1653,7 @@ void producer_plugin::handle_sighup() {
fc::logger::update(transient_trx_failed_trace_logger_name, _transient_trx_failed_trace_log);
}

bool producer_plugin::on_incoming_block() {
controller::apply_blocks_result producer_plugin::on_incoming_block() {
return my->on_incoming_block();
}

Expand Down Expand Up @@ -1992,8 +1995,10 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {

abort_block();

chain.apply_blocks([this](const transaction_metadata_ptr& trx) { _unapplied_transactions.add_forked(trx); },
[this](const transaction_id_type& id) { return _unapplied_transactions.get_trx(id); });
auto r = chain.apply_blocks([this](const transaction_metadata_ptr& trx) { _unapplied_transactions.add_forked(trx); },
[this](const transaction_id_type& id) { return _unapplied_transactions.get_trx(id); });
if (r != controller::apply_blocks_result::complete)
return start_block_result::failed;

if (chain.should_terminate()) {
app().quit();
Expand Down