Skip to content

Commit

Permalink
Add sync strategy restart.
Browse files Browse the repository at this point in the history
  • Loading branch information
shamil-gadelshin committed May 1, 2024
1 parent 88d8bab commit 2447588
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 0 deletions.
7 changes: 7 additions & 0 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,13 @@ where
},
ToServiceCommand::OnBlockFinalized(hash, header) =>
self.strategy.on_block_finalized(&hash, *header.number()),
ToServiceCommand::Restart(sync_restart_args, tx) => {
if let Some(number) = sync_restart_args.new_best_block {
self.strategy.update_common_number_for_peers(number);
}
self.strategy.on_restart(sync_restart_args.sync_mode.into());
let _ = tx.send(());
},
}
}

Expand Down
18 changes: 18 additions & 0 deletions substrate/client/network/sync/src/service/syncing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use sc_network::{NetworkBlock, NetworkSyncForkRequest};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_runtime::traits::{Block as BlockT, NumberFor};

use sc_network_common::sync::SyncMode;
use std::{
pin::Pin,
sync::{
Expand All @@ -34,8 +35,17 @@ use std::{
},
};

/// Arguments for chain-sync restart.
pub struct SyncRestartArgs<B: BlockT> {
/// Updates the common blocks for connected peers when set.
pub new_best_block: Option<NumberFor<B>>,
/// New sync mode for sync strategy restart.
pub sync_mode: SyncMode,
}

/// Commands send to `SyncingEngine`
pub enum ToServiceCommand<B: BlockT> {
Restart(SyncRestartArgs<B>, oneshot::Sender<()>),
SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
RequestJustification(B::Hash, NumberFor<B>),
ClearJustificationRequests,
Expand Down Expand Up @@ -91,6 +101,14 @@ impl<B: BlockT> SyncingService<B> {
rx.await
}

/// Restart the synchronization with new arguments.
pub async fn restart(&self, sync_restart_args: SyncRestartArgs<B>) {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(ToServiceCommand::Restart(sync_restart_args, tx));

let _ = rx.await;
}

/// Get best seen block.
pub async fn best_seen_block(&self) -> Result<Option<NumberFor<B>>, oneshot::Canceled> {
let (tx, rx) = oneshot::channel();
Expand Down
9 changes: 9 additions & 0 deletions substrate/client/network/sync/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ where
new_best
}

/// Restart the chain-sync strategy with the new arguments.
pub fn on_restart(&mut self, sync_mode: SyncMode) {
self.chain_sync.as_mut().map(|s| s.on_restart(chain_sync_mode(sync_mode)));
}

/// Configure an explicit fork sync request in case external code has detected that there is a
/// stale fork missing.
pub fn set_sync_fork_request(
Expand All @@ -305,6 +310,10 @@ where
}
}

pub fn update_common_number_for_peers(&mut self, number: NumberFor<B>) {
self.chain_sync.as_mut().map(|s| s.update_common_number_for_peers(number));
}

/// Request extra justification.
pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
// Justifications can only be requested via `ChainSync`.
Expand Down
16 changes: 16 additions & 0 deletions substrate/client/network/sync/src/strategy/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,12 @@ where
.count()
}

/// Restart the chain-sync with the new sync mode.
pub fn on_restart(&mut self, chain_sync_mode: ChainSyncMode) {
self.mode = chain_sync_mode;
self.restart();
}

/// Get the total number of downloaded blocks.
pub fn num_downloaded_blocks(&self) -> usize {
self.downloaded_blocks
Expand Down Expand Up @@ -1292,6 +1298,16 @@ where
}
}

pub fn update_common_number_for_peers(&mut self, new_common: NumberFor<B>) {
for peer in self.peers.values_mut() {
if peer.best_number >= new_common {
peer.update_common_number(new_common);
} else {
peer.update_common_number(peer.best_number);
}
}
}

/// Called when a block has been queued for import.
///
/// Updates our internal state for best queued block and then goes
Expand Down

0 comments on commit 2447588

Please sign in to comment.