Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support fast-sync algorithm. #13

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions substrate/client/api/src/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,14 @@ impl<Block: BlockT> blockchain::Backend<Block> for Blockchain<Block> {
) -> sp_blockchain::Result<Option<Vec<Vec<u8>>>> {
unimplemented!("Not supported by the in-mem backend.")
}

fn clear_block_gap(&self){
unimplemented!("Not supported by the in-mem backend.")
}

fn update_block_gap(&self, _: NumberFor<Block>, _: NumberFor<Block>){
unimplemented!("Not supported by the in-mem backend.")
}
}

impl<Block: BlockT> backend::AuxStore for Blockchain<Block> {
Expand Down
10 changes: 9 additions & 1 deletion substrate/client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
//! queues to be instantiated simply.

use async_lock::RwLock;
use log::{debug, trace};
use log::{debug, info, trace};
use std::{
fmt,
future::Future,
Expand Down Expand Up @@ -295,6 +295,7 @@ where
Block: BlockT,
BI: BlockImport<Block, Error = ConsensusError>,
{
println!("import_single_block");
match verify_single_block_metered(import_handle, block_origin, block, verifier, false, None)
.await?
{
Expand Down Expand Up @@ -375,6 +376,9 @@ where
{
let peer = block.origin;

let number = block.header.clone().map(|h| *h.number());
info!("*** start verify_single_block_metered: {:?} {:?}", number.clone(), block.hash);

let (header, justifications) = match (block.header, block.justifications) {
(Some(header), justifications) => (header, justifications),
(None, _) => {
Expand Down Expand Up @@ -416,6 +420,8 @@ where
},
}

println!("*** after check: {:?}", block.hash);

let started = Instant::now();

let mut import_block = BlockImportParams::new(block_origin, header);
Expand Down Expand Up @@ -458,6 +464,8 @@ where
metrics.report_verification(true, verification_time);
}

info!("*** finish verify_single_block_metered: {:?} {:?}", number.clone(), block.hash);

Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters {
import_block,
hash,
Expand Down
25 changes: 22 additions & 3 deletions substrate/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,16 @@ impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<B
Err(sp_blockchain::Error::Backend(format!("Error decoding body list: {}", err))),
}
}

fn clear_block_gap(&self) {
debug!(target: "sync", "Clear block gap.");
self.update_block_gap(None);
}

fn update_block_gap(&self, start: NumberFor<Block>, end: NumberFor<Block>){
debug!(target: "sync", "Update block gap: {}-{}", start, end);
self.update_block_gap(Some((start, end)));
}
}

impl<Block: BlockT> HeaderMetadata<Block> for BlockchainDb<Block> {
Expand Down Expand Up @@ -1761,6 +1771,7 @@ impl<Block: BlockT> Backend<Block> {
for m in meta_updates {
self.blockchain.update_meta(m);
}

self.blockchain.update_block_gap(block_gap);

Ok(())
Expand Down Expand Up @@ -2501,18 +2512,26 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
fn pin_block(&self, hash: <Block as BlockT>::Hash) -> sp_blockchain::Result<()> {
let hint = || {
let header_metadata = self.blockchain.header_metadata(hash);
println!("pin_block {hash:?}: header_metadata: {:?}", header_metadata);
header_metadata
.map(|hdr| {
sc_state_db::NodeDb::get(self.storage.as_ref(), hdr.state_root.as_ref())
.unwrap_or(None)
let result = sc_state_db::NodeDb::get(self.storage.as_ref(), hdr.state_root.as_ref());
match result {
Ok(_) => {println!("sc_state_db::NodeDb::get {hash:?}: result: OK",);}
Err(ref err) => {println!("sc_state_db::NodeDb::get {hash:?}: result: {err:?}");}
};

result.unwrap_or(None)
.is_some()
})
.unwrap_or(false)
};

if let Some(number) = self.blockchain.number(hash)? {
println!("self.blockchain.number #{number} {hash:?}");
self.storage.state_db.pin(&hash, number.saturated_into::<u64>(), hint).map_err(
|_| {
|err| {
println!("storage.state_db.pin #{number }{hash:?} {err:?}");
sp_blockchain::Error::UnknownBlock(format!(
"State already discarded for `{:?}`",
hash
Expand Down
25 changes: 25 additions & 0 deletions substrate/client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,21 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
rx.await.map_err(|_| ())
}

/// Returns a collection of currently connected (open) peers.
pub async fn open_peers(&self) -> Result<Vec<PeerId>, ()> {
let (tx, rx) = oneshot::channel();

let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::OpenPeers { pending_response: tx });

match rx.await {
Ok(v) => Ok(v),
// The channel can only be closed if the network worker no longer exists.
Err(_) => Err(()),
}
}

/// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates.
///
/// Returns an `Err` if one of the given addresses is invalid or contains an
Expand Down Expand Up @@ -1173,6 +1188,9 @@ enum ServiceToWorkerMsg {
NetworkState {
pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
},
OpenPeers {
pending_response: oneshot::Sender<Vec<PeerId>>,
},
DisconnectPeer(PeerId, ProtocolName),
}

Expand Down Expand Up @@ -1315,9 +1333,16 @@ where
.behaviour_mut()
.user_protocol_mut()
.disconnect_peer(&who, protocol_name),
ServiceToWorkerMsg::OpenPeers { pending_response} => {
let _ = pending_response.send(self.open_peers());
}
}
}

fn open_peers(&self) -> Vec<PeerId> {
self.network_service.behaviour().user_protocol().open_peers().cloned().collect::<Vec<_>>()
}

/// Process the next event coming from `Swarm`.
fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourOut, THandlerErr<Behaviour<B>>>) {
match event {
Expand Down
9 changes: 7 additions & 2 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead
/// If the block announces stream to peer has been inactive for 30 seconds meaning local node
/// has not sent or received block announcements to/from the peer, report the node for inactivity,
/// disconnect it and attempt to establish connection to some other peer.
const INACTIVITY_EVICT_THRESHOLD: Duration = Duration::from_secs(30);
const INACTIVITY_EVICT_THRESHOLD: Duration = Duration::from_secs(1000);

/// When `SyncingEngine` is started, wait two minutes before actually staring to count peers as
/// evicted.
Expand All @@ -114,7 +114,7 @@ const INACTIVITY_EVICT_THRESHOLD: Duration = Duration::from_secs(30);
///
/// To prevent this from happening, define a threshold for how long `SyncingEngine` should wait
/// before it starts evicting peers.
const INITIAL_EVICTION_WAIT_PERIOD: Duration = Duration::from_secs(2 * 60);
const INITIAL_EVICTION_WAIT_PERIOD: Duration = Duration::from_secs(2 * 600);

/// Maximum allowed size for a block announce.
const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;
Expand Down Expand Up @@ -839,6 +839,9 @@ where
ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
self.strategy.set_sync_fork_request(peers, &hash, number);
},
ToServiceCommand::NewBestBlockNumber(number) => {
self.strategy.update_common_number_for_peers(number);
},
ToServiceCommand::EventStream(tx) => self.event_streams.push(tx),
ToServiceCommand::RequestJustification(hash, number) =>
self.strategy.request_justification(&hash, number),
Expand Down Expand Up @@ -1227,6 +1230,7 @@ where

match Self::encode_state_request(&request) {
Ok(data) => {
println!("Preparing state request: {}, peer_id={peer_id}", data.len());
self.network_service.start_request(
peer_id,
self.state_request_protocol_name.clone(),
Expand Down Expand Up @@ -1283,6 +1287,7 @@ where
}

fn decode_state_response(response: &[u8]) -> Result<OpaqueStateResponse, String> {
println!("decode_state_response: {}", response.len());
let response = StateResponse::decode(response)
.map_err(|error| format!("Failed to decode state response: {error}"))?;

Expand Down
Loading
Loading