Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IF: Don't drop late blocks #2274

Merged
merged 13 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libraries/chain/block_header_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ block_header_state block_header_state::next(block_header_state_input& input) con
block_header_state block_header_state::next(const signed_block_header& h, validator_t& validator) const {
auto producer = detail::get_scheduled_producer(active_proposer_policy->proposer_schedule.producers, h.timestamp).producer_name;

EOS_ASSERT( h.previous == block_id, unlinkable_block_exception, "previous mismatch" );
EOS_ASSERT( h.previous == block_id, unlinkable_block_exception, "previous mismatch ${p} != ${id}", ("p", h.previous)("id", block_id) );
EOS_ASSERT( h.producer == producer, wrong_producer, "wrong producer specified" );
EOS_ASSERT( h.confirmed == 0, block_validate_exception, "invalid confirmed ${c}", ("c", h.confirmed) );
EOS_ASSERT( !h.new_producers, producer_schedule_exception, "Block header contains legacy producer schedule outdated by activation of WTMsig Block Signatures" );
Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/block_header_state_legacy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ namespace eosio::chain {
)&&
{
EOS_ASSERT( h.timestamp == timestamp, block_validate_exception, "timestamp mismatch" );
EOS_ASSERT( h.previous == previous, unlinkable_block_exception, "previous mismatch" );
EOS_ASSERT( h.previous == previous, unlinkable_block_exception, "previous mismatch ${p} != ${id}", ("p", h.previous)("id", previous) );
EOS_ASSERT( h.confirmed == confirmed, block_validate_exception, "confirmed mismatch" );
EOS_ASSERT( h.producer == producer, wrong_producer, "wrong producer specified" );
EOS_ASSERT( h.schedule_version == active_schedule_version, producer_schedule_exception, "schedule_version in signed block is corrupted" );
Expand Down
97 changes: 64 additions & 33 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ struct controller_impl {
block_log blog;
std::optional<pending_state> pending;
fork_database fork_db;
std::atomic<uint32_t> if_irreversible_block_num{0};
block_id_type if_irreversible_block_id;
linh2931 marked this conversation as resolved.
Show resolved Hide resolved
resource_limits_manager resource_limits;
subjective_billing subjective_bill;
authorization_manager authorization;
Expand Down Expand Up @@ -1010,6 +1010,12 @@ struct controller_impl {
});
}

bool fork_db_block_exists( const block_id_type& id ) const {
return fork_db.apply<bool>([&](const auto& forkdb) {
return forkdb.block_exists(id);
});
}

signed_block_ptr fork_db_fetch_block_by_id( const block_id_type& id ) const {
return fork_db.apply<signed_block_ptr>([&](const auto& forkdb) {
auto bsp = forkdb.get_block(id);
Expand Down Expand Up @@ -1212,23 +1218,28 @@ struct controller_impl {
("lib_num", lib_num)("bn", fork_db_root_block_num()) );
}

const uint32_t if_lib = if_irreversible_block_num;
uint32_t if_lib = block_header::num_from_id(if_irreversible_block_id);
linh2931 marked this conversation as resolved.
Show resolved Hide resolved
const uint32_t new_lib = if_lib > 0 ? if_lib : fork_db_head_irreversible_blocknum();

if( new_lib <= lib_num )
return;

auto mark_branch_irreversible = [&, this](auto& forkdb) {
auto branch = forkdb.fetch_branch( fork_db_head(forkdb, irreversible_mode())->id(), new_lib );
auto branch = (if_lib > 0) ? forkdb.fetch_branch( if_irreversible_block_id, new_lib)
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
: forkdb.fetch_branch( fork_db_head(forkdb, irreversible_mode())->id(), new_lib );
try {
auto should_process = [&](auto& bsp) {
return read_mode == db_read_mode::IRREVERSIBLE || bsp->is_valid();
};
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved

std::vector<std::future<std::vector<char>>> v;
v.reserve( branch.size() );
for( auto bitr = branch.rbegin(); bitr != branch.rend(); ++bitr ) {
for( auto bitr = branch.rbegin(); bitr != branch.rend() && should_process(*bitr); ++bitr ) {
v.emplace_back( post_async_task( thread_pool.get_executor(), [b=(*bitr)->block]() { return fc::raw::pack(*b); } ) );
}
auto it = v.begin();

for( auto bitr = branch.rbegin(); bitr != branch.rend(); ++bitr ) {
for( auto bitr = branch.rbegin(); bitr != branch.rend() && should_process(*bitr); ++bitr ) {
if( read_mode == db_read_mode::IRREVERSIBLE ) {
controller::block_report br;
apply_block( br, *bitr, controller::block_status::complete, trx_meta_cache_lookup{} );
Expand Down Expand Up @@ -2810,7 +2821,8 @@ struct controller_impl {
// claim has already been verified
auto claimed = forkdb.search_on_branch(bsp->id(), if_ext.qc_claim.block_num);
if (claimed) {
set_if_irreversible_block_num(claimed->core.final_on_strong_qc_block_num);
auto& final_on_strong_qc_block_ref = claimed->core.get_block_reference(claimed->core.final_on_strong_qc_block_num);
set_if_irreversible_block_id(final_on_strong_qc_block_ref.block_id);
}
}
});
Expand All @@ -2827,7 +2839,7 @@ struct controller_impl {
const auto& if_extension = std::get<instant_finality_extension>(*ext);
if (if_extension.new_finalizer_policy) {
ilog("Transition to instant finality happening after block ${b}", ("b", forkdb.chain_head->block_num()));
if_irreversible_block_num = forkdb.chain_head->block_num();
set_if_irreversible_block_id(forkdb.chain_head->id());

// cancel any proposed schedule changes, prepare for new ones under instant_finality
const auto& gpo = db.get<global_property_object>();
Expand Down Expand Up @@ -3160,7 +3172,7 @@ struct controller_impl {
});
}

// expected to be called from application thread as it modifies bsp->valid_qc,
// expected to be called from application thread as it modifies bsp->valid_qc and if_irreversible_block_id
void integrate_received_qc_to_block(const block_state_ptr& bsp_in) {
// extract QC from block extension
const auto& block_exts = bsp_in->block->validate_and_extract_extensions();
Expand All @@ -3172,18 +3184,18 @@ struct controller_impl {
const auto& qc_ext = std::get<quorum_certificate_extension>(block_exts.lower_bound(qc_ext_id)->second);
const auto& received_qc = qc_ext.qc.qc;

const auto bsp = fetch_bsp_on_branch_by_num( bsp_in->previous(), qc_ext.qc.block_num );
if( !bsp ) {
const auto claimed = fetch_bsp_on_branch_by_num( bsp_in->previous(), qc_ext.qc.block_num );
if( !claimed ) {
return;
}

// Don't save the QC from block extension if the claimed block has a better valid_qc.
if (bsp->valid_qc && (bsp->valid_qc->is_strong() || received_qc.is_weak())) {
if (claimed->valid_qc && (claimed->valid_qc->is_strong() || received_qc.is_weak())) {
return;
}

// Save the QC. This is safe as the function is called by push_block from application thread.
bsp->valid_qc = received_qc;
claimed->valid_qc = received_qc;

// advance LIB if QC is strong
if( received_qc.is_strong() ) {
Expand All @@ -3192,7 +3204,8 @@ struct controller_impl {
// will not be valid or forked out. This is safe because the block is
// just acting as a carrier of this info. It doesn't matter if the block
// is actually valid as it simply is used as a network message for this data.
set_if_irreversible_block_num(bsp->core.final_on_strong_qc_block_num);
auto& final_on_strong_qc_block_ref = claimed->core.get_block_reference(claimed->core.final_on_strong_qc_block_num);
set_if_irreversible_block_id(final_on_strong_qc_block_ref.block_id);
}
}

Expand Down Expand Up @@ -3376,7 +3389,7 @@ struct controller_impl {

auto prev = forkdb.get_block_header( b->previous );
EOS_ASSERT( prev, unlinkable_block_exception,
"unlinkable block ${id}", ("id", id)("previous", b->previous) );
"unlinkable block ${id} previous ${p}", ("id", id)("p", b->previous) );

return control->create_block_state_i( id, b, *prev );
} );
Expand Down Expand Up @@ -3404,12 +3417,33 @@ struct controller_impl {
return fork_db.apply<std::optional<block_handle>>(f);
}

template <class BSP>
void accept_block(const BSP& bsp) {
assert(bsp && bsp->block);

// Save the received QC as soon as possible, no matter whether the block itself is valid or not
if constexpr (std::is_same_v<BSP, block_state_ptr>) {
integrate_received_qc_to_block(bsp);
}

auto do_accept_block = [&](auto& forkdb) {
if constexpr (std::is_same_v<BSP, typename std::decay_t<decltype(forkdb.chain_head)>>)
forkdb.add( bsp, mark_valid_t::no, ignore_duplicate_t::no );

emit( accepted_block_header, std::tie(bsp->block, bsp->id()) );
};

fork_db.apply<void>(do_accept_block);
}

template <class BSP>
void push_block( controller::block_report& br,
const BSP& bsp,
const forked_callback_t& forked_branch_cb,
const trx_meta_cache_lookup& trx_lookup )
{
assert(bsp && bsp->block);

// Save the received QC as soon as possible, no matter whether the block itself is valid or not
if constexpr (std::is_same_v<BSP, block_state_ptr>) {
integrate_received_qc_to_block(bsp);
Expand All @@ -3422,7 +3456,6 @@ struct controller_impl {
trusted_producer_light_validation = old_value;
});
try {
EOS_ASSERT( bsp, block_validate_exception, "null block" );
linh2931 marked this conversation as resolved.
Show resolved Hide resolved
const auto& b = bsp->block;

if( conf.terminate_at_block > 0 && conf.terminate_at_block <= head_block_num()) {
Expand Down Expand Up @@ -3892,17 +3925,13 @@ struct controller_impl {
return is_trx_transient ? nullptr : deep_mind_logger;
}

void set_if_irreversible_block_num(uint32_t block_num) {
if( block_num > if_irreversible_block_num ) {
if_irreversible_block_num = block_num;
dlog("irreversible block ${bn}", ("bn", block_num));
void set_if_irreversible_block_id(const block_id_type& id) {
if( block_header::num_from_id(id) > block_header::num_from_id(if_irreversible_block_id) ) {
if_irreversible_block_id = id;
dlog("irreversible block ${bn} : ${id}", ("bn", block_header::num_from_id(id))("id", id));
linh2931 marked this conversation as resolved.
Show resolved Hide resolved
}
}

uint32_t get_if_irreversible_block_num() const {
return if_irreversible_block_num;
}

uint32_t earliest_available_block_num() const {
return (blog.first_block_num() != 0) ? blog.first_block_num() : fork_db_root_block_num();
}
Expand Down Expand Up @@ -4317,12 +4346,16 @@ std::optional<block_handle> controller::create_block_handle( const block_id_type
}

void controller::push_block( block_report& br,
const block_handle& bt,
const block_handle& b,
const forked_callback_t& forked_cb,
const trx_meta_cache_lookup& trx_lookup )
{
validate_db_available_size();
std::visit([&](const auto& bsp) { my->push_block( br, bsp, forked_cb, trx_lookup); }, bt.bsp);
std::visit([&](const auto& bsp) { my->push_block( br, bsp, forked_cb, trx_lookup); }, b.bsp);
}

void controller::accept_block(const block_handle& b) {
std::visit([&](const auto& bsp) { my->accept_block(bsp); }, b.bsp);
}

transaction_trace_ptr controller::push_transaction( const transaction_metadata_ptr& trx,
Expand Down Expand Up @@ -4456,14 +4489,12 @@ std::optional<block_id_type> controller::pending_producer_block_id()const {
return my->pending_producer_block_id();
}

void controller::set_if_irreversible_block_num(uint32_t block_num) {
// needs to be set by qc_chain at startup and as irreversible changes
assert(block_num > 0);
my->set_if_irreversible_block_num(block_num);
void controller::set_if_irreversible_block_id(const block_id_type& id) {
my->set_if_irreversible_block_id(id);
}

uint32_t controller::if_irreversible_block_num() const {
return my->get_if_irreversible_block_num();
return block_header::num_from_id(my->if_irreversible_block_id);
}

uint32_t controller::last_irreversible_block_num() const {
Expand Down Expand Up @@ -4494,9 +4525,9 @@ signed_block_ptr controller::fetch_block_by_id( const block_id_type& id )const {
return signed_block_ptr();
}

bool controller::block_exists(const block_id_type&id) const {
signed_block_ptr sb_ptr = my->fork_db_fetch_block_by_id(id);
if( sb_ptr ) return true;
bool controller::block_exists(const block_id_type& id) const {
bool exists = my->fork_db_block_exists(id);
if( exists ) return true;
std::optional<signed_block_header> sbh = my->blog.read_block_header_by_num( block_header::num_from_id(id) );
if( sbh && sbh->calculate_id() == id ) return true;
return false;
Expand Down
14 changes: 13 additions & 1 deletion libraries/chain/fork_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ namespace eosio::chain {

bhsp_t get_block_header_impl( const block_id_type& id ) const;
bsp_t get_block_impl( const block_id_type& id ) const;
bool block_exists_impl( const block_id_type& id ) const;
void reset_root_impl( const bhs_t& root_bhs );
void rollback_head_to_root_impl();
void advance_root_impl( const block_id_type& id );
Expand Down Expand Up @@ -393,7 +394,7 @@ namespace eosio::chain {

auto prev_bh = get_block_header_impl( n->previous() );
EOS_ASSERT( prev_bh, unlinkable_block_exception,
"unlinkable block", ("id", n->id())("previous", n->previous()) );
"forkdb unlinkable block ${id} previous ${p}", ("id", n->id())("p", n->previous()) );

if (validate) {
try {
Expand Down Expand Up @@ -692,6 +693,17 @@ namespace eosio::chain {
return {};
}

template<class BSP>
bool fork_database_t<BSP>::block_exists(const block_id_type& id) const {
std::lock_guard g( my->mtx );
return my->block_exists_impl(id);
}

template<class BSP>
bool fork_database_impl<BSP>::block_exists_impl(const block_id_type& id) const {
return index.find( id ) != index.end();
}

// ------------------ fork_database -------------------------

fork_database::fork_database(const std::filesystem::path& data_dir)
Expand Down
11 changes: 6 additions & 5 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,18 @@ namespace eosio::chain {

/**
* @param br returns statistics for block
* @param bt block to push, created by create_block_handle
* @param b block to push, created by create_block_handle
* @param cb calls cb with forked applied transactions for each forked block
* @param trx_lookup user provided lookup function for externally cached transaction_metadata
*/
void push_block( block_report& br,
const block_handle& bt,
const block_handle& b,
const forked_callback_t& cb,
const trx_meta_cache_lookup& trx_lookup );

/// Accept block into fork_database
void accept_block(const block_handle& b);

boost::asio::io_context& get_thread_pool();

const chainbase::database& db()const;
Expand Down Expand Up @@ -268,9 +271,7 @@ namespace eosio::chain {
// post-instant-finality this always returns nullptr
const producer_authority_schedule* pending_producers_legacy()const;

// Called by qc_chain to indicate the current irreversible block num
// After hotstuff is activated, this should be called on startup by qc_chain
void set_if_irreversible_block_num(uint32_t block_num);
void set_if_irreversible_block_id(const block_id_type& id);
uint32_t if_irreversible_block_num() const;

uint32_t last_irreversible_block_num() const;
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/fork_database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ namespace eosio::chain {

bhsp_t get_block_header( const block_id_type& id ) const;
bsp_t get_block( const block_id_type& id ) const;
bool block_exists( const block_id_type& id ) const;

/**
* Purges any existing blocks from the fork database and resets the root block_header_state to the provided value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace eosio::chain::plugin_interface {
namespace incoming {
namespace methods {
// synchronously push a block/trx to a single provider, block_state_legacy_ptr may be null
using block_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&, const std::optional<block_id_type>&, const std::optional<block_handle>&), first_provider_policy>;
using block_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&, const block_id_type&, const std::optional<block_handle>&), first_provider_policy>;
using transaction_async = method_decl<chain_plugin_interface, void(const packed_transaction_ptr&, bool, transaction_metadata::trx_type, bool, next_function<transaction_trace_ptr>), first_provider_policy>;
}
}
Expand Down
3 changes: 2 additions & 1 deletion plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2025,7 +2025,8 @@ fc::variant read_only::get_block_info(const read_only::get_block_info_params& pa

void read_write::push_block(read_write::push_block_params&& params, next_function<read_write::push_block_results> next) {
try {
app().get_method<incoming::methods::block_sync>()(std::make_shared<signed_block>( std::move(params) ), std::optional<block_id_type>{}, std::optional<block_handle>{});
auto b = std::make_shared<signed_block>( std::move(params) );
app().get_method<incoming::methods::block_sync>()(b, b->calculate_id(), std::optional<block_handle>{});
} catch ( boost::interprocess::bad_alloc& ) {
handle_db_exhaustion();
} catch ( const std::bad_alloc& ) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3905,7 +3905,7 @@ namespace eosio {
if( reason == unlinkable || reason == no_reason ) {
dispatcher->add_unlinkable_block( std::move(block), blk_id );
}
// reason==no_reason means accept_block() return false because we are producing, don't call rejected_block which sends handshake
// reason==no_reason means accept_block() return false which is a fatal error, don't call rejected_block which sends handshake
if( reason != no_reason ) {
sync_master->rejected_block( c, blk_num, sync_manager::closing_mode::handshake );
}
Expand Down
Loading
Loading