Skip to content
This repository has been archived by the owner on Nov 18, 2021. It is now read-only.

Commit

Permalink
Add more sensible handling of block coordinator panic (#2407)
Browse files Browse the repository at this point in the history
  • Loading branch information
ejfitzgerald authored Feb 7, 2020
1 parent 02c2621 commit 4d0282c
Show file tree
Hide file tree
Showing 3 changed files with 279 additions and 9 deletions.
1 change: 1 addition & 0 deletions libs/ledger/include/ledger/chain/block_coordinator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ class BlockCoordinator
telemetry::CounterPtr consensus_update_failure_total_;
telemetry::CounterPtr remove_block_total_;
telemetry::CounterPtr panic_block_total_;
telemetry::CounterPtr panic_search_total_;
telemetry::HistogramPtr tx_sync_times_;
telemetry::GaugePtr<uint64_t> current_block_num_;
telemetry::GaugePtr<uint64_t> next_block_num_;
Expand Down
76 changes: 67 additions & 9 deletions libs/ledger/src/chain/block_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const std::chrono::seconds WAIT_BEFORE_ASKING_FOR_MISSING_TX_INTERVAL{5};
const std::size_t MIN_BLOCK_SYNC_SLIPPAGE_FOR_WAITLESS_SYNC_OF_MISSING_TXS{30};
const std::chrono::seconds WAIT_FOR_TX_TIMEOUT_INTERVAL{240};
const uint32_t THRESHOLD_FOR_FAST_SYNCING{100u};
const std::size_t MAX_ATTEMPTED_PANIC_REVERTS{10};

} // namespace

Expand Down Expand Up @@ -173,6 +174,9 @@ BlockCoordinator::BlockCoordinator(MainChain &chain, DAGPtr dag,
, panic_block_total_{telemetry::Registry::Instance().CreateCounter(
"ledger_block_coordinator_panic_block_total",
"The total number of times that the block coordinator has paniced")}
, panic_search_total_{telemetry::Registry::Instance().CreateCounter(
"ledger_block_coordinator_panic_search_total",
"The total number of times that the main chain has been searched for a block")}
, tx_sync_times_{telemetry::Registry::Instance().CreateHistogram(
{0.001, 0.01, 0.1, 1, 10, 100}, "ledger_block_coordinator_tx_sync_times",
"The histogram of the time it takes to sync transactions")}
Expand Down Expand Up @@ -868,21 +872,75 @@ void BlockCoordinator::Panic()
{
FETCH_LOG_ERROR(LOGGING_NAME, "In Panic -> Reverting to Genesis");

auto const genesis_digest = chain::GetGenesisDigest();
auto const genesis_merkle = chain::GetGenesisMerkleRoot();
bool revert_successful{false};
std::size_t revert_attempts{0};
MainChain::BlockPtr block{};

if (!storage_unit_.RevertToHash(genesis_merkle, 0))
// walk down the chain trying to find the closest merkle hash that we can revert to
while (revert_attempts < MAX_ATTEMPTED_PANIC_REVERTS)
{
FETCH_LOG_CRITICAL(LOGGING_NAME, "Unable to revert back to Genesis!!!");
// on the first loop, or a failure to lookup the previous block then restart from the head of
// the chain again
if (!block)
{
// lookup the heaviest block
block = chain_.GetHeaviestBlock();

// edge case - something is very broken with the main chain
if (!block)
{
break;
}
}

// break from the search loop as soon as we have found a known check point that we can revert
// back to
if (storage_unit_.HashExists(block->merkle_hash, block->block_number))
{
// attempt the revert - we expect this to work almost 100% of the time, however, in rare
// cases it could be the case that the underlying main chain service has made some updates to
// the chain and in which case this might not be present.
//
// In this failure mode we simply continue our search back down the chain. If there are
// greater than MAX_ATTEMPTED_PANIC_REVERTS then, something serious has occurred. In order to
// proceed revert back to the genesis state
if (RevertToBlock(*block))
{
revert_successful = true;
break;
}

// failed to revert to this block - count the attempt and keep searching down the chain
++revert_attempts;
}

// continue to walk down the chain - this might fail in the case of a large amount of forking,
// however, this will be caught at the start of the next loop
block = chain_.GetBlock(block->previous_hash);

panic_search_total_->increment();
}

if (dag_ && !dag_->RevertToEpoch(0))
// in the worst case, when the search above fails completely, fall back to the catch-all of
// reverting to genesis
if (!revert_successful)
{
FETCH_LOG_CRITICAL(LOGGING_NAME, "Unable to revert DAG back to genesis!");
}
auto const genesis_digest = chain::GetGenesisDigest();
auto const genesis_merkle = chain::GetGenesisMerkleRoot();

last_executed_block_.ApplyVoid([&](auto &digest) { digest = genesis_digest; });
execution_manager_.SetLastProcessedBlock(genesis_digest);
if (!storage_unit_.RevertToHash(genesis_merkle, 0))
{
FETCH_LOG_CRITICAL(LOGGING_NAME, "Unable to revert back to Genesis!!!");
}

if (dag_ && !dag_->RevertToEpoch(0))
{
FETCH_LOG_CRITICAL(LOGGING_NAME, "Unable to revert DAG back to genesis!");
}

last_executed_block_.ApplyVoid([&](auto &digest) { digest = genesis_digest; });
execution_manager_.SetLastProcessedBlock(genesis_digest);
}

// delay the state machine in these error cases, to allow the network to catch up if the issue
// is network related and if nothing else restrict logs being spammed
Expand Down
211 changes: 211 additions & 0 deletions libs/ledger/tests/chain/block_coordinator_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,217 @@ TEST_F(BlockCoordinatorTests, CheckBlockMining)
}
}

// Checks the coordinator's behaviour when the block it last executed is removed from the main
// chain and replaced by a competing fork.
//
// Scenario: genesis -> b1 -> b2_1 is executed, then b2_1 is removed and b2_2 (also a child of b1)
// is added. The expectations below require the coordinator to probe storage at block number 2,
// then at block number 1, revert to block number 1 (last processed block set to b1) and then
// execute b2_2 - i.e. it recovers at the common ancestor rather than at genesis.
TEST_F(BlockCoordinatorTests, CheckPanicMode)
{
auto genesis = block_generator_();
auto b1 = block_generator_(genesis);
auto b2_1 = block_generator_(b1); // fork 1
auto b2_2 = block_generator_(b1); // fork 2

// define the call expectations
{
// the expectations declared in this scope are ordered
InSequence s;

// syncing
EXPECT_CALL(*storage_unit_, LastCommitHash());
EXPECT_CALL(*storage_unit_, CurrentHash());
EXPECT_CALL(*execution_manager_, LastProcessedBlock());

// pre block validation
// none

// schedule of the genesis block
EXPECT_CALL(*execution_manager_, Execute(IsBlock(genesis)));

// wait for the execution to complete
EXPECT_CALL(*execution_manager_, GetState());
EXPECT_CALL(*execution_manager_, GetState());

// post block validation
EXPECT_CALL(*storage_unit_, CurrentHash());
EXPECT_CALL(*storage_unit_, Commit(0));

// syncing
EXPECT_CALL(*storage_unit_, LastCommitHash());
EXPECT_CALL(*storage_unit_, CurrentHash());
EXPECT_CALL(*execution_manager_, LastProcessedBlock());

// --- Event: B1 & B2_1 added ---

// syncing - B1
EXPECT_CALL(*storage_unit_, LastCommitHash());
EXPECT_CALL(*storage_unit_, CurrentHash());
EXPECT_CALL(*execution_manager_, LastProcessedBlock());
EXPECT_CALL(*storage_unit_, HashExists(_, 0));
EXPECT_CALL(*storage_unit_, RevertToHash(_, 0));
EXPECT_CALL(*execution_manager_, SetLastProcessedBlock(genesis->hash));

// schedule of the next block
EXPECT_CALL(*execution_manager_, Execute(IsBlock(b1)));

// wait for the execution to complete
EXPECT_CALL(*execution_manager_, GetState());
EXPECT_CALL(*execution_manager_, GetState());

// post block validation
EXPECT_CALL(*storage_unit_, CurrentHash());
EXPECT_CALL(*storage_unit_, Commit(1));

// syncing - B2_1
EXPECT_CALL(*storage_unit_, LastCommitHash());
EXPECT_CALL(*storage_unit_, CurrentHash());
EXPECT_CALL(*execution_manager_, LastProcessedBlock());
EXPECT_CALL(*storage_unit_, HashExists(_, 1));
EXPECT_CALL(*storage_unit_, RevertToHash(_, 1));
EXPECT_CALL(*execution_manager_, SetLastProcessedBlock(b1->hash));

// schedule of the next block
EXPECT_CALL(*execution_manager_, Execute(IsBlock(b2_1)));

// wait for the execution to complete
EXPECT_CALL(*execution_manager_, GetState());
EXPECT_CALL(*execution_manager_, GetState());

// post block validation
EXPECT_CALL(*storage_unit_, CurrentHash());
EXPECT_CALL(*storage_unit_, Commit(2));

// syncing - moving to sync'ed state
EXPECT_CALL(*storage_unit_, LastCommitHash());
EXPECT_CALL(*storage_unit_, CurrentHash());
EXPECT_CALL(*execution_manager_, LastProcessedBlock());

// --- Event: B2_1 removed & B2_2 added, causing a switch to another fork ---

// panic - the block we thought we were processing was not actually correct
EXPECT_CALL(*storage_unit_, LastCommitHash());
EXPECT_CALL(*storage_unit_, CurrentHash());
EXPECT_CALL(*execution_manager_, LastProcessedBlock());
EXPECT_CALL(*storage_unit_, HashExists(_, 2)); // check to see the fork that doesn't exist
EXPECT_CALL(*storage_unit_, HashExists(_, 1)); // common block
EXPECT_CALL(*storage_unit_, HashExists(_, 1)); // checked again as part of revert to block
EXPECT_CALL(*storage_unit_, RevertToHash(_, 1));
EXPECT_CALL(*execution_manager_, SetLastProcessedBlock(b1->hash));

// syncing - B2_2
EXPECT_CALL(*storage_unit_, LastCommitHash());
EXPECT_CALL(*storage_unit_, CurrentHash());
EXPECT_CALL(*execution_manager_, LastProcessedBlock());
EXPECT_CALL(*storage_unit_, HashExists(_, 1));
EXPECT_CALL(*storage_unit_, RevertToHash(_, 1));
EXPECT_CALL(*execution_manager_, SetLastProcessedBlock(b1->hash));

// schedule of the next block
EXPECT_CALL(*execution_manager_, Execute(IsBlock(b2_2)));

// wait for the execution to complete
EXPECT_CALL(*execution_manager_, GetState());
EXPECT_CALL(*execution_manager_, GetState());

// post block validation
EXPECT_CALL(*storage_unit_, CurrentHash());
EXPECT_CALL(*storage_unit_, Commit(2));

// syncing - moving to sync'ed state
EXPECT_CALL(*storage_unit_, LastCommitHash());
EXPECT_CALL(*storage_unit_, CurrentHash());
EXPECT_CALL(*execution_manager_, LastProcessedBlock());
}

// processing of genesis block
ASSERT_EQ(execution_manager_->fake.LastProcessedBlock(), fetch::chain::ZERO_HASH);

Tick(State::RELOAD_STATE, State::RESET);
Tick(State::RESET, State::SYNCHRONISING);
Tick(State::SYNCHRONISING, State::PRE_EXEC_BLOCK_VALIDATION);
Tick(State::PRE_EXEC_BLOCK_VALIDATION, State::WAIT_FOR_TRANSACTIONS);
Tick(State::WAIT_FOR_TRANSACTIONS, State::SYNERGETIC_EXECUTION);
Tick(State::SYNERGETIC_EXECUTION, State::SCHEDULE_BLOCK_EXECUTION);
Tick(State::SCHEDULE_BLOCK_EXECUTION, State::WAIT_FOR_EXECUTION);
Tick(State::WAIT_FOR_EXECUTION, State::WAIT_FOR_EXECUTION);
Tick(State::WAIT_FOR_EXECUTION, State::POST_EXEC_BLOCK_VALIDATION);

ASSERT_EQ(execution_manager_->fake.LastProcessedBlock(), genesis->hash);

Tick(State::POST_EXEC_BLOCK_VALIDATION, State::RESET);
Tick(State::RESET, State::SYNCHRONISING);
Tick(State::SYNCHRONISING, State::SYNCHRONISED);
Tick(State::SYNCHRONISED, State::SYNCHRONISED);
Tick(State::SYNCHRONISED, State::SYNCHRONISED);
Tick(State::SYNCHRONISED, State::SYNCHRONISED);

// add the first fork
ASSERT_EQ(BlockStatus::ADDED, main_chain_->AddBlock(*b1));
ASSERT_EQ(BlockStatus::ADDED, main_chain_->AddBlock(*b2_1));

// execute b1
Tick(State::SYNCHRONISED, State::RESET);
Tick(State::RESET, State::SYNCHRONISING);
Tick(State::SYNCHRONISING, State::PRE_EXEC_BLOCK_VALIDATION);
Tick(State::PRE_EXEC_BLOCK_VALIDATION, State::WAIT_FOR_TRANSACTIONS);
Tick(State::WAIT_FOR_TRANSACTIONS, State::SYNERGETIC_EXECUTION);
Tick(State::SYNERGETIC_EXECUTION, State::SCHEDULE_BLOCK_EXECUTION);
Tick(State::SCHEDULE_BLOCK_EXECUTION, State::WAIT_FOR_EXECUTION);
Tick(State::WAIT_FOR_EXECUTION, State::WAIT_FOR_EXECUTION);
Tick(State::WAIT_FOR_EXECUTION, State::POST_EXEC_BLOCK_VALIDATION);

ASSERT_EQ(execution_manager_->fake.LastProcessedBlock(), b1->hash);

Tick(State::POST_EXEC_BLOCK_VALIDATION, State::RESET);
Tick(State::RESET, State::SYNCHRONISING);

// execute b2_1
Tick(State::SYNCHRONISING, State::PRE_EXEC_BLOCK_VALIDATION);
Tick(State::PRE_EXEC_BLOCK_VALIDATION, State::WAIT_FOR_TRANSACTIONS);
Tick(State::WAIT_FOR_TRANSACTIONS, State::SYNERGETIC_EXECUTION);
Tick(State::SYNERGETIC_EXECUTION, State::SCHEDULE_BLOCK_EXECUTION);
Tick(State::SCHEDULE_BLOCK_EXECUTION, State::WAIT_FOR_EXECUTION);
Tick(State::WAIT_FOR_EXECUTION, State::WAIT_FOR_EXECUTION);
Tick(State::WAIT_FOR_EXECUTION, State::POST_EXEC_BLOCK_VALIDATION);

ASSERT_EQ(execution_manager_->fake.LastProcessedBlock(), b2_1->hash);

Tick(State::POST_EXEC_BLOCK_VALIDATION, State::RESET);

// finishing execution of the chain
Tick(State::RESET, State::SYNCHRONISING);
Tick(State::SYNCHRONISING, State::SYNCHRONISED);
Tick(State::SYNCHRONISED, State::SYNCHRONISED);
Tick(State::SYNCHRONISED, State::SYNCHRONISED);
Tick(State::SYNCHRONISED, State::SYNCHRONISED);

// emulate a fork operation: drop the executed tip (b2_1) and add its sibling (b2_2)
ASSERT_TRUE(main_chain_->RemoveBlock(b2_1->hash));
ASSERT_EQ(BlockStatus::ADDED, main_chain_->AddBlock(*b2_2));

Tick(State::SYNCHRONISED, State::RESET);
Tick(State::RESET, State::SYNCHRONISING);

// panic! the block has been removed from under our feet
// (the coordinator goes SYNCHRONISING -> RESET instead of proceeding to block validation)
Tick(State::SYNCHRONISING, State::RESET);
Tick(State::RESET, State::SYNCHRONISING);

// execute B2_2
Tick(State::SYNCHRONISING, State::PRE_EXEC_BLOCK_VALIDATION);
Tick(State::PRE_EXEC_BLOCK_VALIDATION, State::WAIT_FOR_TRANSACTIONS);
Tick(State::WAIT_FOR_TRANSACTIONS, State::SYNERGETIC_EXECUTION);
Tick(State::SYNERGETIC_EXECUTION, State::SCHEDULE_BLOCK_EXECUTION);
Tick(State::SCHEDULE_BLOCK_EXECUTION, State::WAIT_FOR_EXECUTION);
Tick(State::WAIT_FOR_EXECUTION, State::WAIT_FOR_EXECUTION);
Tick(State::WAIT_FOR_EXECUTION, State::POST_EXEC_BLOCK_VALIDATION);

ASSERT_EQ(execution_manager_->fake.LastProcessedBlock(), b2_2->hash);

Tick(State::POST_EXEC_BLOCK_VALIDATION, State::RESET);

// finish syncing
Tick(State::RESET, State::SYNCHRONISING);
Tick(State::SYNCHRONISING, State::SYNCHRONISED);
Tick(State::SYNCHRONISED, State::SYNCHRONISED);
Tick(State::SYNCHRONISED, State::SYNCHRONISED);
Tick(State::SYNCHRONISED, State::SYNCHRONISED);
}

class NiceMockBlockCoordinatorTests : public BlockCoordinatorTests
{
protected:
Expand Down

0 comments on commit 4d0282c

Please sign in to comment.