Skip to content

Commit

Permalink
Merge pull request #3108 from autonomys/archiver-public-api
Browse files Browse the repository at this point in the history
Refactor archiver public API to return ArchiveBlockOutcome
  • Loading branch information
teor2345 authored Oct 9, 2024
2 parents bda8801 + 411a078 commit ac9e171
Show file tree
Hide file tree
Showing 17 changed files with 305 additions and 317 deletions.
1 change: 1 addition & 0 deletions crates/pallet-subspace/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ pub fn create_archived_segment() -> &'static NewArchivedSegment {
rand::thread_rng().fill(block.as_mut_slice());
archiver
.add_block(block, Default::default(), true)
.archived_segments
.into_iter()
.next()
.unwrap()
Expand Down
43 changes: 21 additions & 22 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use jsonrpsee::{Extensions, PendingSubscriptionSink};
use parking_lot::Mutex;
use sc_client_api::{AuxStore, BlockBackend};
use sc_consensus_subspace::archiver::{
recreate_genesis_segment, ArchivedSegmentNotification, SegmentHeadersStore,
recreate_genesis_segment, ArchivedSegmentNotification, ObjectMappingNotification,
SegmentHeadersStore,
};
use sc_consensus_subspace::notification::SubspaceNotificationStream;
use sc_consensus_subspace::slot_worker::{
Expand Down Expand Up @@ -171,15 +172,15 @@ pub trait SubspaceRpcApi {
#[method(name = "subspace_lastSegmentHeaders")]
async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error>;

/// Block/transaction archived object mappings subscription
/// Block/transaction object mappings subscription
#[subscription(
name = "subspace_subscribeArchivedObjectMappings" => "subspace_archived_object_mappings",
unsubscribe = "subspace_unsubscribeArchivedObjectMappings",
name = "subspace_subscribeObjectMappings" => "subspace_object_mappings",
unsubscribe = "subspace_unsubscribeObjectMappings",
item = GlobalObjectMapping,
)]
fn subscribe_archived_object_mappings(&self);
fn subscribe_object_mappings(&self);

/// Filtered block/transaction archived object mappings subscription
/// Filtered block/transaction object mappings subscription
#[subscription(
name = "subspace_subscribeFilteredObjectMappings" => "subspace_filtered_object_mappings",
unsubscribe = "subspace_unsubscribeFilteredObjectMappings",
Expand Down Expand Up @@ -234,6 +235,8 @@ where
pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
/// Reward signing notification stream
pub reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
/// Archived mapping notification stream
pub object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
/// Archived segment notification stream
pub archived_segment_notification_stream:
SubspaceNotificationStream<ArchivedSegmentNotification>,
Expand All @@ -259,6 +262,7 @@ where
subscription_executor: SubscriptionTaskExecutor,
new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
archived_segment_notification_stream: SubspaceNotificationStream<ArchivedSegmentNotification>,
#[allow(clippy::type_complexity)]
solution_response_senders: Arc<Mutex<LruMap<SlotNumber, mpsc::Sender<Solution<PublicKey>>>>>,
Expand Down Expand Up @@ -316,6 +320,7 @@ where
subscription_executor: config.subscription_executor,
new_slot_notification_stream: config.new_slot_notification_stream,
reward_signing_notification_stream: config.reward_signing_notification_stream,
object_mapping_notification_stream: config.object_mapping_notification_stream,
archived_segment_notification_stream: config.archived_segment_notification_stream,
solution_response_senders: Arc::new(Mutex::new(LruMap::new(ByLength::new(
solution_response_senders_capacity,
Expand Down Expand Up @@ -841,20 +846,14 @@ where
// - the number of object mappings in each segment can be very large (hundreds or thousands).
// To avoid RPC connection failures, limit the number of mappings returned in each response,
// or the number of in-flight responses.
fn subscribe_archived_object_mappings(&self, pending: PendingSubscriptionSink) {
fn subscribe_object_mappings(&self, pending: PendingSubscriptionSink) {
// TODO: deny unsafe subscriptions?

// The genesis segment isn't included in this stream. In other methods we recreate is as the first segment,
// but there aren't any mappings in it, so we don't need to recreate it as part of this subscription.

let mapping_stream = self
.archived_segment_notification_stream
.object_mapping_notification_stream
.subscribe()
.flat_map(|archived_segment_notification| {
let objects = archived_segment_notification
.archived_segment
.global_object_mappings();

.flat_map(|object_mapping_notification| {
let objects = object_mapping_notification.object_mapping;
stream::iter(objects)
})
.ready_chunks(OBJECT_MAPPING_BATCH_SIZE)
Expand Down Expand Up @@ -902,14 +901,14 @@ where
let hash_count = hashes.len();

// The genesis segment isn't included in this stream, see
// `subscribe_archived_object_mappings` for details.
// `subscribe_object_mappings` for details.
let mapping_stream = self
.archived_segment_notification_stream
.object_mapping_notification_stream
.subscribe()
.flat_map(move |archived_segment_notification| {
let objects = archived_segment_notification
.archived_segment
.global_object_mappings()
.flat_map(move |object_mapping_notification| {
let objects = object_mapping_notification
.object_mapping
.into_iter()
.filter(|object| hashes.remove(&object.hash))
.collect::<Vec<_>>();

Expand Down
64 changes: 51 additions & 13 deletions crates/sc-consensus-subspace/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::time::Duration;
use subspace_archiving::archiver::{Archiver, NewArchivedSegment};
use subspace_core_primitives::objects::BlockObjectMapping;
use subspace_core_primitives::objects::{BlockObjectMapping, GlobalObject};
use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex};
use subspace_core_primitives::{BlockNumber, PublicKey};
use subspace_erasure_coding::ErasureCoding;
Expand Down Expand Up @@ -345,6 +345,17 @@ pub struct ArchivedSegmentNotification {
pub acknowledgement_sender: TracingUnboundedSender<()>,
}

/// Notification with incrementally generated object mappings for a block (and any previous block
/// continuation)
#[derive(Debug, Clone)]
pub struct ObjectMappingNotification {
/// Incremental object mappings for a block (and any previous block continuation).
///
/// The archived data won't be available in pieces until the entire segment is full and archived.
pub object_mapping: Vec<GlobalObject>,
// TODO: add an acknowledgement_sender for backpressure if needed
}

fn find_last_archived_block<Block, Client, AS>(
client: &Client,
segment_headers_store: &SegmentHeadersStore<AS>,
Expand Down Expand Up @@ -432,8 +443,11 @@ where

let encoded_block = encode_block(signed_block);

let new_archived_segment = Archiver::new(kzg, erasure_coding)
.add_block(encoded_block, block_object_mappings, false)
// There are no mappings in the genesis block, so they can be ignored
let block_outcome =
Archiver::new(kzg, erasure_coding).add_block(encoded_block, block_object_mappings, false);
let new_archived_segment = block_outcome
.archived_segments
.into_iter()
.next()
.expect("Genesis block always results in exactly one archived segment; qed");
Expand Down Expand Up @@ -671,9 +685,17 @@ where
encoded_block.len() as f32 / 1024.0
);

let archived_segments =
archiver.add_block(encoded_block, block_object_mappings, false);
let new_segment_headers: Vec<SegmentHeader> = archived_segments
let block_outcome = archiver.add_block(encoded_block, block_object_mappings, false);
// RPC clients only want these mappings in full mapping mode
// TODO: turn this into a command-line argument named `--full-mapping`
if cfg!(feature = "full-archive") {
send_object_mapping_notification(
&subspace_link.object_mapping_notification_sender,
block_outcome.object_mapping,
);
}
let new_segment_headers: Vec<SegmentHeader> = block_outcome
.archived_segments
.iter()
.map(|archived_segment| archived_segment.segment_header)
.collect();
Expand Down Expand Up @@ -753,6 +775,9 @@ fn finalize_block<Block, Backend, Client>(
/// processing, which is necessary for ensuring that when the next block is imported, inherents will
/// contain segment header of newly archived block (must happen exactly in the next block).
///
/// When a block with object mappings is produced, notification ([`SubspaceLink::object_mapping_notification_stream`])
/// will be sent.
///
/// Once segment header is archived, notification ([`SubspaceLink::archived_segment_notification_stream`])
/// will be sent and archiver will be paused until all receivers have provided an acknowledgement
/// for it.
Expand Down Expand Up @@ -811,9 +836,6 @@ where
} = archiver;
let (mut best_archived_block_hash, mut best_archived_block_number) = best_archived_block;

let archived_segment_notification_sender =
subspace_link.archived_segment_notification_sender.clone();

while let Some(block_importing_notification) =
block_importing_notification_stream.next().await
{
Expand Down Expand Up @@ -842,7 +864,7 @@ where
"Checking if block needs to be skipped"
);

// TODO: replace this cfg! with a CLI option
// TODO: turn this into a command-line argument named `--full-mapping`
let skip_last_archived_blocks = last_archived_block_number > block_number_to_archive
&& !cfg!(feature = "full-archive");
if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks {
Expand Down Expand Up @@ -902,7 +924,8 @@ where
&*client,
&sync_oracle,
telemetry.clone(),
archived_segment_notification_sender.clone(),
subspace_link.object_mapping_notification_sender.clone(),
subspace_link.archived_segment_notification_sender.clone(),
best_archived_block_hash,
block_number_to_archive,
)
Expand All @@ -921,6 +944,7 @@ async fn archive_block<Block, Backend, Client, AS, SO>(
client: &Client,
sync_oracle: &SubspaceSyncOracle<SO>,
telemetry: Option<TelemetryHandle>,
object_mapping_notification_sender: SubspaceNotificationSender<ObjectMappingNotification>,
archived_segment_notification_sender: SubspaceNotificationSender<ArchivedSegmentNotification>,
best_archived_block_hash: Block::Hash,
block_number_to_archive: NumberFor<Block>,
Expand Down Expand Up @@ -985,11 +1009,16 @@ where
);

let mut new_segment_headers = Vec::new();
for archived_segment in archiver.add_block(
let block_outcome = archiver.add_block(
encoded_block,
block_object_mappings,
!sync_oracle.is_major_syncing(),
) {
);
send_object_mapping_notification(
&object_mapping_notification_sender,
block_outcome.object_mapping,
);
for archived_segment in block_outcome.archived_segments {
let segment_header = archived_segment.segment_header;

segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;
Expand Down Expand Up @@ -1031,6 +1060,15 @@ where
Ok((block_hash_to_archive, block_number_to_archive))
}

fn send_object_mapping_notification(
object_mapping_notification_sender: &SubspaceNotificationSender<ObjectMappingNotification>,
object_mapping: Vec<GlobalObject>,
) {
let object_mapping_notification = ObjectMappingNotification { object_mapping };

object_mapping_notification_sender.notify(move || object_mapping_notification);
}

async fn send_archived_segment_notification(
archived_segment_notification_sender: &SubspaceNotificationSender<ArchivedSegmentNotification>,
archived_segment: NewArchivedSegment,
Expand Down
15 changes: 14 additions & 1 deletion crates/sc-consensus-subspace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub mod slot_worker;
mod tests;
pub mod verifier;

use crate::archiver::ArchivedSegmentNotification;
use crate::archiver::{ArchivedSegmentNotification, ObjectMappingNotification};
use crate::block_import::BlockImportingNotification;
use crate::notification::{SubspaceNotificationSender, SubspaceNotificationStream};
use crate::slot_worker::{NewSlotNotification, RewardSigningNotification};
Expand All @@ -52,6 +52,8 @@ pub struct SubspaceLink<Block: BlockT> {
new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
reward_signing_notification_sender: SubspaceNotificationSender<RewardSigningNotification>,
reward_signing_notification_stream: SubspaceNotificationStream<RewardSigningNotification>,
object_mapping_notification_sender: SubspaceNotificationSender<ObjectMappingNotification>,
object_mapping_notification_stream: SubspaceNotificationStream<ObjectMappingNotification>,
archived_segment_notification_sender: SubspaceNotificationSender<ArchivedSegmentNotification>,
archived_segment_notification_stream: SubspaceNotificationStream<ArchivedSegmentNotification>,
block_importing_notification_sender:
Expand All @@ -70,6 +72,8 @@ impl<Block: BlockT> SubspaceLink<Block> {
notification::channel("subspace_new_slot_notification_stream");
let (reward_signing_notification_sender, reward_signing_notification_stream) =
notification::channel("subspace_reward_signing_notification_stream");
let (object_mapping_notification_sender, object_mapping_notification_stream) =
notification::channel("subspace_object_mapping_notification_stream");
let (archived_segment_notification_sender, archived_segment_notification_stream) =
notification::channel("subspace_archived_segment_notification_stream");
let (block_importing_notification_sender, block_importing_notification_stream) =
Expand All @@ -80,6 +84,8 @@ impl<Block: BlockT> SubspaceLink<Block> {
new_slot_notification_stream,
reward_signing_notification_sender,
reward_signing_notification_stream,
object_mapping_notification_sender,
object_mapping_notification_stream,
archived_segment_notification_sender,
archived_segment_notification_stream,
block_importing_notification_sender,
Expand All @@ -103,6 +109,13 @@ impl<Block: BlockT> SubspaceLink<Block> {
self.reward_signing_notification_stream.clone()
}

/// Get stream with notifications about object mappings
pub fn object_mapping_notification_stream(
&self,
) -> SubspaceNotificationStream<ObjectMappingNotification> {
self.object_mapping_notification_stream.clone()
}

/// Get stream with notifications about archived segment creation
pub fn archived_segment_notification_stream(
&self,
Expand Down
3 changes: 2 additions & 1 deletion crates/sc-consensus-subspace/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@
//
// let genesis_block = client.block(client.info().genesis_hash).unwrap().unwrap();
// archiver
// .add_block(genesis_block.encode(), BlockObjectMapping::default())
// .add_block(genesis_block.encode(), BlockObjectMapping::default(), true)
// .archived_segments
// .into_iter()
// .map(|archived_segment| archived_segment.pieces)
// .collect()
Expand Down
Loading

0 comments on commit ac9e171

Please sign in to comment.