From 90049ef9ae73a3a0f535545f0285b6bfc118425c Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Fri, 29 Mar 2024 15:27:04 +0700 Subject: [PATCH 1/5] Introduce fast-sync-engine. --- .../network/sync/src/fast_sync_engine.rs | 386 ++++++++++++++++++ .../src/fast_sync_engine/syncing_service.rs | 73 ++++ substrate/client/network/sync/src/lib.rs | 1 + .../network/sync/src/state_request_handler.rs | 4 +- .../primitives/consensus/common/src/lib.rs | 2 + 5 files changed, 464 insertions(+), 2 deletions(-) create mode 100644 substrate/client/network/sync/src/fast_sync_engine.rs create mode 100644 substrate/client/network/sync/src/fast_sync_engine/syncing_service.rs diff --git a/substrate/client/network/sync/src/fast_sync_engine.rs b/substrate/client/network/sync/src/fast_sync_engine.rs new file mode 100644 index 000000000000..c8371054cd90 --- /dev/null +++ b/substrate/client/network/sync/src/fast_sync_engine.rs @@ -0,0 +1,386 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! `SyncingEngine` is the actor responsible for syncing Substrate chain +//! to tip and keep the blockchain up to date with network updates. + +mod syncing_service; + +use crate::{ + pending_responses::{PendingResponses, ResponseEvent}, + schema::v1::{StateRequest, StateResponse}, + service::{ + self, + }, + strategy::{ + state::StateStrategy + }, + types::{ + BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, + }, + fast_sync_engine::syncing_service::{SyncingService, ToServiceCommand}, + LOG_TARGET, +}; + +use futures::{ + channel::oneshot, + FutureExt, StreamExt, +}; +use libp2p::{request_response::OutboundFailure, PeerId}; +use log::{debug, error, info, trace}; +use prost::Message; + +use sc_client_api::{BlockBackend, ProofProvider}; +use sc_network::{ + request_responses::{IfDisconnected, RequestFailure}, + types::ProtocolName, + utils::LruHashSet, +}; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver}; +use sp_blockchain::{Error as ClientError}; +use sp_runtime::{Justifications, traits::{Block as BlockT,}}; +use std::{ + collections::{HashMap}, + sync::{ + Arc, + }, + time::{ Instant }, +}; +use tokio::sync::Mutex; +use sc_consensus::import_queue::ImportQueueService; +use sc_consensus::IncomingBlock; +use sp_runtime::traits::{NumberFor, Zero}; +use crate::state_request_handler::generate_protocol_name; +use crate::strategy::state::StateStrategyAction; + +/// Peer information +#[derive(Clone, Debug)] +pub struct Peer { + pub info: ExtendedPeerInfo, + /// Holds a set of blocks known to this peer. + pub known_blocks: LruHashSet, +} + +mod rep { + use sc_network::ReputationChange as Rep; + /// We received a message that failed to decode. + pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message"); + /// Peer is on unsupported protocol version. + pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol"); + /// Reputation change when a peer refuses a request. + pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused"); + /// Reputation change when a peer doesn't respond in time to our messages. + pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout"); +} + +pub struct FastSyncingEngine where + IQS: ImportQueueService + ?Sized, +{ + /// Syncing strategy. + strategy: StateStrategy, + + /// Network service. + network_service: service::network::NetworkServiceHandle, + + /// Channel for receiving service commands + service_rx: TracingUnboundedReceiver>, + + /// All connected peers. Contains both full and light node peers. + peers: HashMap>, + + /// When the syncing was started. + /// + /// Stored as an `Option` so once the initial wait has passed, `SyncingEngine` + /// can reset the peer timers and continue with the normal eviction process. + syncing_started: Option, + + /// Pending responses + pending_responses: PendingResponses, + + /// Protocol name used to send out state requests + state_request_protocol_name: ProtocolName, + + /// Handle to import queue. + import_queue: Arc>>, + + last_block: Option>, +} + +impl FastSyncingEngine +where + B: BlockT, + IQS: ImportQueueService + ?Sized + 'static, +{ + pub fn new + + ProofProvider + + Send + + Sync + + 'static>( + client: Arc, + import_queue:Arc>>, + network_service: service::network::NetworkServiceHandle, + fork_id: Option<&str>, + target_header: B::Header, + target_body: Option>, + target_justifications: Option, + skip_proof: bool, + current_sync_peer: (PeerId, NumberFor), + ) -> Result<(Self, SyncingService,), ClientError> { + let genesis_hash = client + .block_hash(Zero::zero()) + .ok() + .flatten() + .expect("Genesis block exists; qed"); + let state_request_protocol_name = generate_protocol_name(genesis_hash, fork_id).into(); + + // Initialize syncing strategy. + let strategy = + StateStrategy::new(client.clone(), target_header, target_body, target_justifications, skip_proof, vec![current_sync_peer].into_iter()); + + let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000); + + Ok(( + Self { + import_queue, + strategy, + network_service, + peers: HashMap::new(), + service_rx, + syncing_started: None, + pending_responses: PendingResponses::new(), + state_request_protocol_name, + last_block: None, + }, + SyncingService::new(tx), + )) + } + + pub async fn run(mut self) -> Result>, ClientError> { + self.syncing_started = Some(Instant::now()); + + loop { + tokio::select! { + command = self.service_rx.select_next_some() => + self.process_service_command(command), + response_event = self.pending_responses.select_next_some() => + self.process_response_event(response_event), + } + + // Process actions requested by a syncing strategy. + match self.process_strategy_actions().await { + Ok(Some(_)) => { + continue; + } + Ok(None) => { + info!("State import finished."); + break; + } + Err(e) => { + error!("Terminating `FastSyncingEngine` due to fatal error: {e:?}"); + return Err(e) + } + } + } + + return Ok(self.last_block.take()); + } + + async fn process_strategy_actions(&mut self) -> Result, ClientError> { + let actions = self.strategy.actions().collect::>(); + if actions.is_empty(){ + return Err(ClientError::Backend("Fast sync failed - no further actions.".into())) + } + + for action in actions.into_iter() { + match action { + StateStrategyAction::SendStateRequest { peer_id, request } => { + println!("Sending state request: {peer_id}"); + self.send_state_request(peer_id, request); + } + StateStrategyAction::DropPeer(BadPeer(peer_id, rep)) => { + self.pending_responses.remove(&peer_id); + self.network_service + .disconnect_peer(peer_id, self.state_request_protocol_name.clone()); + self.network_service.report_peer(peer_id, rep); + + trace!(target: LOG_TARGET, "{peer_id:?} dropped: {rep:?}."); + } + StateStrategyAction::ImportBlocks { origin, blocks } => { + self.last_block = blocks.first().cloned(); + let block_len = blocks.len(); + self.import_queue.lock().await.import_blocks(origin, blocks); + + info!("Import blocks finished, blocks len = {block_len}", ); + return Ok(None) + } + StateStrategyAction::Finished => { + println!("StateStrategyAction::Finished"); + } + } + } + + Ok(Some(())) + } + + fn process_service_command(&mut self, command: ToServiceCommand) { + match command { + ToServiceCommand::Status(tx) => { + let mut status = self.strategy.status(); + status.num_connected_peers = self.peers.len() as u32; + let _ = tx.send(status); + }, + ToServiceCommand::PeersInfo(tx) => { + let peers_info = self + .peers + .iter() + .map(|(peer_id, peer)| (*peer_id, peer.info.clone())) + .collect(); + let _ = tx.send(peers_info); + }, + ToServiceCommand::Start(tx) => { + let _ = tx.send(()); + } + } + } + + fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) { + let (tx, rx) = oneshot::channel(); + + self.pending_responses.insert(peer_id, PeerRequest::State, rx.boxed()); + + match Self::encode_state_request(&request) { + Ok(data) => { + println!("Preparing state request: {}, peer_id={peer_id}", data.len()); + self.network_service.start_request( + peer_id, + self.state_request_protocol_name.clone(), + data, + tx, + IfDisconnected::ImmediateError, + ); + }, + Err(err) => { + log::warn!( + target: LOG_TARGET, + "Failed to encode state request {request:?}: {err:?}", + ); + }, + } + } + + fn encode_state_request(request: &OpaqueStateRequest) -> Result, String> { + let request: &StateRequest = request.0.downcast_ref().ok_or_else(|| { + "Failed to downcast opaque state response during encoding, this is an \ + implementation bug." + .to_string() + })?; + + Ok(request.encode_to_vec()) + } + + fn decode_state_response(response: &[u8]) -> Result { + println!("decode_state_response: {}", response.len()); + let response = StateResponse::decode(response) + .map_err(|error| format!("Failed to decode state response: {error}"))?; + + Ok(OpaqueStateResponse(Box::new(response))) + } + + fn process_response_event(&mut self, response_event: ResponseEvent) { + let ResponseEvent { peer_id, request, response } = response_event; + println!("Process response event: {peer_id}"); + + match response { + Ok(Ok((resp, _))) => match request { + PeerRequest::Block(req) => { + error!("Unexpected PeerRequest::Block - {:?}", req); + }, + PeerRequest::State => { + let response = match Self::decode_state_response(&resp[..]) { + Ok(proto) => proto, + Err(e) => { + debug!( + target: LOG_TARGET, + "Failed to decode state response from peer {peer_id:?}: {e:?}.", + ); + self.network_service.report_peer(peer_id, rep::BAD_MESSAGE); + self.network_service.disconnect_peer( + peer_id, + self.state_request_protocol_name.clone(), + ); + return + }, + }; + + self.strategy.on_state_response(peer_id, response); + }, + PeerRequest::WarpProof => { + error!("Unexpected PeerRequest::WarpProof",); + }, + }, + Ok(Err(e)) => { + debug!(target: LOG_TARGET, "Request to peer {peer_id:?} failed: {e:?}."); + + match e { + RequestFailure::Network(OutboundFailure::Timeout) => { + self.network_service.report_peer(peer_id, rep::TIMEOUT); + self.network_service + .disconnect_peer(peer_id, self.state_request_protocol_name.clone()); + }, + RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => { + self.network_service.report_peer(peer_id, rep::BAD_PROTOCOL); + self.network_service + .disconnect_peer(peer_id, self.state_request_protocol_name.clone()); + }, + RequestFailure::Network(OutboundFailure::DialFailure) => { + self.network_service + .disconnect_peer(peer_id, self.state_request_protocol_name.clone()); + }, + RequestFailure::Refused => { + self.network_service.report_peer(peer_id, rep::REFUSED); + self.network_service + .disconnect_peer(peer_id, self.state_request_protocol_name.clone()); + }, + RequestFailure::Network(OutboundFailure::ConnectionClosed) | + RequestFailure::NotConnected => { + self.network_service + .disconnect_peer(peer_id, self.state_request_protocol_name.clone()); + }, + RequestFailure::UnknownProtocol => { + debug_assert!(false, "Block request protocol should always be known."); + }, + RequestFailure::Obsolete => { + debug_assert!( + false, + "Can not receive `RequestFailure::Obsolete` after dropping the \ + response receiver.", + ); + }, + } + }, + Err(oneshot::Canceled) => { + trace!( + target: LOG_TARGET, + "Request to peer {peer_id:?} failed due to oneshot being canceled.", + ); + self.network_service + .disconnect_peer(peer_id, self.state_request_protocol_name.clone()); + }, + } + } +} diff --git a/substrate/client/network/sync/src/fast_sync_engine/syncing_service.rs b/substrate/client/network/sync/src/fast_sync_engine/syncing_service.rs new file mode 100644 index 000000000000..462a45549df7 --- /dev/null +++ b/substrate/client/network/sync/src/fast_sync_engine/syncing_service.rs @@ -0,0 +1,73 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::types::{ExtendedPeerInfo, SyncStatus, }; + +use futures::{channel::oneshot,}; +use libp2p::PeerId; +use sc_utils::mpsc::TracingUnboundedSender; +use sp_runtime::traits::{Block as BlockT}; + +/// Commands send to `SyncingEngine` +pub enum ToServiceCommand { + Start(oneshot::Sender<()>), + Status(oneshot::Sender>), + PeersInfo(oneshot::Sender)>>), +} + +/// Handle for communicating with `SyncingEngine` asynchronously +#[derive(Clone)] +pub struct SyncingService { + tx: TracingUnboundedSender>, +} + +impl SyncingService { + /// Create new handle + pub fn new( + tx: TracingUnboundedSender>, + ) -> Self { + Self { tx, } + } + + /// Get peer information. + pub async fn peers_info( + &self, + ) -> Result)>, oneshot::Canceled> { + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send(ToServiceCommand::PeersInfo(tx)); + + rx.await + } + + /// Get sync status + /// + /// Returns an error if `SyncingEngine` has terminated. + pub async fn status(&self) -> Result, oneshot::Canceled> { + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx)); + + rx.await + } + + pub async fn start(&self) -> Result<(), oneshot::Canceled> { + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send(ToServiceCommand::Start(tx)); + + rx.await + } +} diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 494e3b87aa95..af8ebeabb471 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -39,6 +39,7 @@ pub mod service; pub mod state_request_handler; pub mod strategy; pub mod warp_request_handler; +pub mod fast_sync_engine; /// Log target for this crate. const LOG_TARGET: &str = "sync"; diff --git a/substrate/client/network/sync/src/state_request_handler.rs b/substrate/client/network/sync/src/state_request_handler.rs index 6bd2389fb5d1..1a79669d6759 100644 --- a/substrate/client/network/sync/src/state_request_handler.rs +++ b/substrate/client/network/sync/src/state_request_handler.rs @@ -53,7 +53,7 @@ mod rep { } /// Generates a [`ProtocolConfig`] for the state request protocol, refusing incoming requests. -pub fn generate_protocol_config>( +fn generate_protocol_config>( protocol_id: &ProtocolId, genesis_hash: Hash, fork_id: Option<&str>, @@ -70,7 +70,7 @@ pub fn generate_protocol_config>( } /// Generate the state protocol name from the genesis hash and fork id. -fn generate_protocol_name>(genesis_hash: Hash, fork_id: Option<&str>) -> String { +pub fn generate_protocol_name>(genesis_hash: Hash, fork_id: Option<&str>) -> String { let genesis_hash = genesis_hash.as_ref(); if let Some(fork_id) = fork_id { format!("/{}/{}/state/2", array_bytes::bytes2hex("", genesis_hash), fork_id) diff --git a/substrate/primitives/consensus/common/src/lib.rs b/substrate/primitives/consensus/common/src/lib.rs index 6505d005deb8..a6233e99cde9 100644 --- a/substrate/primitives/consensus/common/src/lib.rs +++ b/substrate/primitives/consensus/common/src/lib.rs @@ -69,6 +69,8 @@ pub enum BlockOrigin { Own, /// Block was imported from a file. File, + /// Block was imported using fast sync. + FastSync, } /// Environment for a Consensus instance. From e07b4a1750192f3dd409a25ba5ff9a6a8525b6f1 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Fri, 29 Mar 2024 15:28:41 +0700 Subject: [PATCH 2/5] Modify internal constants - INACTIVITY_EVICT_THRESHOLD - INITIAL_EVICTION_WAIT_PERIOD - MAX_DOWNLOAD_AHEAD --- substrate/client/network/sync/src/engine.rs | 4 ++-- substrate/client/network/sync/src/strategy/chain_sync.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 8bf396fa0e42..c97d2aa33df2 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -102,7 +102,7 @@ const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead /// If the block announces stream to peer has been inactive for 30 seconds meaning local node /// has not sent or received block announcements to/from the peer, report the node for inactivity, /// disconnect it and attempt to establish connection to some other peer. -const INACTIVITY_EVICT_THRESHOLD: Duration = Duration::from_secs(30); +const INACTIVITY_EVICT_THRESHOLD: Duration = Duration::from_secs(1000); /// When `SyncingEngine` is started, wait two minutes before actually staring to count peers as /// evicted. @@ -114,7 +114,7 @@ const INACTIVITY_EVICT_THRESHOLD: Duration = Duration::from_secs(30); /// /// To prevent this from happening, define a threshold for how long `SyncingEngine` should wait /// before it starts evicting peers. -const INITIAL_EVICTION_WAIT_PERIOD: Duration = Duration::from_secs(2 * 60); +const INITIAL_EVICTION_WAIT_PERIOD: Duration = Duration::from_secs(2 * 600); /// Maximum allowed size for a block announce. const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024; diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index 2df5f5046667..50a5e4da13ed 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -76,7 +76,7 @@ mod test; const MAX_IMPORTING_BLOCKS: usize = 2048; /// Maximum blocks to download ahead of any gap. -const MAX_DOWNLOAD_AHEAD: u32 = 2048; +const MAX_DOWNLOAD_AHEAD: u32 = 20480; /// Maximum blocks to look backwards. The gap is the difference between the highest block and the /// common block of a node. From e3d957ced951e874f109051eb194e47459f4937a Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Fri, 29 Mar 2024 15:42:57 +0700 Subject: [PATCH 3/5] Add NewBestBlockNumber event. --- substrate/client/network/sync/src/engine.rs | 3 +++ .../network/sync/src/service/syncing_service.rs | 10 ++++++++++ substrate/client/network/sync/src/strategy.rs | 14 +++++++++++++- .../client/network/sync/src/strategy/chain_sync.rs | 10 ++++++++++ 4 files changed, 36 insertions(+), 1 deletion(-) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index c97d2aa33df2..f758d318f035 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -839,6 +839,9 @@ where ToServiceCommand::SetSyncForkRequest(peers, hash, number) => { self.strategy.set_sync_fork_request(peers, &hash, number); }, + ToServiceCommand::NewBestBlockNumber(number) => { + self.strategy.update_common_number_for_peers(number); + }, ToServiceCommand::EventStream(tx) => self.event_streams.push(tx), ToServiceCommand::RequestJustification(hash, number) => self.strategy.request_justification(&hash, number), diff --git a/substrate/client/network/sync/src/service/syncing_service.rs b/substrate/client/network/sync/src/service/syncing_service.rs index 92d649d65dc3..8b1049ba6a07 100644 --- a/substrate/client/network/sync/src/service/syncing_service.rs +++ b/substrate/client/network/sync/src/service/syncing_service.rs @@ -36,6 +36,7 @@ use std::{ /// Commands send to `SyncingEngine` pub enum ToServiceCommand { + NewBestBlockNumber(NumberFor), SetSyncForkRequest(Vec, B::Hash, NumberFor), RequestJustification(B::Hash, NumberFor), ClearJustificationRequests, @@ -91,6 +92,15 @@ impl SyncingService { rx.await } + pub fn new_best_number( + &self, + number: NumberFor, + ) { + let _ = self + .tx + .unbounded_send(ToServiceCommand::NewBestBlockNumber(number)); + } + /// Get best seen block. pub async fn best_seen_block(&self) -> Result>, oneshot::Canceled> { let (tx, rx) = oneshot::channel(); diff --git a/substrate/client/network/sync/src/strategy.rs b/substrate/client/network/sync/src/strategy.rs index 2bc3a08bd6c4..b3bb33713570 100644 --- a/substrate/client/network/sync/src/strategy.rs +++ b/substrate/client/network/sync/src/strategy.rs @@ -20,7 +20,7 @@ //! and specific syncing algorithms. pub mod chain_sync; -mod state; +pub mod state; pub mod state_sync; pub mod warp; @@ -200,6 +200,18 @@ where } } + pub fn update_common_number_for_peers( + &mut self, + number: NumberFor, + ) { + match self { + SyncingStrategy::WarpSyncStrategy(_) => {}, + SyncingStrategy::StateSyncStrategy(_) => {}, + SyncingStrategy::ChainSyncStrategy(strategy) => + strategy.update_common_number_for_peers(number), + } + } + /// Request extra justification. pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { match self { diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index 50a5e4da13ed..ea52e6d6d2d5 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -1287,6 +1287,16 @@ where } } + pub fn update_common_number_for_peers(&mut self, new_common: NumberFor) { + for peer in self.peers.values_mut() { + if peer.best_number >= new_common { + peer.update_common_number(new_common); + } else { + peer.update_common_number(peer.best_number); + } + } + } + /// Called when a block has been queued for import. /// /// Updates our internal state for best queued block and then goes From 97f8925c2b182aeed8b4603578eb9422c33f3642 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Fri, 29 Mar 2024 15:47:22 +0700 Subject: [PATCH 4/5] Modify internal infrastructure for fast-sync. --- substrate/client/api/src/in_mem.rs | 8 ++ substrate/client/db/src/lib.rs | 11 +++ substrate/client/network/src/service.rs | 25 +++++ substrate/client/service/src/client/client.rs | 95 ++++++++++++++++++- substrate/client/service/src/lib.rs | 26 ++++- .../primitives/blockchain/src/backend.rs | 4 + 6 files changed, 165 insertions(+), 4 deletions(-) diff --git a/substrate/client/api/src/in_mem.rs b/substrate/client/api/src/in_mem.rs index b933ed1f17e0..5a0e75b3faaf 100644 --- a/substrate/client/api/src/in_mem.rs +++ b/substrate/client/api/src/in_mem.rs @@ -447,6 +447,14 @@ impl blockchain::Backend for Blockchain { ) -> sp_blockchain::Result>>> { unimplemented!("Not supported by the in-mem backend.") } + + fn clear_block_gap(&self){ + unimplemented!("Not supported by the in-mem backend.") + } + + fn update_block_gap(&self, _: NumberFor, _: NumberFor){ + unimplemented!("Not supported by the in-mem backend.") + } } impl backend::AuxStore for Blockchain { diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index 194bec8a88eb..c93e072f5b91 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -790,6 +790,16 @@ impl sc_client_api::blockchain::Backend for BlockchainDb, end: NumberFor){ + debug!(target: "sync", "Update block gap: {}-{}", start, end); + self.update_block_gap(Some((start, end))); + } } impl HeaderMetadata for BlockchainDb { @@ -1761,6 +1771,7 @@ impl Backend { for m in meta_updates { self.blockchain.update_meta(m); } + self.blockchain.update_block_gap(block_gap); Ok(()) diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 47e23337633b..e0cce1718938 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -743,6 +743,21 @@ impl NetworkService { rx.await.map_err(|_| ()) } + /// Returns a collection of currently connected (open) peers. + pub async fn open_peers(&self) -> Result, ()> { + let (tx, rx) = oneshot::channel(); + + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::OpenPeers { pending_response: tx }); + + match rx.await { + Ok(v) => Ok(v), + // The channel can only be closed if the network worker no longer exists. + Err(_) => Err(()), + } + } + /// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates. /// /// Returns an `Err` if one of the given addresses is invalid or contains an @@ -1173,6 +1188,9 @@ enum ServiceToWorkerMsg { NetworkState { pending_response: oneshot::Sender>, }, + OpenPeers { + pending_response: oneshot::Sender>, + }, DisconnectPeer(PeerId, ProtocolName), } @@ -1315,9 +1333,16 @@ where .behaviour_mut() .user_protocol_mut() .disconnect_peer(&who, protocol_name), + ServiceToWorkerMsg::OpenPeers { pending_response} => { + let _ = pending_response.send(self.open_peers()); + } } } + fn open_peers(&self) -> Vec { + self.network_service.behaviour().user_protocol().open_peers().cloned().collect::>() + } + /// Process the next event coming from `Swarm`. fn handle_swarm_event(&mut self, event: SwarmEvent>>) { match event { diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs index 7942fc9ad735..39ca6557d11e 100644 --- a/substrate/client/service/src/client/client.rs +++ b/substrate/client/service/src/client/client.rs @@ -84,11 +84,13 @@ use std::{ path::PathBuf, sync::Arc, }; +use codec::Encode; #[cfg(feature = "test-helpers")] use { super::call_executor::LocalCallExecutor, sc_client_api::in_mem, sp_core::traits::CodeExecutor, }; +use crate::{ClientExt, RawBlockData}; type NotificationSinks = Mutex>>; @@ -118,6 +120,78 @@ where _phantom: PhantomData, } +pub type BlockWeight = u128; + +//TODO: make function from the caller +/// Write the cumulative chain-weight of a block ot aux storage. +pub(crate) fn write_block_weight( + block_hash: H, + block_weight: BlockWeight, + write_aux: F, +) -> R + where + F: FnOnce(&[(Vec, &[u8])]) -> R, +{ + let key = block_weight_key(block_hash); + block_weight.using_encoded(|s| write_aux(&[(key, s)])) +} + +/// The aux storage key used to store the block weight of the given block hash. +pub fn block_weight_key(block_hash: H) -> Vec { + (b"block_weight", block_hash).encode() +} + + +impl ClientExt for Client + where + B: backend::Backend, + E: CallExecutor + Send + Sync, + Block: BlockT, + Client: ProvideRuntimeApi, + as ProvideRuntimeApi>::Api: CoreApi + ApiExt, + RA: Sync + Send, +{ + fn import_raw_block(&self, raw_block: RawBlockData) -> Result<(), Error> { + let hash = raw_block.hash; + let number = *raw_block.header.number(); + info!("Importing raw block: {number:?} - {hash:?} "); // TODO: debug + + let mut import_block = BlockImportParams::new(BlockOrigin::FastSync, raw_block.header); + import_block.justifications = raw_block.justifications; + import_block.body = raw_block.block_body; + import_block.state_action = StateAction::Skip; + import_block.finalized = true; + import_block.fork_choice = Some(ForkChoiceStrategy::LongestChain); + import_block.import_existing = false; + + // Set zero block weight to allow the execution of the following blocks. + write_block_weight(hash, 0, |values| { + import_block.auxiliary + .extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec())))) + }); + + let result = self.lock_import_and_run(|operation| { + self.apply_block(operation, import_block, None) + }) + .map_err(|e| { + error!("Error during importing of the raw block: {}", e); + ConsensusError::ClientImport(e.to_string()) + })?; + + info!("Raw block imported: {number:?} - {hash:?}. Result: {result:?}"); // TODO: debug + + Ok(()) + } + + fn clear_block_gap(&self){ + self.backend.blockchain().clear_block_gap(); + } + + fn update_block_gap(&self, start: NumberFor, end: NumberFor) { + self.backend.blockchain().update_block_gap(start, end); + } +} + /// Used in importing a block, where additional changes are made after the runtime /// executed. enum PrePostHeader { @@ -336,14 +410,15 @@ where if let Some(ref notification) = import_notification { if let Err(err) = self.backend.pin_block(notification.hash) { error!( - "Unable to pin block for import notification. hash: {}, Error: {}", - notification.hash, err + "Unable to pin block for import notification. hash: {}, number: {}, Error: {}", + notification.hash, notification.header.number(), err ); }; } self.notify_finalized(finality_notification)?; self.notify_imported(import_notification, import_notification_action, storage_changes)?; + println!("After notifications"); Ok(r) }; @@ -533,6 +608,7 @@ where *self.importing_block.write() = Some(hash); + info!("Before execute_and_import_block : {:?}", hash); let result = self.execute_and_import_block( operation, origin, @@ -566,6 +642,8 @@ where } } + info!("After execute_and_import_block : {:?}, {:?}", hash, result); + result } @@ -619,7 +697,7 @@ where let make_notifications = match origin { BlockOrigin::NetworkBroadcast | BlockOrigin::Own | BlockOrigin::ConsensusBroadcast => true, - BlockOrigin::Genesis | BlockOrigin::NetworkInitialSync | BlockOrigin::File => false, + BlockOrigin::Genesis | BlockOrigin::NetworkInitialSync | BlockOrigin::File | BlockOrigin::FastSync => false, }; let storage_changes = match storage_changes { @@ -1747,6 +1825,8 @@ where &mut self, mut import_block: BlockImportParams, ) -> Result { +// info!("Client import block: {} {:?}", import_block.header.number(), import_block.header.hash()); + let span = tracing::span!(tracing::Level::DEBUG, "import_block"); let _enter = span.enter(); @@ -1759,6 +1839,8 @@ where PrepareStorageChangesResult::Import(storage_changes) => storage_changes, }; +// info!("Before lock_import_and_run: {} {:?}", import_block.header.number(), import_block.header.hash()); + self.lock_import_and_run(|operation| { self.apply_block(operation, import_block, storage_changes) }) @@ -1773,6 +1855,8 @@ where &self, block: BlockCheckParams, ) -> Result { + info!("Before check block {} {:?}", block.number, block.hash, ); + let BlockCheckParams { hash, number, @@ -1801,6 +1885,8 @@ where BlockLookupResult::NotSpecial => {}, } +// info!("Before self.block_status(hash) {} {:?} {:?} ", block.number, block.hash, self.block_status(hash) ); + // Own status must be checked first. If the block and ancestry is pruned // this function must return `AlreadyInChain` rather than `MissingState` match self @@ -1816,6 +1902,7 @@ where BlockStatus::KnownBad => return Ok(ImportResult::KnownBad), } +// info!("Before self.block_status(parent_hash) {} {:?} {:?} ", block.number, block.hash, self.block_status(parent_hash) ); match self .block_status(parent_hash) .map_err(|e| ConsensusError::ClientImport(e.to_string()))? @@ -1827,6 +1914,8 @@ where BlockStatus::InChainPruned => return Ok(ImportResult::MissingState), BlockStatus::KnownBad => return Ok(ImportResult::KnownBad), } +// info!("After check block {} - after 2 block_status: {:?}, allow_missing_parent={allow_missing_parent}, allow_missing_state = {allow_missing_state}", number, hash, ); + Ok(ImportResult::imported(false)) } diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index bec1044daab4..1ab98e38bff7 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -48,7 +48,7 @@ use sc_network_sync::SyncingService; use sc_utils::mpsc::TracingUnboundedReceiver; use sp_blockchain::HeaderMetadata; use sp_consensus::SyncOracle; -use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; pub use self::{ builder::{ @@ -88,6 +88,7 @@ pub use sc_transaction_pool::Options as TransactionPoolOptions; pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool}; #[doc(hidden)] pub use std::{ops::Deref, result::Result, sync::Arc}; +use sp_runtime::Justifications; pub use task_manager::{SpawnTaskHandle, Task, TaskManager, TaskRegistry, DEFAULT_GROUP_NAME}; const DEFAULT_PROTOCOL_ID: &str = "sup"; @@ -96,6 +97,29 @@ const DEFAULT_PROTOCOL_ID: &str = "sup"; #[derive(Clone)] pub struct RpcHandlers(Arc>); +#[derive(Clone, Debug)] +/// Data container to insert the block into the BlockchainDb without checks. +pub struct RawBlockData{ + /// Block hash + pub hash: Block::Hash, + /// Block header + pub header: Block::Header, + /// Extrinsics of the block + pub block_body: Option>, + /// Justifications of the block + pub justifications: Option +} + +/// Provides extended functions for `Client` to enable fast-sync. +pub trait ClientExt{ + /// Insert block into BlockchainDb bypassing checks. + fn import_raw_block(&self, raw_block: RawBlockData) -> Result<(), sp_blockchain::Error>; + /// Clear block gap after initial block insertion. + fn clear_block_gap(&self); + /// Update block gap to specific value. + fn update_block_gap(&self, start: NumberFor, end: NumberFor); +} + impl RpcHandlers { /// Starts an RPC query. /// diff --git a/substrate/primitives/blockchain/src/backend.rs b/substrate/primitives/blockchain/src/backend.rs index 8208f9128e71..0f8c2fe5b597 100644 --- a/substrate/primitives/blockchain/src/backend.rs +++ b/substrate/primitives/blockchain/src/backend.rs @@ -255,6 +255,10 @@ pub trait Backend: } fn block_indexed_body(&self, hash: Block::Hash) -> Result>>>; + + fn clear_block_gap(&self); + + fn update_block_gap(&self, start: NumberFor, end: NumberFor); } /// Blockchain info From 906f52036bf37b81a1089191fdfdee2c92d87807 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Fri, 29 Mar 2024 15:33:41 +0700 Subject: [PATCH 5/5] Logs --- .../consensus/common/src/import_queue.rs | 10 ++++- substrate/client/db/src/lib.rs | 14 +++++-- substrate/client/network/sync/src/engine.rs | 2 + .../network/sync/src/strategy/chain_sync.rs | 41 +++++++++++-------- .../client/network/sync/src/strategy/state.rs | 7 +++- substrate/client/state-db/src/lib.rs | 4 +- 6 files changed, 55 insertions(+), 23 deletions(-) diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index 4a1a3479c5da..8307296ed3db 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -28,7 +28,7 @@ //! queues to be instantiated simply. use async_lock::RwLock; -use log::{debug, trace}; +use log::{debug, info, trace}; use std::{ fmt, future::Future, @@ -295,6 +295,7 @@ where Block: BlockT, BI: BlockImport, { + println!("import_single_block"); match verify_single_block_metered(import_handle, block_origin, block, verifier, false, None) .await? { @@ -375,6 +376,9 @@ where { let peer = block.origin; + let number = block.header.clone().map(|h| *h.number()); + info!("*** start verify_single_block_metered: {:?} {:?}", number.clone(), block.hash); + let (header, justifications) = match (block.header, block.justifications) { (Some(header), justifications) => (header, justifications), (None, _) => { @@ -416,6 +420,8 @@ where }, } + println!("*** after check: {:?}", block.hash); + let started = Instant::now(); let mut import_block = BlockImportParams::new(block_origin, header); @@ -458,6 +464,8 @@ where metrics.report_verification(true, verification_time); } + info!("*** finish verify_single_block_metered: {:?} {:?}", number.clone(), block.hash); + Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters { import_block, hash, diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index c93e072f5b91..470369fbacc6 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -2512,18 +2512,26 @@ impl sc_client_api::backend::Backend for Backend { fn pin_block(&self, hash: ::Hash) -> sp_blockchain::Result<()> { let hint = || { let header_metadata = self.blockchain.header_metadata(hash); + println!("pin_block {hash:?}: header_metadata: {:?}", header_metadata); header_metadata .map(|hdr| { - sc_state_db::NodeDb::get(self.storage.as_ref(), hdr.state_root.as_ref()) - .unwrap_or(None) + let result = sc_state_db::NodeDb::get(self.storage.as_ref(), hdr.state_root.as_ref()); + match result { + Ok(_) => {println!("sc_state_db::NodeDb::get {hash:?}: result: OK",);} + Err(ref err) => {println!("sc_state_db::NodeDb::get {hash:?}: result: {err:?}");} + }; + + result.unwrap_or(None) .is_some() }) .unwrap_or(false) }; if let Some(number) = self.blockchain.number(hash)? { + println!("self.blockchain.number #{number} {hash:?}"); self.storage.state_db.pin(&hash, number.saturated_into::(), hint).map_err( - |_| { + |err| { + println!("storage.state_db.pin #{number }{hash:?} {err:?}"); sp_blockchain::Error::UnknownBlock(format!( "State already discarded for `{:?}`", hash diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index f758d318f035..eb32a2b53617 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -1230,6 +1230,7 @@ where match Self::encode_state_request(&request) { Ok(data) => { + println!("Preparing state request: {}, peer_id={peer_id}", data.len()); self.network_service.start_request( peer_id, self.state_request_protocol_name.clone(), @@ -1286,6 +1287,7 @@ where } fn decode_state_response(response: &[u8]) -> Result { + println!("decode_state_response: {}", response.len()); let response = StateResponse::decode(response) .map_err(|error| format!("Failed to decode state response: {error}"))?; diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index ea52e6d6d2d5..c14a0521f68f 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -703,6 +703,7 @@ where request: Option>, response: BlockResponse, ) -> Result<(), BadPeer> { + println!("Block response (len) peer_id={peer_id}: {}", response.blocks.len()); self.downloaded_blocks += response.blocks.len(); let mut gap = false; let new_blocks: Vec> = if let Some(peer) = self.peers.get_mut(peer_id) { @@ -714,7 +715,8 @@ where self.allowed_requests.add(peer_id); if let Some(request) = request { match &mut peer.state { - PeerSyncState::DownloadingNew(_) => { + PeerSyncState::DownloadingNew(num) => { + println!("PeerSyncState::DownloadingNew peer_id={peer_id}: {}", num); self.blocks.clear_peer_download(peer_id); peer.state = PeerSyncState::Available; if let Some(start_block) = @@ -724,7 +726,8 @@ where } self.ready_blocks() }, - PeerSyncState::DownloadingGap(_) => { + PeerSyncState::DownloadingGap(num) => { + println!("PeerSyncState::DownloadingGap peer_id={peer_id}: {}", num); peer.state = PeerSyncState::Available; if let Some(gap_sync) = &mut self.gap_sync { gap_sync.blocks.clear_peer_download(peer_id); @@ -759,7 +762,7 @@ where } }) .collect(); - debug!( + info!( target: LOG_TARGET, "Drained {} gap blocks from {}", blocks.len(), @@ -767,11 +770,12 @@ where ); blocks } else { - debug!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}"); + info!(target: LOG_TARGET, "Unexpected gap block response from {peer_id}"); return Err(BadPeer(*peer_id, rep::NO_BLOCK)) } }, - PeerSyncState::DownloadingStale(_) => { + PeerSyncState::DownloadingStale(num) => { + println!("PeerSyncState::DownloadingStale peer_id={peer_id}: {}", num); peer.state = PeerSyncState::Available; if blocks.is_empty() { debug!(target: LOG_TARGET, "Empty block response from {peer_id}"); @@ -800,6 +804,7 @@ where .collect() }, PeerSyncState::AncestorSearch { current, start, state } => { + println!("PeerSyncState::AncestorSearch peer_id={peer_id}: {}, {}", current, start); let matching_hash = match (blocks.get(0), self.client.hash(*current)) { (Some(block), Ok(maybe_our_block_hash)) => { trace!( @@ -1034,7 +1039,7 @@ where let median = heads[heads.len() / 2]; if number + STATE_SYNC_FINALITY_THRESHOLD.saturated_into() >= median { if let Ok(Some(header)) = self.client.header(*hash) { - log::debug!( + log::info!( target: LOG_TARGET, "Starting state sync for #{number} ({hash})", ); @@ -1345,7 +1350,7 @@ where warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}"); } self.allowed_requests.set_all(); - debug!( + info!( target: LOG_TARGET, "Restarted with {} ({})", self.best_queued_number, @@ -1358,7 +1363,7 @@ where // should be kept in that state. if let PeerSyncState::DownloadingJustification(_) = p.state { // We make sure our commmon number is at least something we have. - trace!( + info!( target: LOG_TARGET, "Keeping peer {} after restart, updating common number from={} => to={} (our best).", peer_id, @@ -1407,25 +1412,25 @@ where self.import_existing = true; // Latest state is missing, start with the last finalized state or genesis instead. if let Some((hash, number)) = info.finalized_state { - debug!(target: LOG_TARGET, "Starting from finalized state #{number}"); + info!(target: LOG_TARGET, "Starting from finalized state #{number}"); self.best_queued_hash = hash; self.best_queued_number = number; } else { - debug!(target: LOG_TARGET, "Restarting from genesis"); + info!(target: LOG_TARGET, "Restarting from genesis"); self.best_queued_hash = Default::default(); self.best_queued_number = Zero::zero(); } } if let Some((start, end)) = info.block_gap { - debug!(target: LOG_TARGET, "Starting gap sync #{start} - #{end}"); + info!(target: LOG_TARGET, "Starting gap sync #{start} - #{end}"); self.gap_sync = Some(GapSync { best_queued_number: start - One::one(), target: end, blocks: BlockCollection::new(), }); } - trace!( + info!( target: LOG_TARGET, "Restarted sync at #{} ({:?})", self.best_queued_number, @@ -1780,7 +1785,7 @@ where count: usize, results: Vec<(Result>, BlockImportError>, B::Hash)>, ) { - trace!(target: LOG_TARGET, "Imported {imported} of {count}"); + info!(target: LOG_TARGET, "Imported {imported} of {count}"); let mut has_error = false; for (_, hash) in &results { @@ -1798,13 +1803,15 @@ where has_error |= result.is_err(); match result { - Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => + Ok(BlockImportStatus::ImportedKnown(number, peer_id)) => { + info!("ImportedKnown:{number}"); if let Some(peer) = peer_id { self.update_peer_common_number(&peer, number); - }, + } + }, Ok(BlockImportStatus::ImportedUnknown(number, aux, peer_id)) => { if aux.clear_justification_requests { - trace!( + info!( target: LOG_TARGET, "Block imported clears all pending justification requests {number}: {hash:?}", ); @@ -1812,7 +1819,7 @@ where } if aux.needs_justification { - trace!( + info!( target: LOG_TARGET, "Block imported but requires justification {number}: {hash:?}", ); diff --git a/substrate/client/network/sync/src/strategy/state.rs b/substrate/client/network/sync/src/strategy/state.rs index 8e1450d75470..c88533198cc9 100644 --- a/substrate/client/network/sync/src/strategy/state.rs +++ b/substrate/client/network/sync/src/strategy/state.rs @@ -25,7 +25,7 @@ use crate::{ LOG_TARGET, }; use libp2p::PeerId; -use log::{debug, error, trace}; +use log::{debug, error, info, trace}; use sc_client_api::ProofProvider; use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; use sc_network_common::sync::message::BlockAnnounce; @@ -58,6 +58,7 @@ pub enum StateStrategyAction { Finished, } +#[derive(Debug)] enum PeerState { Available, DownloadingState, @@ -69,6 +70,7 @@ impl PeerState { } } +#[derive(Debug)] struct Peer { best_number: NumberFor, state: PeerState, @@ -215,6 +217,7 @@ impl StateStrategy { state: Some(state), }; debug!(target: LOG_TARGET, "State download is complete. Import is queued"); + info!(target: LOG_TARGET, "State download is complete. Import is queued"); // TODO self.actions .push(StateStrategyAction::ImportBlocks { origin, blocks: vec![block] }); Ok(()) @@ -345,6 +348,8 @@ impl StateStrategy { .map(|(peer_id, request)| StateStrategyAction::SendStateRequest { peer_id, request }); self.actions.extend(state_request); + println!("Actions: {}", self.actions.len()); + std::mem::take(&mut self.actions).into_iter() } diff --git a/substrate/client/state-db/src/lib.rs b/substrate/client/state-db/src/lib.rs index 41c231c31aaf..e780db223dbd 100644 --- a/substrate/client/state-db/src/lib.rs +++ b/substrate/client/state-db/src/lib.rs @@ -462,15 +462,17 @@ impl StateDbSync { match self.mode { PruningMode::ArchiveAll => Ok(()), PruningMode::ArchiveCanonical | PruningMode::Constrained(_) => { + let hint_value = hint(); let have_block = self.non_canonical.have_block(hash) || self.pruning.as_ref().map_or_else( - || hint(), + || hint_value, |pruning| match pruning.have_block(hash, number) { HaveBlock::No => false, HaveBlock::Yes => true, HaveBlock::Maybe => hint(), }, ); + // println!("have_block = {have_block} #{number} {hash:?}"); if have_block { let refs = self.pinned.entry(hash.clone()).or_default(); if *refs == 0 {