Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: replace incremental with full mode in InterHashes stage #2596

Merged
merged 3 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions silkworm/node/stagedsync/execution_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,21 +171,22 @@ Stage::Result ExecutionPipeline::forward(db::RWTxn& cycle_txn, BlockNum target_b
const auto stage_result = current_stage_->second->forward(cycle_txn);

if (stage_result != Stage::Result::kSuccess) { /* clang-format off */
auto result_description = std::string(magic_enum::enum_name<Stage::Result>(stage_result));
log::Error(get_log_prefix(current_stage_name), {"op", "Forward", "returned", result_description});
SILK_ERROR_M("ExecutionPipeline") << "Forward interrupted due to stage " << current_stage_->first << " failure";
const auto result_description = std::string(magic_enum::enum_name<Stage::Result>(stage_result));
SILK_ERROR_M(get_log_prefix(current_stage_name), {"op", "Forward", "failure", result_description});
return stage_result;
} /* clang-format on */

auto stage_head_number = read_stage_progress(cycle_txn, current_stage_name.data());
if (!stop_at_block && stage_head_number != target_block_num) {
throw std::logic_error("Sync pipeline: stage returned success with an block_num different from target=" +
to_string(target_block_num) + " reached= " + to_string(stage_head_number));
const auto stage_head_number = read_stage_progress(cycle_txn, current_stage_name.data());
if (!stop_at_block && stage_head_number != target_block_num && current_stage_name != kTriggersStageKey) {
SILK_ERROR_M(get_log_prefix(current_stage_name),
{"op", "Forward", "target", to_string(target_block_num), "reached", to_string(stage_head_number)});
throw std::logic_error("stage returned success with an block_num different from target=" +
to_string(target_block_num) + " reached=" + to_string(stage_head_number));
}

auto [_, stage_duration] = stages_stop_watch.lap();
const auto [_, stage_duration] = stages_stop_watch.lap();
if (stage_duration > kStageDurationThresholdForLog) {
log::Info(get_log_prefix(current_stage_name), {"op", "Forward", "done", StopWatch::format(stage_duration)});
SILK_INFO_M(get_log_prefix(current_stage_name), {"op", "Forward", "done", StopWatch::format(stage_duration)});
}
}

Expand Down
2 changes: 1 addition & 1 deletion silkworm/node/stagedsync/stages/stage_headers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ HeadersStage::HeadersStage(
const auto stop_at_block = Environment::get_stop_at_block();
if (stop_at_block.has_value()) {
forced_target_block_ = stop_at_block;
log::Info(log_prefix_) << "env var STOP_AT_BLOCK set, target block=" << forced_target_block_.value();
SILK_DEBUG_M(log_prefix_, {"target=", std::to_string(*forced_target_block_)}) << " env var STOP_AT_BLOCK set";
}
}

Expand Down
64 changes: 28 additions & 36 deletions silkworm/node/stagedsync/stages/stage_interhashes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ Stage::Result InterHashes::forward(RWTxn& txn) {
ret = regenerate_intermediate_hashes(txn, &expected_state_root);
} else {
// Incremental update
ret = increment_intermediate_hashes(txn, previous_progress, hashstate_stage_progress, &expected_state_root);
// TODO(canepat) debug_unwind block 4'000'000 step 1 fails with kWrongStateRoot in incremental mode
// ret = increment_intermediate_hashes(txn, previous_progress, hashstate_stage_progress, &expected_state_root);
SILK_TRACE_M(log_prefix_, {"function", std::string(__FUNCTION__), "algo", "full rather than incremental"});
ret = regenerate_intermediate_hashes(txn, &expected_state_root);
}

if (ret == Stage::Result::kWrongStateRoot) {
Expand All @@ -106,20 +109,16 @@ Stage::Result InterHashes::forward(RWTxn& txn) {
txn.commit_and_renew();

} catch (const StageError& ex) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
ret = static_cast<Stage::Result>(ex.err());
} catch (const mdbx::exception& ex) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
ret = Stage::Result::kDbError;
} catch (const std::exception& ex) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
ret = Stage::Result::kUnexpectedError;
} catch (...) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", "unexpected and undefined"});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", "unexpected and undefined"});
ret = Stage::Result::kUnexpectedError;
}

Expand Down Expand Up @@ -175,7 +174,10 @@ Stage::Result InterHashes::unwind(RWTxn& txn) {
ret = regenerate_intermediate_hashes(txn, &expected_state_root);
} else {
// Incremental update
ret = increment_intermediate_hashes(txn, previous_progress, to, &expected_state_root);
// TODO(canepat) debug_unwind block 4'000'000 step 1 fails with kWrongStateRoot in incremental mode
// ret = increment_intermediate_hashes(txn, previous_progress, to, &expected_state_root);
SILK_TRACE_M(log_prefix_, {"function", std::string(__FUNCTION__), "algo", "full rather than incremental"});
ret = regenerate_intermediate_hashes(txn, &expected_state_root);
}

success_or_throw(ret);
Expand All @@ -184,20 +186,16 @@ Stage::Result InterHashes::unwind(RWTxn& txn) {
txn.commit_and_renew();

} catch (const StageError& ex) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
ret = static_cast<Stage::Result>(ex.err());
} catch (const mdbx::exception& ex) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
ret = Stage::Result::kDbError;
} catch (const std::exception& ex) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
ret = Stage::Result::kUnexpectedError;
} catch (...) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", "unexpected and undefined"});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", "unexpected and undefined"});
ret = Stage::Result::kUnexpectedError;
}

Expand Down Expand Up @@ -460,6 +458,7 @@ Stage::Result InterHashes::regenerate_intermediate_hashes(RWTxn& txn, const evmc
log_lck.unlock();

const evmc::bytes32 computed_root{trie_loader_->calculate_root()};
SILK_TRACE_M(log_prefix_, {"function", std::string(__FUNCTION__), "computed_root", to_hex(computed_root.bytes)});

// Fail if not what expected
if (expected_root != nullptr && computed_root != *expected_root) {
Expand All @@ -475,20 +474,16 @@ Stage::Result InterHashes::regenerate_intermediate_hashes(RWTxn& txn, const evmc
flush_collected_nodes(txn);

} catch (const StageError& ex) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
ret = static_cast<Stage::Result>(ex.err());
} catch (const mdbx::exception& ex) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
ret = Stage::Result::kDbError;
} catch (const std::exception& ex) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
ret = Stage::Result::kUnexpectedError;
} catch (...) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", "unexpected and undefined"});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", "unexpected and undefined"});
ret = Stage::Result::kUnexpectedError;
}

Expand Down Expand Up @@ -524,6 +519,7 @@ Stage::Result InterHashes::increment_intermediate_hashes(RWTxn& txn, BlockNum fr
log_lck.unlock();

const evmc::bytes32 computed_root{trie_loader_->calculate_root()};
SILK_TRACE_M(log_prefix_, {"function", std::string(__FUNCTION__), "computed_root", to_hex(computed_root.bytes)});

// Fail if not what expected
if (expected_root != nullptr && computed_root != *expected_root) {
Expand All @@ -532,28 +528,23 @@ Stage::Result InterHashes::increment_intermediate_hashes(RWTxn& txn, BlockNum fr
account_collector_.reset(); // Will invoke dtor which causes all flushed files (if any) to be deleted
storage_collector_.reset(); // Will invoke dtor which causes all flushed files (if any) to be deleted
log_lck.unlock();
log::Error("Wrong trie root",
{"expected", to_hex(*expected_root, true), "got", to_hex(computed_root, true)});
SILK_ERROR_M("Wrong trie root", {"expected", to_hex(*expected_root, true), "got", to_hex(computed_root, true)});
return Stage::Result::kWrongStateRoot;
}

flush_collected_nodes(txn);

} catch (const StageError& ex) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
ret = static_cast<Stage::Result>(ex.err());
} catch (const mdbx::exception& ex) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
ret = Stage::Result::kDbError;
} catch (const std::exception& ex) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
ret = Stage::Result::kUnexpectedError;
} catch (...) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", "unexpected and undefined"});
SILK_ERROR_M(log_prefix_, {"function", std::string(__FUNCTION__), "exception", "unexpected and undefined"});
ret = Stage::Result::kUnexpectedError;
}

Expand Down Expand Up @@ -623,4 +614,5 @@ std::vector<std::string> InterHashes::get_log_progress() {
}
return ret;
}

} // namespace silkworm::stagedsync
6 changes: 3 additions & 3 deletions silkworm/node/stagedsync/stages/stage_interhashes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ class InterHashes final : public Stage {
trie::PrefixSet collect_storage_changes(db::RWTxn& txn, BlockNum from, BlockNum to,
absl::btree_map<evmc::address, ethash_hash256>& hashed_addresses);

//! \brief Erigon's RegenerateIntermediateHashes
//! \brief Erigon RegenerateIntermediateHashes
//! \remarks might throw WrongRoot
//! \return the state root
Stage::Result regenerate_intermediate_hashes(
db::RWTxn& txn,
const evmc::bytes32* expected_root = nullptr);

//! \brief Erigon's IncrementIntermediateHashes
//! \brief Erigon IncrementIntermediateHashes
//! \remarks might throw
//! \return the state root
Stage::Result increment_intermediate_hashes(
[[maybe_unused]] Stage::Result increment_intermediate_hashes(
db::RWTxn& txn,
BlockNum from,
BlockNum to,
Expand Down
Loading