Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fast-sync. #2657

Closed
wants to merge 9 commits into from
17 changes: 14 additions & 3 deletions crates/subspace-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ use sc_proof_of_time::source::gossip::pot_gossip_peers_set_config;
use sc_proof_of_time::source::PotSourceWorker;
use sc_proof_of_time::verifier::PotVerifier;
use sc_service::error::Error as ServiceError;
use sc_service::{Configuration, NetworkStarter, SpawnTasksParams, TaskManager};
use sc_service::{ClientExt, Configuration, NetworkStarter, SpawnTasksParams, TaskManager};
use sc_subspace_block_relay::{
build_consensus_relay, BlockRelayConfigurationError, NetworkWrapper,
};
Expand Down Expand Up @@ -700,6 +700,12 @@ where
mut telemetry,
} = other;

// Clear block gap on reruns
if client.info().finalized_state.is_some() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Add fast-sync flag condition.

info!(client_info=?client.info(), "Client info");
client.clear_block_gap();
}

let offchain_indexing_enabled = config.offchain_worker.indexing_enabled;
let (node, bootstrap_nodes) = match config.subspace_networking {
SubspaceNetworking::Reuse {
Expand Down Expand Up @@ -795,7 +801,8 @@ where
}
};

let import_queue_service = import_queue.service();
let import_queue_service1 = import_queue.service();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Remove the copy by using Arc

let import_queue_service2 = import_queue.service();
let network_wrapper = Arc::new(NetworkWrapper::default());
let block_relay = Some(
build_consensus_relay(
Expand Down Expand Up @@ -879,6 +886,7 @@ where
);

network_wrapper.set(network_service.clone());

if config.sync_from_dsn {
let dsn_sync_piece_getter = config.dsn_piece_getter.unwrap_or_else(|| {
Arc::new(PieceProvider::new(
Expand All @@ -901,10 +909,13 @@ where
Arc::clone(&network_service),
node.clone(),
Arc::clone(&client),
import_queue_service,
import_queue_service1,
import_queue_service2,
sync_target_block_number,
pause_sync,
dsn_sync_piece_getter,
subspace_link.clone(),
config.fast_sync_enabled,
);
task_manager
.spawn_handle()
Expand Down
127 changes: 106 additions & 21 deletions crates/subspace-service/src/sync_from_dsn.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
mod import_blocks;
pub(super) mod piece_validator;
mod segment_header_downloader;
pub(crate) mod fast_sync;
pub(crate) mod import_blocks;
pub(crate) mod piece_validator;
pub(crate) mod segment_header_downloader;

use crate::sync_from_dsn::import_blocks::import_blocks_from_dsn;
pub use crate::sync_from_dsn::import_blocks::DsnSyncPieceGetter;
use crate::sync_from_dsn::fast_sync::FastSyncer;
use crate::sync_from_dsn::import_blocks::{import_blocks_from_dsn, DsnSyncPieceGetter};
use crate::sync_from_dsn::segment_header_downloader::SegmentHeaderDownloader;
use futures::channel::mpsc;
use futures::{select, FutureExt, StreamExt};
use sc_client_api::{AuxStore, BlockBackend, BlockchainEvents};
use sc_client_api::{AuxStore, BlockBackend, BlockchainEvents, ProofProvider};
use sc_consensus::import_queue::ImportQueueService;
use sc_consensus_subspace::archiver::SegmentHeadersStore;
use sc_consensus_subspace::SubspaceLink;
use sc_network::{NetworkPeers, NetworkService};
use sc_service::ClientExt;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus_subspace::{FarmerPublicKey, SubspaceApi};
use sp_objects::ObjectsApi;
use sp_runtime::traits::{Block as BlockT, CheckedSub, NumberFor};
use sp_runtime::Saturating;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use subspace_archiving::reconstructor::Reconstructor;
use subspace_core_primitives::SegmentIndex;
use subspace_networking::Node;
use tracing::{info, warn};
use tokio::sync::Mutex;
use tracing::{error, info, warn};

/// How much time to wait for new block to be imported before timing out and starting sync from DSN
const NO_IMPORTED_BLOCKS_TIMEOUT: Duration = Duration::from_secs(10 * 60);
Expand All @@ -45,15 +51,18 @@ enum NotificationReason {
/// Create node observer that will track node state and send notifications to worker to start sync
/// from DSN.
#[allow(clippy::too_many_arguments)]
pub(super) fn create_observer_and_worker<Block, AS, Client, PG>(
pub(crate) fn create_observer_and_worker<Block, AS, Client, PG, IQS>(
segment_headers_store: SegmentHeadersStore<AS>,
network_service: Arc<NetworkService<Block, <Block as BlockT>::Hash>>,
node: Node,
client: Arc<Client>,
mut import_queue_service: Box<dyn ImportQueueService<Block>>,
import_queue_service1: Box<IQS>,
import_queue_service2: Box<IQS>,
sync_target_block_number: Arc<AtomicU32>,
pause_sync: Arc<AtomicBool>,
piece_getter: PG,
subspace_link: SubspaceLink<Block>,
fast_sync_enabled: bool,
) -> (
impl Future<Output = ()> + Send + 'static,
impl Future<Output = Result<(), sc_service::Error>> + Send + 'static,
Expand All @@ -65,29 +74,47 @@ where
+ BlockBackend<Block>
+ BlockchainEvents<Block>
+ ProvideRuntimeApi<Block>
+ ProofProvider<Block>
+ Send
+ Sync
+ ClientExt<Block>
+ 'static,
Client::Api: SubspaceApi<Block, FarmerPublicKey>,
Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
PG: DsnSyncPieceGetter + Send + Sync + 'static,
IQS: ImportQueueService<Block> + ?Sized + 'static,
{
let network_service_clone = network_service.clone();
let (tx, rx) = mpsc::channel(0);
let notification_sender = tx.clone();
let observer_fut = {
let node = node.clone();
let client = Arc::clone(&client);

async move { create_observer(network_service.as_ref(), &node, client.as_ref(), tx).await }
async move {
create_observer(
network_service_clone.as_ref(),
&node,
client.as_ref(),
notification_sender,
)
.await
}
};
let worker_fut = async move {
create_worker(
segment_headers_store,
&node,
client.as_ref(),
import_queue_service.as_mut(),
client.clone(),
import_queue_service1,
import_queue_service2,
sync_target_block_number,
pause_sync,
rx,
&piece_getter,
network_service,
tx,
subspace_link,
fast_sync_enabled,
)
.await
};
Expand Down Expand Up @@ -213,24 +240,31 @@ async fn create_substrate_network_observer<Block>(
async fn create_worker<Block, AS, IQS, Client, PG>(
segment_headers_store: SegmentHeadersStore<AS>,
node: &Node,
client: &Client,
import_queue_service: &mut IQS,
client: Arc<Client>,
import_queue_service: Box<IQS>,
mut import_queue_service2: Box<IQS>,
sync_target_block_number: Arc<AtomicU32>,
pause_sync: Arc<AtomicBool>,
mut notifications: mpsc::Receiver<NotificationReason>,
piece_getter: &PG,
network_service: Arc<NetworkService<Block, <Block as BlockT>::Hash>>,
mut notifications_sender: mpsc::Sender<NotificationReason>,
subspace_link: SubspaceLink<Block>,
fast_sync_enabled: bool,
) -> Result<(), sc_service::Error>
where
Block: BlockT,
AS: AuxStore + Send + Sync + 'static,
Client: HeaderBackend<Block>
+ BlockBackend<Block>
+ ClientExt<Block>
+ ProvideRuntimeApi<Block>
+ ProofProvider<Block>
+ Send
+ Sync
+ 'static,
Client::Api: SubspaceApi<Block, FarmerPublicKey>,
IQS: ImportQueueService<Block> + ?Sized,
Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
IQS: ImportQueueService<Block> + ?Sized + 'static,
PG: DsnSyncPieceGetter,
{
let info = client.info();
Expand All @@ -249,19 +283,68 @@ where
.saturating_sub(chain_constants.confirmation_depth_k().into());
let segment_header_downloader = SegmentHeaderDownloader::new(node);

while let Some(reason) = notifications.next().await {
pause_sync.store(true, Ordering::Release);
let mut reconstructor = Reconstructor::new().map_err(|error| error.to_string())?;

pause_sync.store(true, Ordering::Release);

let finalized_state_detected = client.info().finalized_state.is_some();

// TODO: Introduce fast-sync support for reruns
if fast_sync_enabled && finalized_state_detected {
info!("Fast sync detected existing finalized state.")
}

// Run fast-sync first.
#[allow(clippy::collapsible_if)] // clippy error: the if statement is not the same
if fast_sync_enabled && !finalized_state_detected {
if notifications.next().await.is_some() {
let import_queue_service = Arc::new(Mutex::new(import_queue_service));
let fast_syncer = FastSyncer::new(
&segment_headers_store,
node,
piece_getter,
client.clone(),
import_queue_service,
network_service.clone(),
subspace_link,
);

let fast_sync_result = fast_syncer.sync().await;

match fast_sync_result {
Ok(fast_sync_result) => {
last_processed_block_number = fast_sync_result.last_imported_block_number;
last_processed_segment_index = fast_sync_result.last_imported_segment_index;
reconstructor = fast_sync_result.reconstructor;
}
Err(err) => {
error!("Fast sync failed: {err}");
panic!("Fast sync failed."); // TODO:
}
}

info!(%last_processed_block_number, %last_processed_segment_index, "Fast sync finished."); // TODO: change to debug

notifications_sender
.try_send(NotificationReason::WentOnlineSubspace)
.map_err(|_| {
sc_service::Error::Other("Can't send sync notification reason.".into())
})?;
}
}

while let Some(reason) = notifications.next().await {
info!(?reason, "Received notification to sync from DSN");
// TODO: Maybe handle failed block imports, additional helpful logging
let import_froms_from_dsn_fut = import_blocks_from_dsn(
&segment_headers_store,
&segment_header_downloader,
client,
client.as_ref(),
piece_getter,
import_queue_service,
import_queue_service2.as_mut(),
&mut last_processed_segment_index,
&mut last_processed_block_number,
&mut reconstructor,
);
let wait_almost_synced_fut = async {
loop {
Expand Down Expand Up @@ -294,6 +377,8 @@ where
}
}

info!("Finished DSN sync."); // TODO: change to debug

pause_sync.store(false, Ordering::Release);

while notifications.try_next().is_ok() {
Expand Down
Loading