From 8492e3e61d854e360b7a14bd85a53e0c6cb11469 Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Tue, 11 Oct 2016 15:53:45 +0200 Subject: [PATCH] Backports to beta v1.3.6 (#2571) * v1.3.6 * Print backtrace on panic (#2535) * Don't activate peers on connect; Test (#2537) * Removing unwarps from sync module (#2551) * Remove unwrap from client module (#2554) * remove unwraps in client * imporve block hash expect message * mining perf trace * Fixed race condition in trace import (#2555) --- Cargo.lock | 26 ++--- Cargo.toml | 2 +- ethcore/src/client/client.rs | 105 +++++++++-------- ethcore/src/client/test_client.rs | 11 +- ethcore/src/miner/miner.rs | 1 + ethcore/src/trace/db.rs | 27 +++-- nsis/installer.nsi | 2 +- parity/main.rs | 2 + sync/src/blocks.rs | 6 +- sync/src/chain.rs | 182 ++++++++++++++++-------------- sync/src/tests/chain.rs | 24 +++- sync/src/tests/helpers.rs | 37 ++++-- util/Cargo.toml | 2 +- 13 files changed, 246 insertions(+), 181 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 935f757970b..3c67ba21f35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ [root] name = "parity" -version = "1.3.5" +version = "1.3.6" dependencies = [ "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "clippy 0.0.80 (registry+https://github.com/rust-lang/crates.io-index)", @@ -20,7 +20,7 @@ dependencies = [ "ethcore-logger 1.3.0", "ethcore-rpc 1.3.0", "ethcore-signer 1.3.0", - "ethcore-util 1.3.5", + "ethcore-util 1.3.6", "ethsync 1.3.0", "fdlimit 0.1.0", "hyper 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -270,7 +270,7 @@ dependencies = [ "ethcore-ipc 1.3.0", "ethcore-ipc-codegen 1.3.0", "ethcore-ipc-nano 1.3.0", - "ethcore-util 1.3.5", + "ethcore-util 1.3.6", "ethjson 0.1.0", "ethstore 0.1.0", "evmjit 1.3.0", @@ -294,7 +294,7 @@ version = "1.3.0" dependencies = [ "clippy 0.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-rpc 1.3.0", - "ethcore-util 1.3.5", + "ethcore-util 1.3.6", "hyper 0.9.4 (git+https://github.com/ethcore/hyper)", "jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-http-server 6.1.0 (git+https://github.com/ethcore/jsonrpc-http-server.git)", @@ -336,7 +336,7 @@ name = "ethcore-ipc" version = "1.3.0" dependencies = [ "ethcore-devtools 1.3.0", - "ethcore-util 1.3.5", + "ethcore-util 1.3.6", "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)", "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -381,7 +381,7 @@ dependencies = [ "ethcore-ipc 1.3.0", "ethcore-ipc-codegen 1.3.0", "ethcore-ipc-nano 1.3.0", - "ethcore-util 1.3.5", + "ethcore-util 1.3.6", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)", "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -393,7 +393,7 @@ name = "ethcore-logger" version = "1.3.0" dependencies = [ "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "ethcore-util 1.3.5", + "ethcore-util 1.3.6", "isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -408,7 +408,7 @@ dependencies = [ "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-devtools 1.3.0", "ethcore-io 1.3.0", - "ethcore-util 1.3.5", + "ethcore-util 1.3.6", "igd 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -432,7 +432,7 @@ dependencies = [ "ethcore-devtools 1.3.0", "ethcore-io 1.3.0", "ethcore-ipc 1.3.0", - "ethcore-util 1.3.5", + "ethcore-util 1.3.6", "ethjson 0.1.0", "ethsync 1.3.0", "json-ipc-server 0.2.4 (git+https://github.com/ethcore/json-ipc-server.git?branch=beta)", @@ -455,7 +455,7 @@ dependencies = [ "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-io 1.3.0", "ethcore-rpc 1.3.0", - "ethcore-util 1.3.5", + "ethcore-util 1.3.6", "jsonrpc-core 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-dapps-signer 1.4.0 (git+https://github.com/ethcore/parity-ui.git)", @@ -466,7 +466,7 @@ dependencies = [ [[package]] name = "ethcore-util" -version = "1.3.5" +version = "1.3.6" dependencies = [ "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "arrayvec 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)", @@ -499,7 +499,7 @@ dependencies = [ name = "ethjson" version = "0.1.0" dependencies = [ - "ethcore-util 1.3.5", + "ethcore-util 1.3.6", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "serde 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)", "serde_codegen 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -547,7 +547,7 @@ dependencies = [ "ethcore-ipc-codegen 1.3.0", "ethcore-ipc-nano 1.3.0", "ethcore-network 1.3.0", - "ethcore-util 1.3.5", + "ethcore-util 1.3.6", "heapsize 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 5934cb06b56..85f8686727b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] description = "Ethcore client." name = "parity" -version = "1.3.5" +version = "1.3.6" license = "GPL-3.0" authors = ["Ethcore "] build = "build.rs" diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 15a93838d2b..6a479b09f7f 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -159,13 +159,6 @@ pub const DB_COL_ACCOUNT_BLOOM: Option = Some(5); /// Number of columns in DB pub const DB_NO_OF_COLUMNS: Option = Some(6); -/// Append a path element to the given path and return the string. -pub fn append_path

(path: P, item: &str) -> String where P: AsRef { - let mut p = path.as_ref().to_path_buf(); - p.push(item); - p.to_str().unwrap().to_owned() -} - impl Client { /// Create a new client with given spec and DB path and custom verifier. pub fn new( @@ -182,7 +175,7 @@ impl Client { db_config.compaction = config.db_compaction.compaction_profile(); db_config.wal = config.db_wal; - let db = Arc::new(try!(Database::open(&db_config, &path.to_str().unwrap()).map_err(ClientError::Database))); + let db = Arc::new(try!(Database::open(&db_config, &path.to_str().expect("DB path could not be converted to string.")).map_err(ClientError::Database))); let chain = Arc::new(BlockChain::new(config.blockchain, &gb, db.clone())); let tracedb = Arc::new(try!(TraceDB::new(config.tracing, db.clone(), chain.clone()))); @@ -295,31 +288,28 @@ impl Client { }; // Check if Parent is in chain - let chain_has_parent = self.chain.block_header(&header.parent_hash); - if let None = chain_has_parent { - warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash); - return Err(()); - }; - - // Enact Verified Block - let parent = chain_has_parent.unwrap(); - let last_hashes = self.build_last_hashes(header.parent_hash().clone()); - let db = self.state_db.lock().boxed_clone_canon(&header.parent_hash); - - let enact_result = enact_verified(block, engine, self.tracedb.tracing_enabled(), db, &parent, last_hashes, &self.vm_factory, self.trie_factory.clone()); - if let Err(e) = enact_result { - warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - return Err(()); - }; + let chain_has_parent = self.chain.block_header(header.parent_hash()); + if let Some(parent) = chain_has_parent { + // Enact Verified Block + let last_hashes = self.build_last_hashes(header.parent_hash().clone()); + let db = self.state_db.lock().boxed_clone_canon(&header.parent_hash()); + + let enact_result = enact_verified(block, engine, self.tracedb.tracing_enabled(), db, &parent, last_hashes, &self.vm_factory, self.trie_factory.clone()); + let locked_block = try!(enact_result.map_err(|e| { + warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); + })); + + // Final Verification + if let Err(e) = self.verifier.verify_block_final(header, locked_block.block().header()) { + warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); + return Err(()); + } - // Final Verification - let locked_block = enact_result.unwrap(); - if let Err(e) = self.verifier.verify_block_final(header, locked_block.block().header()) { - warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); - return Err(()); + Ok(locked_block) + } else { + warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash()); + Err(()) } - - Ok(locked_block) } fn calculate_enacted_retracted(&self, import_results: &[ImportRoute]) -> (Vec, Vec) { @@ -372,14 +362,17 @@ impl Client { invalid_blocks.insert(header.hash()); continue; } + let is_invalid = invalid_blocks.contains(header.parent_hash()); + if let (false, Ok(closed_block)) = (is_invalid, self.check_and_close_block(&block)) { + imported_blocks.push(header.hash()); - let closed_block = closed_block.unwrap(); - imported_blocks.push(header.hash()); - - let route = self.commit_block(closed_block, &header.hash(), &block.bytes); - import_results.push(route); + let route = self.commit_block(closed_block, &header.hash(), &block.bytes); + import_results.push(route); - self.report.write().accrue_block(&block); + self.report.write().accrue_block(&block); + } else { + invalid_blocks.insert(header.hash()); + } } let imported = imported_blocks.len(); @@ -428,7 +421,7 @@ impl Client { // Are we committing an era? let ancient = if number >= HISTORY { let n = number - HISTORY; - Some((n, self.chain.block_hash(n).unwrap())) + Some((n, self.chain.block_hash(n).expect("only verified blocks can be commited; verified block has hash; qed"))) } else { None }; @@ -835,8 +828,9 @@ impl BlockChainClient for Client { let t = self.chain.block_body(&address.block_hash) .and_then(|block| BodyView::new(&block).localized_transaction_at(&address.block_hash, block_number, address.index)); - match (t, self.chain.transaction_receipt(&address)) { - (Some(tx), Some(receipt)) => { + let tx_and_sender = t.and_then(|tx| tx.sender().ok().map(|sender| (tx, sender))); + match (tx_and_sender, self.chain.transaction_receipt(&address)) { + (Some((tx, sender)), Some(receipt)) => { let block_hash = tx.block_hash.clone(); let block_number = tx.block_number.clone(); let transaction_hash = tx.hash(); @@ -858,7 +852,7 @@ impl BlockChainClient for Client { gas_used: receipt.gas_used - prior_gas_used, contract_address: match tx.action { Action::Call(_) => None, - Action::Create => Some(contract_address(&tx.sender().unwrap(), &tx.nonce)) + Action::Create => Some(contract_address(&sender, &tx.nonce)) }, logs: receipt.logs.into_iter().enumerate().map(|(i, log)| LocalizedLogEntry { entry: log, @@ -981,17 +975,18 @@ impl BlockChainClient for Client { let start = self.block_number(filter.range.start); let end = self.block_number(filter.range.end); - if start.is_some() && end.is_some() { - let filter = trace::Filter { - range: start.unwrap() as usize..end.unwrap() as usize, - from_address: From::from(filter.from_address), - to_address: From::from(filter.to_address), - }; + match (start, end) { + (Some(s), Some(e)) => { + let filter = trace::Filter { + range: s as usize..e as usize, + from_address: From::from(filter.from_address), + to_address: From::from(filter.to_address), + }; - let traces = self.tracedb.filter(&filter); - Some(traces) - } else { - None + let traces = self.tracedb.filter(&filter); + Some(traces) + }, + _ => None, } } @@ -1063,11 +1058,15 @@ impl MiningBlockChainClient for Client { // Add uncles self.chain .find_uncle_headers(&h, engine.maximum_uncle_age()) - .unwrap() + .unwrap_or_else(Vec::new) .into_iter() .take(engine.maximum_uncle_count()) .foreach(|h| { - open_block.push_uncle(h).unwrap(); + open_block.push_uncle(h).expect("pushing maximum_uncle_count; + open_block was just created; + push_uncle is not ok only if more than maximum_uncle_count is pushed; + so all push_uncle are Ok; + qed"); }); open_block diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index d6df9836937..a780037b665 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -53,6 +53,8 @@ pub struct TestBlockChainClient { pub genesis_hash: H256, /// Last block hash. pub last_hash: RwLock, + /// Extra data do set for each block + pub extra_data: Bytes, /// Difficulty. pub difficulty: RwLock, /// Balances. @@ -99,11 +101,17 @@ impl Default for TestBlockChainClient { impl TestBlockChainClient { /// Creates new test client. pub fn new() -> Self { + Self::new_with_extra_data(Bytes::new()) + } + + /// Creates new test client with specified extra data for each block + pub fn new_with_extra_data(extra_data: Bytes) -> Self { let spec = Spec::new_test(); let mut client = TestBlockChainClient { blocks: RwLock::new(HashMap::new()), numbers: RwLock::new(HashMap::new()), genesis_hash: H256::new(), + extra_data: extra_data, last_hash: RwLock::new(H256::new()), difficulty: RwLock::new(From::from(0)), balances: RwLock::new(HashMap::new()), @@ -121,7 +129,7 @@ impl TestBlockChainClient { client.genesis_hash = client.last_hash.read().clone(); client } - + /// Set the transaction receipt result pub fn set_transaction_receipt(&self, id: TransactionID, receipt: LocalizedReceipt) { self.receipts.write().insert(id, receipt); @@ -166,6 +174,7 @@ impl TestBlockChainClient { header.parent_hash = self.last_hash.read().clone(); header.number = n as BlockNumber; header.gas_limit = U256::from(1_000_000); + header.extra_data = self.extra_data.clone(); let uncles = match with { EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => { let mut uncles = RlpStream::new_list(1); diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index ec847d482cd..e55d8f709a3 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -249,6 +249,7 @@ impl Miner { fn prepare_sealing(&self, chain: &MiningBlockChainClient) { trace!(target: "miner", "prepare_sealing: entering"); + let _timer = PerfTimer::new("prepare_sealing"); { trace!(target: "miner", "recalibrating..."); let txq = self.transaction_queue.clone(); diff --git a/ethcore/src/trace/db.rs b/ethcore/src/trace/db.rs index cd34e73c2ae..9b2eb17adff 100644 --- a/ethcore/src/trace/db.rs +++ b/ethcore/src/trace/db.rs @@ -268,16 +268,6 @@ impl TraceDatabase for TraceDB where T: DatabaseExtras { return; } - // at first, let's insert new block traces - { - let mut traces = self.traces.write(); - // it's important to use overwrite here, - // cause this value might be queried by hash later - batch.write_with_cache(DB_COL_TRACE, &mut *traces, request.block_hash, request.traces, CacheUpdatePolicy::Overwrite); - // note_used must be called after locking traces to avoid cache/traces deadlock on garbage collection - self.note_used(CacheID::Trace(request.block_hash.clone())); - } - // now let's rebuild the blooms { let range_start = request.block_number as Number + 1 - request.enacted.len(); @@ -288,12 +278,25 @@ impl TraceDatabase for TraceDB where T: DatabaseExtras { // all traces are expected to be found here. That's why `expect` has been used // instead of `filter_map`. If some traces haven't been found, it meens that // traces database is corrupted or incomplete. - .map(|block_hash| self.traces(block_hash).expect("Traces database is incomplete.")) - .map(|block_traces| block_traces.bloom()) + .map(|block_hash| if block_hash == &request.block_hash { + request.traces.bloom() + } else { + self.traces(block_hash).expect("Traces database is incomplete.").bloom() + }) .map(blooms::Bloom::from) .map(Into::into) .collect(); + // insert new block traces into the cache and the database + { + let mut traces = self.traces.write(); + // it's important to use overwrite here, + // cause this value might be queried by hash later + batch.write_with_cache(DB_COL_TRACE, &mut *traces, request.block_hash, request.traces, CacheUpdatePolicy::Overwrite); + // note_used must be called after locking traces to avoid cache/traces deadlock on garbage collection + self.note_used(CacheID::Trace(request.block_hash.clone())); + } + let chain = BloomGroupChain::new(self.bloom_config, self); let trace_blooms = chain.replace(&replaced_range, enacted_blooms); let blooms_to_insert = trace_blooms.into_iter() diff --git a/nsis/installer.nsi b/nsis/installer.nsi index 2fa5c001e2d..31f7a8d4a6a 100644 --- a/nsis/installer.nsi +++ b/nsis/installer.nsi @@ -4,7 +4,7 @@ !define DESCRIPTION "Fast, light, robust Ethereum implementation" !define VERSIONMAJOR 1 !define VERSIONMINOR 3 -!define VERSIONBUILD 5 +!define VERSIONBUILD 6 !addplugindir .\ diff --git a/parity/main.rs b/parity/main.rs index bb9f5e743d6..454d2c16dca 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -117,6 +117,8 @@ fn start() -> Result { } fn main() { + // Always print backtrace on panic. + ::std::env::set_var("RUST_BACKTRACE", "1"); // just redirect to the sync::main() if std::env::args().nth(1).map_or(false, |arg| arg == "sync") { sync::main(); diff --git a/sync/src/blocks.rs b/sync/src/blocks.rs index 342b59459c7..74c7ab36ee1 100644 --- a/sync/src/blocks.rs +++ b/sync/src/blocks.rs @@ -183,8 +183,8 @@ impl BlockCollection { { let mut blocks = Vec::new(); let mut head = self.head; - while head.is_some() { - head = self.parents.get(&head.unwrap()).cloned(); + while let Some(h) = head { + head = self.parents.get(&h).cloned(); if let Some(head) = head { match self.blocks.get(&head) { Some(block) if block.body.is_some() => { @@ -200,7 +200,7 @@ impl BlockCollection { for block in blocks.drain(..) { let mut block_rlp = RlpStream::new_list(3); block_rlp.append_raw(&block.header, 1); - let body = Rlp::new(block.body.as_ref().unwrap()); // incomplete blocks are filtered out in the loop above + let body = Rlp::new(block.body.as_ref().expect("blocks contains only full blocks; qed")); block_rlp.append_raw(body.at(0).as_raw(), 1); block_rlp.append_raw(body.at(1).as_raw(), 1); drained.push(block_rlp.out()); diff --git a/sync/src/chain.rs b/sync/src/chain.rs index c8e281964b2..750d4528ed1 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -89,7 +89,6 @@ use util::*; use network::*; -use std::mem::{replace}; use ethcore::views::{HeaderView, BlockView}; use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo, BlockImportError}; @@ -138,7 +137,7 @@ const GET_RECEIPTS_PACKET: u8 = 0x0f; const RECEIPTS_PACKET: u8 = 0x10; const HEADERS_TIMEOUT_SEC: f64 = 15f64; -const BODIES_TIMEOUT_SEC: f64 = 5f64; +const BODIES_TIMEOUT_SEC: f64 = 10f64; const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64; #[derive(Copy, Clone, Eq, PartialEq, Debug)] @@ -230,8 +229,6 @@ struct PeerInfo { network_id: U256, /// Peer best block hash latest_hash: H256, - /// Peer best block number if known - latest_number: Option, /// Peer total difficulty if known difficulty: Option, /// Type of data currenty being requested from peer. @@ -360,6 +357,8 @@ impl ChainSync { } self.syncing_difficulty = From::from(0u64); self.state = SyncState::Idle; + // Reactivate peers only if some progress has been made + // since the last sync round of if starting fresh. self.active_peers = self.peers.keys().cloned().collect(); } @@ -371,7 +370,8 @@ impl ChainSync { self.continue_sync(io); } - /// Remove peer from active peer set + /// Remove peer from active peer set. Peer will be reactivated on the next sync + /// round. fn deactivate_peer(&mut self, io: &mut SyncIo, peer_id: PeerId) { trace!(target: "sync", "Deactivating peer {}", peer_id); self.active_peers.remove(&peer_id); @@ -401,7 +401,6 @@ impl ChainSync { network_id: try!(r.val_at(1)), difficulty: Some(try!(r.val_at(2))), latest_hash: try!(r.val_at(3)), - latest_number: None, genesis: try!(r.val_at(4)), asking: PeerAsking::Nothing, asking_blocks: Vec::new(), @@ -434,7 +433,11 @@ impl ChainSync { } self.peers.insert(peer_id.clone(), peer); - self.active_peers.insert(peer_id.clone()); + // Don't activate peer immediatelly when searching for common block. + // Let the current sync round complete first. + if self.state != SyncState::ChainHead { + self.active_peers.insert(peer_id.clone()); + } debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id)); if let Some((fork_block, _)) = self.fork_block { self.request_headers_by_number(io, peer_id, fork_block, 1, 0, false, PeerAsking::ForkHeader); @@ -521,7 +524,7 @@ impl ChainSync { continue; } - if self.highest_block == None || number > self.highest_block.unwrap() { + if self.highest_block.as_ref().map_or(true, |n| number > *n) { self.highest_block = Some(number); } let hash = info.hash(); @@ -553,9 +556,9 @@ impl ChainSync { } if headers.is_empty() { - // Peer does not have any new subchain heads, deactivate it nd try with another + // Peer does not have any new subchain heads, deactivate it and try with another. trace!(target: "sync", "{} Disabled for no data", peer_id); - io.disable_peer(peer_id); + self.deactivate_peer(io, peer_id); } match self.state { SyncState::ChainHead => { @@ -634,9 +637,9 @@ impl ChainSync { } let mut unknown = false; { - let peer = self.peers.get_mut(&peer_id).unwrap(); - peer.latest_hash = header.hash(); - peer.latest_number = Some(header.number()); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + peer.latest_hash = header.hash(); + } } if self.last_imported_block > header.number() && self.last_imported_block - header.number() > MAX_NEW_BLOCK_AGE { trace!(target: "sync", "Ignored ancient new block {:?}", h); @@ -729,9 +732,9 @@ impl ChainSync { new_hashes.push(hash.clone()); if number > max_height { trace!(target: "sync", "New unknown block hash {:?}", hash); - let peer = self.peers.get_mut(&peer_id).unwrap(); - peer.latest_hash = hash.clone(); - peer.latest_number = Some(number); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + peer.latest_hash = hash.clone(); + } max_height = number; } }, @@ -808,15 +811,18 @@ impl ChainSync { return; } let (peer_latest, peer_difficulty) = { - let peer = self.peers.get_mut(&peer_id).unwrap(); - if peer.asking != PeerAsking::Nothing || !peer.can_sync() { - return; - } - if self.state == SyncState::Waiting { - trace!(target: "sync", "Waiting for the block queue"); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + if peer.asking != PeerAsking::Nothing || !peer.can_sync() { + return; + } + if self.state == SyncState::Waiting { + trace!(target: "sync", "Waiting for the block queue"); + return; + } + (peer.latest_hash.clone(), peer.difficulty.clone()) + } else { return; } - (peer.latest_hash.clone(), peer.difficulty.clone()) }; let chain_info = io.chain().chain_info(); let td = chain_info.pending_total_difficulty; @@ -893,35 +899,40 @@ impl ChainSync { // check to see if we need to download any block bodies first let needed_bodies = self.blocks.needed_bodies(MAX_BODIES_TO_REQUEST, ignore_others); if !needed_bodies.is_empty() { - replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_bodies.clone()); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + peer.asking_blocks = needed_bodies.clone(); + } self.request_bodies(io, peer_id, needed_bodies); return; } // find subchain to download if let Some((h, count)) = self.blocks.needed_headers(MAX_HEADERS_TO_REQUEST, ignore_others) { - replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, vec![h.clone()]); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + peer.asking_blocks = vec![h.clone()]; + } self.request_headers_by_hash(io, peer_id, &h, count, 0, false, PeerAsking::BlockHeaders); } } /// Clear all blocks/headers marked as being downloaded by a peer. fn clear_peer_download(&mut self, peer_id: PeerId) { - let peer = self.peers.get_mut(&peer_id).unwrap(); - match peer.asking { - PeerAsking::BlockHeaders | PeerAsking::Heads => { - for b in &peer.asking_blocks { - self.blocks.clear_header_download(b); - } - }, - PeerAsking::BlockBodies => { - for b in &peer.asking_blocks { - self.blocks.clear_body_download(b); - } - }, - _ => (), + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + match peer.asking { + PeerAsking::BlockHeaders | PeerAsking::Heads => { + for b in &peer.asking_blocks { + self.blocks.clear_header_download(b); + } + }, + PeerAsking::BlockBodies => { + for b in &peer.asking_blocks { + self.blocks.clear_body_download(b); + } + }, + _ => (), + } + peer.asking_blocks.clear(); } - peer.asking_blocks.clear(); } fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) { @@ -1030,30 +1041,34 @@ impl ChainSync { /// Reset peer status after request is complete. fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) -> bool { - let peer = self.peers.get_mut(&peer_id).unwrap(); - peer.expired = false; - if peer.asking != asking { - trace!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); - peer.asking = PeerAsking::Nothing; + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + peer.expired = false; + if peer.asking != asking { + trace!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking); + peer.asking = PeerAsking::Nothing; + false + } + else { + peer.asking = PeerAsking::Nothing; + true + } + } else { false } - else { - peer.asking = PeerAsking::Nothing; - true - } } /// Generic request sender fn send_request(&mut self, sync: &mut SyncIo, peer_id: PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { - let peer = self.peers.get_mut(&peer_id).unwrap(); - if peer.asking != PeerAsking::Nothing { - warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); - } - peer.asking = asking; - peer.ask_time = time::precise_time_s(); - if let Err(e) = sync.send(peer_id, packet_id, packet) { - debug!(target:"sync", "Error sending request: {:?}", e); - sync.disable_peer(peer_id); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + if peer.asking != PeerAsking::Nothing { + warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking); + } + peer.asking = asking; + peer.ask_time = time::precise_time_s(); + if let Err(e) = sync.send(peer_id, packet_id, packet) { + debug!(target:"sync", "Error sending request: {:?}", e); + sync.disable_peer(peer_id); + } } } @@ -1371,7 +1386,7 @@ impl ChainSync { /// creates latest block rlp for the given client fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes { let mut rlp_stream = RlpStream::new_list(2); - rlp_stream.append_raw(&chain.block(BlockID::Hash(chain.chain_info().best_block_hash)).unwrap(), 1); + rlp_stream.append_raw(&chain.block(BlockID::Hash(chain.chain_info().best_block_hash)).expect("Best block always exists"), 1); rlp_stream.append(&chain.chain_info().total_difficulty); rlp_stream.out() } @@ -1385,25 +1400,23 @@ impl ChainSync { } /// returns peer ids that have less blocks than our chain - fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo) -> Vec<(PeerId, BlockNumber)> { + fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo) -> Vec { let latest_hash = chain_info.best_block_hash; - let latest_number = chain_info.best_block_number; self.peers.iter_mut().filter_map(|(&id, ref mut peer_info)| match io.chain().block_status(BlockID::Hash(peer_info.latest_hash.clone())) { BlockStatus::InChain => { - if peer_info.latest_number.is_none() { - peer_info.latest_number = Some(HeaderView::new(&io.chain().block_header(BlockID::Hash(peer_info.latest_hash.clone())).unwrap()).number()); + if peer_info.latest_hash != latest_hash { + Some(id) + } else { + None } - if peer_info.latest_hash != latest_hash && latest_number > peer_info.latest_number.unwrap() { - Some((id, peer_info.latest_number.unwrap())) - } else { None } }, _ => None }) .collect::>() } - fn select_random_lagging_peers(&mut self, peers: &[(PeerId, BlockNumber)]) -> Vec<(PeerId, BlockNumber)> { + fn select_random_lagging_peers(&mut self, peers: &[PeerId]) -> Vec { use rand::Rng; // take sqrt(x) peers let mut peers = peers.to_vec(); @@ -1416,46 +1429,42 @@ impl ChainSync { } /// propagates latest block to lagging peers - fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, sealed: &[H256], peers: &[(PeerId, BlockNumber)]) -> usize { + fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, sealed: &[H256], peers: &[PeerId]) -> usize { trace!(target: "sync", "Sending NewBlocks to {:?}", peers); let mut sent = 0; - for &(peer_id, _) in peers { + for peer_id in peers { if sealed.is_empty() { let rlp = ChainSync::create_latest_block_rlp(io.chain()); - self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); + self.send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp); } else { for h in sealed { let rlp = ChainSync::create_new_block_rlp(io.chain(), h); - self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); + self.send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp); } } - self.peers.get_mut(&peer_id).unwrap().latest_hash = chain_info.best_block_hash.clone(); - self.peers.get_mut(&peer_id).unwrap().latest_number = Some(chain_info.best_block_number); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + peer.latest_hash = chain_info.best_block_hash.clone(); + } sent += 1; } sent } /// propagates new known hashes to all peers - fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, peers: &[(PeerId, BlockNumber)]) -> usize { + fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, peers: &[PeerId]) -> usize { trace!(target: "sync", "Sending NewHashes to {:?}", peers); let mut sent = 0; - let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash(); - for &(peer_id, peer_number) in peers { - let peer_best = if chain_info.best_block_number - peer_number > MAX_PEER_LAG_PROPAGATION as BlockNumber { - // If we think peer is too far behind just send one latest hash - last_parent.clone() - } else { - self.peers.get(&peer_id).unwrap().latest_hash.clone() - }; - sent += match ChainSync::create_new_hashes_rlp(io.chain(), &peer_best, &chain_info.best_block_hash) { + let last_parent = HeaderView::new(&io.chain().block_header(BlockID::Hash(chain_info.best_block_hash.clone())) + .expect("Best block always exists")).parent_hash(); + for peer_id in peers { + sent += match ChainSync::create_new_hashes_rlp(io.chain(), &last_parent, &chain_info.best_block_hash) { Some(rlp) => { { - let peer = self.peers.get_mut(&peer_id).unwrap(); - peer.latest_hash = chain_info.best_block_hash.clone(); - peer.latest_number = Some(chain_info.best_block_number); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + peer.latest_hash = chain_info.best_block_hash.clone(); + } } - self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp); + self.send_packet(io, *peer_id, NEW_BLOCK_HASHES_PACKET, rlp); 1 }, None => 0 @@ -1735,7 +1744,6 @@ mod tests { genesis: H256::zero(), network_id: U256::zero(), latest_hash: peer_latest_hash, - latest_number: None, difficulty: None, asking: PeerAsking::Nothing, asking_blocks: Vec::new(), diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index d8d3d0711be..c54529beb1a 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -95,6 +95,27 @@ fn forked() { assert_eq!(&*net.peer(2).chain.numbers.read(), &peer1_chain); } +#[test] +fn forked_with_misbehaving_peer() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + // peer 0 is on a totally different chain with higher total difficulty + net.peer_mut(0).chain = TestBlockChainClient::new_with_extra_data(b"fork".to_vec()); + net.peer_mut(0).chain.add_blocks(500, EachBlockWith::Nothing); + net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Nothing); + net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Nothing); + + net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Nothing); + net.peer_mut(2).chain.add_blocks(200, EachBlockWith::Uncle); + // peer 1 should sync to peer 2, others should not change + let peer0_chain = net.peer(0).chain.numbers.read().clone(); + let peer2_chain = net.peer(2).chain.numbers.read().clone(); + net.sync(); + assert_eq!(&*net.peer(0).chain.numbers.read(), &peer0_chain); + assert_eq!(&*net.peer(1).chain.numbers.read(), &peer2_chain); + assert_eq!(&*net.peer(2).chain.numbers.read(), &peer2_chain); +} + #[test] fn net_hard_fork() { ::env_logger::init().ok(); @@ -116,11 +137,12 @@ fn net_hard_fork() { #[test] fn restart() { + ::env_logger::init().ok(); let mut net = TestNet::new(3); net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); - net.sync_steps(8); + net.sync(); // make sure that sync has actually happened assert!(net.peer(0).chain.chain_info().best_block_number > 100); diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index bdb9f359084..bed704abc01 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -26,6 +26,7 @@ pub struct TestIo<'p> { pub chain: &'p mut TestBlockChainClient, pub queue: &'p mut VecDeque, pub sender: Option, + pub to_disconnect: HashSet, overlay: RwLock>, } @@ -35,16 +36,19 @@ impl<'p> TestIo<'p> { chain: chain, queue: queue, sender: sender, + to_disconnect: HashSet::new(), overlay: RwLock::new(HashMap::new()), } } } impl<'p> SyncIo for TestIo<'p> { - fn disable_peer(&mut self, _peer_id: PeerId) { + fn disable_peer(&mut self, peer_id: PeerId) { + self.disconnect_peer(peer_id); } - fn disconnect_peer(&mut self, _peer_id: PeerId) { + fn disconnect_peer(&mut self, peer_id: PeerId) { + self.to_disconnect.insert(peer_id); } fn is_expired(&self) -> bool { @@ -141,13 +145,30 @@ impl TestNet { pub fn sync_step(&mut self) { for peer in 0..self.peers.len() { if let Some(packet) = self.peers[peer].queue.pop_front() { - let mut p = self.peers.get_mut(packet.recipient).unwrap(); - trace!("--- {} -> {} ---", peer, packet.recipient); - ChainSync::dispatch_packet(&p.sync, &mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data); - trace!("----------------"); + let disconnecting = { + let mut p = self.peers.get_mut(packet.recipient).unwrap(); + trace!("--- {} -> {} ---", peer, packet.recipient); + let to_disconnect = { + let mut io = TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)); + ChainSync::dispatch_packet(&p.sync, &mut io, peer as PeerId, packet.packet_id, &packet.data); + io.to_disconnect + }; + for d in &to_disconnect { + // notify this that disconnecting peers are disconnecting + let mut io = TestIo::new(&mut p.chain, &mut p.queue, Some(*d)); + p.sync.write().on_peer_aborting(&mut io, *d); + } + to_disconnect + }; + for d in &disconnecting { + // notify other peers that this peer is disconnecting + let mut p = self.peers.get_mut(*d).unwrap(); + let mut io = TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)); + p.sync.write().on_peer_aborting(&mut io, peer as PeerId); + } } - let mut p = self.peers.get_mut(peer).unwrap(); - p.sync.write().maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None)); + + self.sync_step_peer(peer); } } diff --git a/util/Cargo.toml b/util/Cargo.toml index e6004da2ad6..ae8643c7d9b 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -3,7 +3,7 @@ description = "Ethcore utility library" homepage = "http://ethcore.io" license = "GPL-3.0" name = "ethcore-util" -version = "1.3.5" +version = "1.3.6" authors = ["Ethcore "] build = "build.rs"