Skip to content

Commit

Permalink
Update the best block after importing snapshot chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
remagpie authored and foriequal0 committed Nov 25, 2019
1 parent 0ebd568 commit e7977ae
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 37 deletions.
29 changes: 20 additions & 9 deletions core/src/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ impl BlockChain {
}
}

pub fn insert_bootstrap_block(&self, batch: &mut DBTransaction, bytes: &[u8]) {
pub fn insert_floating_header(&self, batch: &mut DBTransaction, header: &HeaderView) {
self.headerchain.insert_floating_header(batch, header);
}

pub fn insert_floating_block(&self, batch: &mut DBTransaction, bytes: &[u8]) {
let block = BlockView::new(bytes);
let header = block.header_view();
let hash = header.hash();
Expand All @@ -122,20 +126,27 @@ impl BlockChain {
return
}

self.insert_floating_header(batch, &header);
self.body_db.insert_body(batch, &block);
}

pub fn force_update_best_block(&self, batch: &mut DBTransaction, hash: &BlockHash) {
ctrace!(BLOCKCHAIN, "Forcefully updating the best block to {}", hash);

assert!(self.is_known(hash));
assert!(self.pending_best_block_hash.read().is_none());
assert!(self.pending_best_proposal_block_hash.read().is_none());

self.headerchain.insert_bootstrap_header(batch, &header);
self.body_db.insert_body(batch, &block);
let block = self.block(hash).expect("Target block is known");
self.headerchain.force_update_best_header(batch, hash);
self.body_db.update_best_block(batch, &BestBlockChanged::CanonChainAppended {
best_block: bytes.to_vec(),
best_block: block.into_inner(),
});

*self.pending_best_block_hash.write() = Some(hash);
batch.put(db::COL_EXTRA, BEST_BLOCK_KEY, &hash);

*self.pending_best_proposal_block_hash.write() = Some(hash);
batch.put(db::COL_EXTRA, BEST_PROPOSAL_BLOCK_KEY, &hash);
batch.put(db::COL_EXTRA, BEST_BLOCK_KEY, hash);
*self.pending_best_block_hash.write() = Some(*hash);
batch.put(db::COL_EXTRA, BEST_PROPOSAL_BLOCK_KEY, hash);
*self.pending_best_proposal_block_hash.write() = Some(*hash);
}

/// Inserts the block into backing cache database.
Expand Down
30 changes: 16 additions & 14 deletions core/src/blockchain/headerchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,24 +115,19 @@ impl HeaderChain {
}
}

/// Inserts a bootstrap header into backing cache database.
/// Makes the imported header the best header.
/// Expects the header to be valid and already verified.
/// Inserts a floating header into backing cache database.
/// Expects the header to be valid.
/// If the header is already known, does nothing.
// FIXME: Find better return type. Returning `None` at duplication is not natural
pub fn insert_bootstrap_header(&self, batch: &mut DBTransaction, header: &HeaderView) {
pub fn insert_floating_header(&self, batch: &mut DBTransaction, header: &HeaderView) {
let hash = header.hash();

ctrace!(HEADERCHAIN, "Inserting bootstrap block header #{}({}) to the headerchain.", header.number(), hash);
ctrace!(HEADERCHAIN, "Inserting a floating block header #{}({}) to the headerchain.", header.number(), hash);

if self.is_known_header(&hash) {
ctrace!(HEADERCHAIN, "Block header #{}({}) is already known.", header.number(), hash);
return
}

assert!(self.pending_best_header_hash.read().is_none());
assert!(self.pending_best_proposal_block_hash.read().is_none());

let compressed_header = compress(header.rlp().as_raw(), blocks_swapper());
batch.put(db::COL_HEADERS, &hash, &compressed_header);

Expand All @@ -145,18 +140,25 @@ impl HeaderChain {
parent: header.parent_hash(),
});

batch.put(db::COL_EXTRA, BEST_HEADER_KEY, &hash);
*self.pending_best_header_hash.write() = Some(hash);
batch.put(db::COL_EXTRA, BEST_PROPOSAL_HEADER_KEY, &hash);
*self.pending_best_proposal_block_hash.write() = Some(hash);

let mut pending_hashes = self.pending_hashes.write();
let mut pending_details = self.pending_details.write();

batch.extend_with_cache(db::COL_EXTRA, &mut *pending_details, new_details, CacheUpdatePolicy::Overwrite);
batch.extend_with_cache(db::COL_EXTRA, &mut *pending_hashes, new_hashes, CacheUpdatePolicy::Overwrite);
}

pub fn force_update_best_header(&self, batch: &mut DBTransaction, hash: &BlockHash) {
ctrace!(HEADERCHAIN, "Forcefully updating the best header to {}", hash);
assert!(self.is_known_header(hash));
assert!(self.pending_best_header_hash.read().is_none());
assert!(self.pending_best_proposal_block_hash.read().is_none());

batch.put(db::COL_EXTRA, BEST_HEADER_KEY, hash);
*self.pending_best_header_hash.write() = Some(*hash);
batch.put(db::COL_EXTRA, BEST_PROPOSAL_HEADER_KEY, hash);
*self.pending_best_proposal_block_hash.write() = Some(*hash);
}

/// Inserts the header into backing cache database.
/// Expects the header to be valid and already verified.
/// If the header is already known, does nothing.
Expand Down
18 changes: 16 additions & 2 deletions core/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use cstate::{
ActionHandler, AssetScheme, FindActionHandler, OwnedAsset, StateDB, StateResult, Text, TopLevelState, TopStateView,
};
use ctimer::{TimeoutHandler, TimerApi, TimerScheduleError, TimerToken};
use ctypes::header::Header;
use ctypes::transaction::{AssetTransferInput, PartialHashing, ShardTransaction};
use ctypes::{BlockHash, BlockNumber, CommonParams, ShardId, Tracker, TxHash};
use cvm::{decode, execute, ChainTimeInfo, ScriptResult, VMConfig};
Expand Down Expand Up @@ -664,15 +665,28 @@ impl ImportBlock for Client {
Ok(self.importer.header_queue.import(unverified)?)
}

fn import_bootstrap_block(&self, block: &Block) -> Result<BlockHash, BlockImportError> {
fn import_trusted_header(&self, header: &Header) -> Result<BlockHash, BlockImportError> {
if self.block_chain().is_known_header(&header.hash()) {
return Err(BlockImportError::Import(ImportError::AlreadyInChain))
}
let import_lock = self.importer.import_lock.lock();
self.importer.import_trusted_header(header, self, &import_lock);
Ok(header.hash())
}

fn import_trusted_block(&self, block: &Block) -> Result<BlockHash, BlockImportError> {
if self.block_chain().is_known(&block.header.hash()) {
return Err(BlockImportError::Import(ImportError::AlreadyInChain))
}
let import_lock = self.importer.import_lock.lock();
self.importer.import_bootstrap_block(block, self, &import_lock);
self.importer.import_trusted_block(block, self, &import_lock);
Ok(block.header.hash())
}

fn force_update_best_block(&self, hash: &BlockHash) {
self.importer.force_update_best_block(hash, self)
}

fn import_sealed_block(&self, block: &SealedBlock) -> ImportResult {
let h = block.header().hash();
let start = Instant::now();
Expand Down
38 changes: 32 additions & 6 deletions core/src/client/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,26 +371,52 @@ impl Importer {
imported.len()
}

pub fn import_bootstrap_block<'a>(&'a self, block: &'a Block, client: &Client, _importer_lock: &MutexGuard<()>) {
pub fn import_trusted_header<'a>(&'a self, header: &'a Header, client: &Client, _importer_lock: &MutexGuard<()>) {
let hash = header.hash();
ctrace!(CLIENT, "Importing trusted header #{}-{:?}", header.number(), hash);

{
let chain = client.block_chain();
let mut batch = DBTransaction::new();
chain.insert_floating_header(&mut batch, &HeaderView::new(&header.rlp_bytes()));
client.db().write_buffered(batch);
chain.commit();
}
client.new_headers(&[hash], &[], &[], &[], &[], 0, None);

client.db().flush().expect("DB flush failed.");
}

pub fn import_trusted_block<'a>(&'a self, block: &'a Block, client: &Client, importer_lock: &MutexGuard<()>) {
let header = &block.header;
let hash = header.hash();
ctrace!(CLIENT, "Importing bootstrap block #{}-{:?}", header.number(), hash);
ctrace!(CLIENT, "Importing trusted block #{}-{:?}", header.number(), hash);

self.import_trusted_header(header, client, importer_lock);
let start = Instant::now();
{
let chain = client.block_chain();
let mut batch = DBTransaction::new();
chain.insert_bootstrap_block(&mut batch, &block.rlp_bytes(&Seal::With));
chain.insert_floating_block(&mut batch, &block.rlp_bytes(&Seal::With));
client.db().write_buffered(batch);
chain.commit();
}
let duration = {
let elapsed = start.elapsed();
elapsed.as_secs() * 1_000_000_000 + u64::from(elapsed.subsec_nanos())
};
client.new_headers(&[hash], &[], &[hash], &[], &[], 0, Some(hash));
self.miner.chain_new_blocks(client, &[hash], &[], &[hash], &[]);
client.new_blocks(&[hash], &[], &[hash], &[], &[], duration);
self.miner.chain_new_blocks(client, &[hash], &[], &[], &[]);
client.new_blocks(&[hash], &[], &[], &[], &[], duration);

client.db().flush().expect("DB flush failed.");
}

pub fn force_update_best_block(&self, hash: &BlockHash, client: &Client) {
let chain = client.block_chain();
let mut batch = DBTransaction::new();
chain.force_update_best_block(&mut batch, hash);
client.db().write_buffered(batch);
chain.commit();

client.db().flush().expect("DB flush failed.");
}
Expand Down
14 changes: 11 additions & 3 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use ckey::{Address, NetworkId, PlatformAddress, Public};
use cmerkle::Result as TrieResult;
use cnetwork::NodeId;
use cstate::{AssetScheme, FindActionHandler, OwnedAsset, StateResult, Text, TopLevelState, TopStateView};
use ctypes::header::Header;
use ctypes::transaction::{AssetTransferInput, PartialHashing, ShardTransaction};
use ctypes::{BlockHash, BlockNumber, CommonParams, ShardId, Tracker, TxHash};
use cvm::ChainTimeInfo;
Expand Down Expand Up @@ -201,9 +202,16 @@ pub trait ImportBlock {
/// Import a header into the blockchain
fn import_header(&self, bytes: Bytes) -> Result<BlockHash, BlockImportError>;

/// Import a trusted bootstrap header into the blockchain
/// Bootstrap headers don't execute any verifications
fn import_bootstrap_block(&self, bytes: &Block) -> Result<BlockHash, BlockImportError>;
/// Import a trusted header into the blockchain
/// Trusted header doesn't go through any verifications and doesn't update the best header
fn import_trusted_header(&self, header: &Header) -> Result<BlockHash, BlockImportError>;

/// Import a trusted block into the blockchain
/// Trusted block doesn't go through any verifications and doesn't update the best block
fn import_trusted_block(&self, block: &Block) -> Result<BlockHash, BlockImportError>;

/// Forcefully update the best block
fn force_update_best_block(&self, hash: &BlockHash);

/// Import sealed block. Skips all verifications.
fn import_sealed_block(&self, block: &SealedBlock) -> ImportResult;
Expand Down
11 changes: 10 additions & 1 deletion core/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use cnetwork::NodeId;
use cstate::tests::helpers::empty_top_state;
use cstate::{FindActionHandler, StateDB, TopLevelState};
use ctimer::{TimeoutHandler, TimerToken};
use ctypes::header::Header;
use ctypes::transaction::{Action, Transaction};
use ctypes::{BlockHash, BlockNumber, CommonParams, Header as BlockHeader, Tracker, TxHash};
use cvm::ChainTimeInfo;
Expand Down Expand Up @@ -510,7 +511,15 @@ impl ImportBlock for TestBlockChainClient {
unimplemented!()
}

fn import_bootstrap_block(&self, _header: &Block) -> Result<BlockHash, BlockImportError> {
fn import_trusted_header(&self, _header: &Header) -> Result<BlockHash, BlockImportError> {
unimplemented!()
}

fn import_trusted_block(&self, _block: &Block) -> Result<BlockHash, BlockImportError> {
unimplemented!()
}

fn force_update_best_block(&self, _hash: &BlockHash) {
unimplemented!()
}

Expand Down
24 changes: 22 additions & 2 deletions sync/src/block/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ impl NetworkExtension<Event> for Extension {
self.send_chunk_request(&block, &root);
} else {
cdebug!(SYNC, "Transitioning state to {:?}", State::Full);
self.client.force_update_best_block(&block);
self.state = State::Full;
}
}
Expand Down Expand Up @@ -833,6 +834,24 @@ impl Extension {
match self.state {
State::SnapshotHeader(hash, _) => match headers {
[parent, header] if header.hash() == hash => {
match self.client.import_trusted_header(parent) {
Ok(_)
| Err(BlockImportError::Import(ImportError::AlreadyInChain))
| Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {}
Err(err) => {
cwarn!(SYNC, "Cannot import header({}): {:?}", parent.hash(), err);
return
}
}
match self.client.import_trusted_header(header) {
Ok(_)
| Err(BlockImportError::Import(ImportError::AlreadyInChain))
| Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {}
Err(err) => {
cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err);
return
}
}
self.state = State::SnapshotBody {
header: EncodedHeader::new(header.rlp_bytes().to_vec()),
prev_root: *parent.transactions_root(),
Expand Down Expand Up @@ -909,7 +928,7 @@ impl Extension {
header: header.decode(),
transactions: body.clone(),
};
match self.client.import_bootstrap_block(&block) {
match self.client.import_trusted_block(&block) {
Ok(_) | Err(BlockImportError::Import(ImportError::AlreadyInChain)) => {
self.state = State::SnapshotChunk {
block: header.hash(),
Expand All @@ -920,7 +939,7 @@ impl Extension {
Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {}
// FIXME: handle import errors
Err(err) => {
cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err);
cwarn!(SYNC, "Cannot import block({}): {:?}", header.hash(), err);
}
}
}
Expand Down Expand Up @@ -1020,6 +1039,7 @@ impl Extension {
self.send_chunk_request(&block, &root);
} else {
cdebug!(SYNC, "Transitioning state to {:?}", State::Full);
self.client.force_update_best_block(&block);
self.state = State::Full;
}
}
Expand Down

0 comments on commit e7977ae

Please sign in to comment.