diff --git a/substrate/client/api/src/in_mem.rs b/substrate/client/api/src/in_mem.rs index b933ed1f17e0..0238343f250f 100644 --- a/substrate/client/api/src/in_mem.rs +++ b/substrate/client/api/src/in_mem.rs @@ -447,6 +447,10 @@ impl blockchain::Backend for Blockchain { ) -> sp_blockchain::Result>>> { unimplemented!("Not supported by the in-mem backend.") } + + fn clear_block_gap(&self) { + unimplemented!("Not supported by the in-mem backend.") + } } impl backend::AuxStore for Blockchain { diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index 0faa90dfc4f9..6a2a0975ed60 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -803,6 +803,11 @@ impl sc_client_api::blockchain::Backend for BlockchainDb HeaderMetadata for BlockchainDb { @@ -1774,6 +1779,7 @@ impl Backend { for m in meta_updates { self.blockchain.update_meta(m); } + self.blockchain.update_block_gap(block_gap); Ok(()) diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 47e23337633b..0ac8ca1ecd27 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -743,6 +743,21 @@ impl NetworkService { rx.await.map_err(|_| ()) } + /// Returns a collection of currently connected (open) peers. + pub async fn connected_peers(&self) -> Result, ()> { + let (tx, rx) = oneshot::channel(); + + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::ConnectedPeers { pending_response: tx }); + + match rx.await { + Ok(v) => Ok(v), + // The channel can only be closed if the network worker no longer exists. + Err(_) => Err(()), + } + } + /// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates. /// /// Returns an `Err` if one of the given addresses is invalid or contains an @@ -1173,6 +1188,9 @@ enum ServiceToWorkerMsg { NetworkState { pending_response: oneshot::Sender>, }, + ConnectedPeers { + pending_response: oneshot::Sender>, + }, DisconnectPeer(PeerId, ProtocolName), } @@ -1315,9 +1333,21 @@ where .behaviour_mut() .user_protocol_mut() .disconnect_peer(&who, protocol_name), + ServiceToWorkerMsg::ConnectedPeers { pending_response } => { + let _ = pending_response.send(self.connected_peers()); + }, } } + fn connected_peers(&self) -> Vec { + self.network_service + .behaviour() + .user_protocol() + .open_peers() + .cloned() + .collect::>() + } + /// Process the next event coming from `Swarm`. fn handle_swarm_event(&mut self, event: SwarmEvent>>) { match event { diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 294dcf3a9703..45506d8d67f5 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -916,6 +916,13 @@ where }, ToServiceCommand::OnBlockFinalized(hash, header) => self.strategy.on_block_finalized(&hash, *header.number()), + ToServiceCommand::Restart(sync_restart_args, tx) => { + if let Some(number) = sync_restart_args.new_best_block { + self.strategy.update_common_number_for_peers(number); + } + self.strategy.on_restart(sync_restart_args.sync_mode.into()); + let _ = tx.send(()); + }, } } diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 9f6c0f45d089..68bb5dabcac6 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -25,9 +25,9 @@ pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusPro mod block_announce_validator; mod futures_stream; mod justification_requests; -mod pending_responses; +pub mod pending_responses; mod request_metrics; -mod schema; +pub mod schema; pub mod types; pub mod block_relay_protocol; diff --git a/substrate/client/network/sync/src/pending_responses.rs b/substrate/client/network/sync/src/pending_responses.rs index 602c69df7ff9..3d51328092ac 100644 --- a/substrate/client/network/sync/src/pending_responses.rs +++ b/substrate/client/network/sync/src/pending_responses.rs @@ -40,7 +40,7 @@ type ResponseResult = Result, ProtocolName), RequestFailure>, on type ResponseFuture = BoxFuture<'static, ResponseResult>; /// An event we receive once a pending response future resolves. -pub(crate) struct ResponseEvent { +pub struct ResponseEvent { pub peer_id: PeerId, pub key: StrategyKey, pub request: PeerRequest, @@ -48,7 +48,7 @@ pub(crate) struct ResponseEvent { } /// Stream taking care of polling pending responses. -pub(crate) struct PendingResponses { +pub struct PendingResponses { /// Pending responses pending_responses: StreamMap<(PeerId, StrategyKey), BoxStream<'static, (PeerRequest, ResponseResult)>>, diff --git a/substrate/client/network/sync/src/schema.rs b/substrate/client/network/sync/src/schema.rs index 22b7ee592778..f995e5c991a1 100644 --- a/substrate/client/network/sync/src/schema.rs +++ b/substrate/client/network/sync/src/schema.rs @@ -18,6 +18,6 @@ //! Include sources generated from protobuf definitions. -pub(crate) mod v1 { +pub mod v1 { include!(concat!(env!("OUT_DIR"), "/api.v1.rs")); } diff --git a/substrate/client/network/sync/src/service/syncing_service.rs b/substrate/client/network/sync/src/service/syncing_service.rs index 92d649d65dc3..a142c07a6376 100644 --- a/substrate/client/network/sync/src/service/syncing_service.rs +++ b/substrate/client/network/sync/src/service/syncing_service.rs @@ -26,6 +26,7 @@ use sc_network::{NetworkBlock, NetworkSyncForkRequest}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; use sp_runtime::traits::{Block as BlockT, NumberFor}; +use sc_network_common::sync::SyncMode; use std::{ pin::Pin, sync::{ @@ -34,8 +35,17 @@ use std::{ }, }; +/// Arguments for chain-sync restart. +pub struct SyncRestartArgs { + /// Updates the common blocks for connected peers when set. + pub new_best_block: Option>, + /// New sync mode for sync strategy restart. + pub sync_mode: SyncMode, +} + /// Commands send to `SyncingEngine` pub enum ToServiceCommand { + Restart(SyncRestartArgs, oneshot::Sender<()>), SetSyncForkRequest(Vec, B::Hash, NumberFor), RequestJustification(B::Hash, NumberFor), ClearJustificationRequests, @@ -91,6 +101,14 @@ impl SyncingService { rx.await } + /// Restart the synchronization with new arguments. + pub async fn restart(&self, sync_restart_args: SyncRestartArgs) { + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send(ToServiceCommand::Restart(sync_restart_args, tx)); + + let _ = rx.await; + } + /// Get best seen block. pub async fn best_seen_block(&self) -> Result>, oneshot::Canceled> { let (tx, rx) = oneshot::channel(); diff --git a/substrate/client/network/sync/src/state_request_handler.rs b/substrate/client/network/sync/src/state_request_handler.rs index 6bd2389fb5d1..cfdb642043f1 100644 --- a/substrate/client/network/sync/src/state_request_handler.rs +++ b/substrate/client/network/sync/src/state_request_handler.rs @@ -53,7 +53,7 @@ mod rep { } /// Generates a [`ProtocolConfig`] for the state request protocol, refusing incoming requests. -pub fn generate_protocol_config>( +fn generate_protocol_config>( protocol_id: &ProtocolId, genesis_hash: Hash, fork_id: Option<&str>, @@ -70,7 +70,10 @@ pub fn generate_protocol_config>( } /// Generate the state protocol name from the genesis hash and fork id. -fn generate_protocol_name>(genesis_hash: Hash, fork_id: Option<&str>) -> String { +pub fn generate_protocol_name>( + genesis_hash: Hash, + fork_id: Option<&str>, +) -> String { let genesis_hash = genesis_hash.as_ref(); if let Some(fork_id) = fork_id { format!("/{}/{}/state/2", array_bytes::bytes2hex("", genesis_hash), fork_id) diff --git a/substrate/client/network/sync/src/strategy.rs b/substrate/client/network/sync/src/strategy.rs index 16fbfc5300bb..1397fba74803 100644 --- a/substrate/client/network/sync/src/strategy.rs +++ b/substrate/client/network/sync/src/strategy.rs @@ -20,7 +20,7 @@ //! and specific syncing algorithms. pub mod chain_sync; -mod state; +pub mod state; pub mod state_sync; pub mod warp; @@ -291,6 +291,11 @@ where new_best } + /// Restart the chain-sync strategy with the new arguments. + pub fn on_restart(&mut self, sync_mode: SyncMode) { + self.chain_sync.as_mut().map(|s| s.on_restart(chain_sync_mode(sync_mode))); + } + /// Configure an explicit fork sync request in case external code has detected that there is a /// stale fork missing. pub fn set_sync_fork_request( @@ -305,6 +310,10 @@ where } } + pub fn update_common_number_for_peers(&mut self, number: NumberFor) { + self.chain_sync.as_mut().map(|s| s.update_common_number_for_peers(number)); + } + /// Request extra justification. pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { // Justifications can only be requested via `ChainSync`. diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index 6e349267805b..99788c4bfc10 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -473,6 +473,12 @@ where .count() } + /// Restart the chain-sync with the new sync mode. + pub fn on_restart(&mut self, chain_sync_mode: ChainSyncMode) { + self.mode = chain_sync_mode; + self.restart(); + } + /// Get the total number of downloaded blocks. pub fn num_downloaded_blocks(&self) -> usize { self.downloaded_blocks @@ -1292,6 +1298,16 @@ where } } + pub fn update_common_number_for_peers(&mut self, new_common: NumberFor) { + for peer in self.peers.values_mut() { + if peer.best_number >= new_common { + peer.update_common_number(new_common); + } else { + peer.update_common_number(peer.best_number); + } + } + } + /// Called when a block has been queued for import. /// /// Updates our internal state for best queued block and then goes diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs index 71f5a39917f1..0ff858e7d941 100644 --- a/substrate/client/service/src/client/client.rs +++ b/substrate/client/service/src/client/client.rs @@ -118,6 +118,34 @@ where _phantom: PhantomData, } +impl crate::ClientExt for Client +where + B: backend::Backend, + E: CallExecutor + Send + Sync, + Block: BlockT, + Client: ProvideRuntimeApi, + as ProvideRuntimeApi>::Api: CoreApi + ApiExt, + RA: Sync + Send, +{ + /// Apply a checked and validated block to an operation. + fn apply_block( + &self, + operation: &mut ClientImportOperation, + import_block: BlockImportParams, + storage_changes: Option>, + ) -> sp_blockchain::Result + where + Self: ProvideRuntimeApi, + >::Api: CoreApi + ApiExt, + { + self.apply_block(operation, import_block, storage_changes) + } + + fn clear_block_gap(&self) { + self.backend.blockchain().clear_block_gap(); + } +} + /// Used in importing a block, where additional changes are made after the runtime /// executed. enum PrePostHeader { diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index 9480d4a0b072..6957cbbdb138 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -40,7 +40,10 @@ use codec::{Decode, Encode}; use futures::{pin_mut, FutureExt, StreamExt}; use jsonrpsee::RpcModule; use log::{debug, error, warn}; -use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider}; +use sc_client_api::{ + backend, blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ClientImportOperation, + ProofProvider, +}; use sc_network::{ config::MultiaddrWithPeerId, NetworkBlock, NetworkPeers, NetworkStateInfo, PeerId, }; @@ -75,6 +78,7 @@ pub use sc_chain_spec::{ Properties, RuntimeGenesis, }; +use sc_consensus::BlockImportParams; pub use sc_consensus::ImportQueue; pub use sc_executor::NativeExecutionDispatch; pub use sc_network_sync::WarpSyncParams; @@ -96,6 +100,19 @@ const DEFAULT_PROTOCOL_ID: &str = "sup"; #[derive(Clone)] pub struct RpcHandlers(Arc>); +/// Provides extended functions for `Client` to enable fast-sync. +pub trait ClientExt> { + /// Apply a checked and validated block to an operation. + fn apply_block( + &self, + operation: &mut ClientImportOperation, + import_block: BlockImportParams, + storage_changes: Option>, + ) -> sp_blockchain::Result; + /// Clear block gap after initial block insertion. + fn clear_block_gap(&self); +} + impl RpcHandlers { /// Starts an RPC query. /// diff --git a/substrate/primitives/blockchain/src/backend.rs b/substrate/primitives/blockchain/src/backend.rs index 7a09865f858d..363c71fdb75a 100644 --- a/substrate/primitives/blockchain/src/backend.rs +++ b/substrate/primitives/blockchain/src/backend.rs @@ -255,6 +255,9 @@ pub trait Backend: } fn block_indexed_body(&self, hash: Block::Hash) -> Result>>>; + + /// Clears the block gap from DB after the fast-sync. + fn clear_block_gap(&self); } /// Blockchain info