Skip to content

Commit

Permalink
Merge pull request #16 from subspace/subspace-fast-sync-new4
Browse files Browse the repository at this point in the history
Support fast-sync algorithm.
  • Loading branch information
shamil-gadelshin authored May 9, 2024
2 parents 480a5c3 + d16ad68 commit 8082697
Show file tree
Hide file tree
Showing 14 changed files with 150 additions and 9 deletions.
4 changes: 4 additions & 0 deletions substrate/client/api/src/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,10 @@ impl<Block: BlockT> blockchain::Backend<Block> for Blockchain<Block> {
) -> sp_blockchain::Result<Option<Vec<Vec<u8>>>> {
unimplemented!("Not supported by the in-mem backend.")
}

fn clear_block_gap(&self) {
unimplemented!("Not supported by the in-mem backend.")
}
}

impl<Block: BlockT> backend::AuxStore for Blockchain<Block> {
Expand Down
6 changes: 6 additions & 0 deletions substrate/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,11 @@ impl<Block: BlockT> sc_client_api::blockchain::Backend<Block> for BlockchainDb<B
Err(sp_blockchain::Error::Backend(format!("Error decoding body list: {}", err))),
}
}

fn clear_block_gap(&self) {
debug!(target: "sync", "Clear block gap.");
self.update_block_gap(None);
}
}

impl<Block: BlockT> HeaderMetadata<Block> for BlockchainDb<Block> {
Expand Down Expand Up @@ -1774,6 +1779,7 @@ impl<Block: BlockT> Backend<Block> {
for m in meta_updates {
self.blockchain.update_meta(m);
}

self.blockchain.update_block_gap(block_gap);

Ok(())
Expand Down
30 changes: 30 additions & 0 deletions substrate/client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,21 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
rx.await.map_err(|_| ())
}

/// Returns a collection of currently connected (open) peers.
pub async fn connected_peers(&self) -> Result<Vec<PeerId>, ()> {
let (tx, rx) = oneshot::channel();

let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::ConnectedPeers { pending_response: tx });

match rx.await {
Ok(v) => Ok(v),
// The channel can only be closed if the network worker no longer exists.
Err(_) => Err(()),
}
}

/// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates.
///
/// Returns an `Err` if one of the given addresses is invalid or contains an
Expand Down Expand Up @@ -1173,6 +1188,9 @@ enum ServiceToWorkerMsg {
NetworkState {
pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
},
ConnectedPeers {
pending_response: oneshot::Sender<Vec<PeerId>>,
},
DisconnectPeer(PeerId, ProtocolName),
}

Expand Down Expand Up @@ -1315,9 +1333,21 @@ where
.behaviour_mut()
.user_protocol_mut()
.disconnect_peer(&who, protocol_name),
ServiceToWorkerMsg::ConnectedPeers { pending_response } => {
let _ = pending_response.send(self.connected_peers());
},
}
}

fn connected_peers(&self) -> Vec<PeerId> {
self.network_service
.behaviour()
.user_protocol()
.open_peers()
.cloned()
.collect::<Vec<_>>()
}

/// Process the next event coming from `Swarm`.
fn handle_swarm_event(&mut self, event: SwarmEvent<BehaviourOut, THandlerErr<Behaviour<B>>>) {
match event {
Expand Down
7 changes: 7 additions & 0 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,13 @@ where
},
ToServiceCommand::OnBlockFinalized(hash, header) =>
self.strategy.on_block_finalized(&hash, *header.number()),
ToServiceCommand::Restart(sync_restart_args, tx) => {
if let Some(number) = sync_restart_args.new_best_block {
self.strategy.update_common_number_for_peers(number);
}
self.strategy.on_restart(sync_restart_args.sync_mode.into());
let _ = tx.send(());
},
}
}

Expand Down
4 changes: 2 additions & 2 deletions substrate/client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusPro
mod block_announce_validator;
mod futures_stream;
mod justification_requests;
mod pending_responses;
pub mod pending_responses;
mod request_metrics;
mod schema;
pub mod schema;
pub mod types;

pub mod block_relay_protocol;
Expand Down
4 changes: 2 additions & 2 deletions substrate/client/network/sync/src/pending_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ type ResponseResult = Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, on
type ResponseFuture = BoxFuture<'static, ResponseResult>;

/// An event we receive once a pending response future resolves.
pub(crate) struct ResponseEvent<B: BlockT> {
pub struct ResponseEvent<B: BlockT> {
pub peer_id: PeerId,
pub key: StrategyKey,
pub request: PeerRequest<B>,
pub response: ResponseResult,
}

/// Stream taking care of polling pending responses.
pub(crate) struct PendingResponses<B: BlockT> {
pub struct PendingResponses<B: BlockT> {
/// Pending responses
pending_responses:
StreamMap<(PeerId, StrategyKey), BoxStream<'static, (PeerRequest<B>, ResponseResult)>>,
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/sync/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

//! Include sources generated from protobuf definitions.
pub(crate) mod v1 {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/api.v1.rs"));
}
18 changes: 18 additions & 0 deletions substrate/client/network/sync/src/service/syncing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use sc_network::{NetworkBlock, NetworkSyncForkRequest};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_runtime::traits::{Block as BlockT, NumberFor};

use sc_network_common::sync::SyncMode;
use std::{
pin::Pin,
sync::{
Expand All @@ -34,8 +35,17 @@ use std::{
},
};

/// Arguments for chain-sync restart.
pub struct SyncRestartArgs<B: BlockT> {
/// Updates the common blocks for connected peers when set.
pub new_best_block: Option<NumberFor<B>>,
/// New sync mode for sync strategy restart.
pub sync_mode: SyncMode,
}

/// Commands send to `SyncingEngine`
pub enum ToServiceCommand<B: BlockT> {
Restart(SyncRestartArgs<B>, oneshot::Sender<()>),
SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
RequestJustification(B::Hash, NumberFor<B>),
ClearJustificationRequests,
Expand Down Expand Up @@ -91,6 +101,14 @@ impl<B: BlockT> SyncingService<B> {
rx.await
}

/// Restart the synchronization with new arguments.
pub async fn restart(&self, sync_restart_args: SyncRestartArgs<B>) {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::Restart(sync_restart_args, tx));

let _ = rx.await;
}

/// Get best seen block.
pub async fn best_seen_block(&self) -> Result<Option<NumberFor<B>>, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
Expand Down
7 changes: 5 additions & 2 deletions substrate/client/network/sync/src/state_request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ mod rep {
}

/// Generates a [`ProtocolConfig`] for the state request protocol, refusing incoming requests.
pub fn generate_protocol_config<Hash: AsRef<[u8]>>(
fn generate_protocol_config<Hash: AsRef<[u8]>>(
protocol_id: &ProtocolId,
genesis_hash: Hash,
fork_id: Option<&str>,
Expand All @@ -70,7 +70,10 @@ pub fn generate_protocol_config<Hash: AsRef<[u8]>>(
}

/// Generate the state protocol name from the genesis hash and fork id.
fn generate_protocol_name<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> String {
pub fn generate_protocol_name<Hash: AsRef<[u8]>>(
genesis_hash: Hash,
fork_id: Option<&str>,
) -> String {
let genesis_hash = genesis_hash.as_ref();
if let Some(fork_id) = fork_id {
format!("/{}/{}/state/2", array_bytes::bytes2hex("", genesis_hash), fork_id)
Expand Down
11 changes: 10 additions & 1 deletion substrate/client/network/sync/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
//! and specific syncing algorithms.
pub mod chain_sync;
mod state;
pub mod state;
pub mod state_sync;
pub mod warp;

Expand Down Expand Up @@ -291,6 +291,11 @@ where
new_best
}

/// Restart the chain-sync strategy with the new arguments.
pub fn on_restart(&mut self, sync_mode: SyncMode) {
self.chain_sync.as_mut().map(|s| s.on_restart(chain_sync_mode(sync_mode)));
}

/// Configure an explicit fork sync request in case external code has detected that there is a
/// stale fork missing.
pub fn set_sync_fork_request(
Expand All @@ -305,6 +310,10 @@ where
}
}

pub fn update_common_number_for_peers(&mut self, number: NumberFor<B>) {
self.chain_sync.as_mut().map(|s| s.update_common_number_for_peers(number));
}

/// Request extra justification.
pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
// Justifications can only be requested via `ChainSync`.
Expand Down
16 changes: 16 additions & 0 deletions substrate/client/network/sync/src/strategy/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,12 @@ where
.count()
}

/// Restart the chain-sync with the new sync mode.
pub fn on_restart(&mut self, chain_sync_mode: ChainSyncMode) {
self.mode = chain_sync_mode;
self.restart();
}

/// Get the total number of downloaded blocks.
pub fn num_downloaded_blocks(&self) -> usize {
self.downloaded_blocks
Expand Down Expand Up @@ -1292,6 +1298,16 @@ where
}
}

pub fn update_common_number_for_peers(&mut self, new_common: NumberFor<B>) {
for peer in self.peers.values_mut() {
if peer.best_number >= new_common {
peer.update_common_number(new_common);
} else {
peer.update_common_number(peer.best_number);
}
}
}

/// Called when a block has been queued for import.
///
/// Updates our internal state for best queued block and then goes
Expand Down
28 changes: 28 additions & 0 deletions substrate/client/service/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,34 @@ where
_phantom: PhantomData<RA>,
}

impl<B, E, Block, RA> crate::ClientExt<Block, B> for Client<B, E, Block, RA>
where
B: backend::Backend<Block>,
E: CallExecutor<Block> + Send + Sync,
Block: BlockT,
Client<B, E, Block, RA>: ProvideRuntimeApi<Block>,
<Client<B, E, Block, RA> as ProvideRuntimeApi<Block>>::Api: CoreApi<Block> + ApiExt<Block>,
RA: Sync + Send,
{
/// Apply a checked and validated block to an operation.
fn apply_block(
&self,
operation: &mut ClientImportOperation<Block, B>,
import_block: BlockImportParams<Block>,
storage_changes: Option<sc_consensus::StorageChanges<Block>>,
) -> sp_blockchain::Result<ImportResult>
where
Self: ProvideRuntimeApi<Block>,
<Self as ProvideRuntimeApi<Block>>::Api: CoreApi<Block> + ApiExt<Block>,
{
self.apply_block(operation, import_block, storage_changes)
}

fn clear_block_gap(&self) {
self.backend.blockchain().clear_block_gap();
}
}

/// Used in importing a block, where additional changes are made after the runtime
/// executed.
enum PrePostHeader<H> {
Expand Down
19 changes: 18 additions & 1 deletion substrate/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ use codec::{Decode, Encode};
use futures::{pin_mut, FutureExt, StreamExt};
use jsonrpsee::RpcModule;
use log::{debug, error, warn};
use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider};
use sc_client_api::{
backend, blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ClientImportOperation,
ProofProvider,
};
use sc_network::{
config::MultiaddrWithPeerId, NetworkBlock, NetworkPeers, NetworkStateInfo, PeerId,
};
Expand Down Expand Up @@ -75,6 +78,7 @@ pub use sc_chain_spec::{
Properties, RuntimeGenesis,
};

use sc_consensus::BlockImportParams;
pub use sc_consensus::ImportQueue;
pub use sc_executor::NativeExecutionDispatch;
pub use sc_network_sync::WarpSyncParams;
Expand All @@ -96,6 +100,19 @@ const DEFAULT_PROTOCOL_ID: &str = "sup";
#[derive(Clone)]
pub struct RpcHandlers(Arc<RpcModule<()>>);

/// Provides extended functions for `Client` to enable fast-sync.
pub trait ClientExt<Block: BlockT, B: backend::Backend<Block>> {
/// Apply a checked and validated block to an operation.
fn apply_block(
&self,
operation: &mut ClientImportOperation<Block, B>,
import_block: BlockImportParams<Block>,
storage_changes: Option<sc_consensus::StorageChanges<Block>>,
) -> sp_blockchain::Result<sc_consensus::ImportResult>;
/// Clear block gap after initial block insertion.
fn clear_block_gap(&self);
}

impl RpcHandlers {
/// Starts an RPC query.
///
Expand Down
3 changes: 3 additions & 0 deletions substrate/primitives/blockchain/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ pub trait Backend<Block: BlockT>:
}

fn block_indexed_body(&self, hash: Block::Hash) -> Result<Option<Vec<Vec<u8>>>>;

/// Clears the block gap from DB after the fast-sync.
fn clear_block_gap(&self);
}

/// Blockchain info
Expand Down

0 comments on commit 8082697

Please sign in to comment.