From 82ec6cb844efb1b7da2eb81831a396f78c751068 Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 2 Oct 2024 17:01:02 +1000 Subject: [PATCH 1/4] Refactor archiver public API to return ArchiveBlockOutcome --- crates/pallet-subspace/src/mock.rs | 1 + crates/sc-consensus-subspace-rpc/src/lib.rs | 43 ++-- crates/sc-consensus-subspace/src/archiver.rs | 64 ++++-- crates/sc-consensus-subspace/src/lib.rs | 15 +- crates/sc-consensus-subspace/src/tests.rs | 3 +- crates/subspace-archiving/src/archiver.rs | 92 ++++----- .../tests/integration/archiver.rs | 186 ++++++++---------- .../tests/integration/piece_reconstruction.rs | 16 +- .../tests/integration/reconstructor.rs | 36 +++- .../benches/auditing.rs | 1 + .../benches/plotting.rs | 1 + .../benches/proving.rs | 1 + .../benches/reading.rs | 1 + crates/subspace-service/src/lib.rs | 9 +- crates/subspace-service/src/rpc.rs | 8 +- test/subspace-test-client/src/lib.rs | 1 + 16 files changed, 271 insertions(+), 207 deletions(-) diff --git a/crates/pallet-subspace/src/mock.rs b/crates/pallet-subspace/src/mock.rs index 36e70266fb..ef665d8bc4 100644 --- a/crates/pallet-subspace/src/mock.rs +++ b/crates/pallet-subspace/src/mock.rs @@ -289,6 +289,7 @@ pub fn create_archived_segment() -> &'static NewArchivedSegment { rand::thread_rng().fill(block.as_mut_slice()); archiver .add_block(block, Default::default(), true) + .archived_segments .into_iter() .next() .unwrap() diff --git a/crates/sc-consensus-subspace-rpc/src/lib.rs b/crates/sc-consensus-subspace-rpc/src/lib.rs index 45c9cdea97..38312e3bde 100644 --- a/crates/sc-consensus-subspace-rpc/src/lib.rs +++ b/crates/sc-consensus-subspace-rpc/src/lib.rs @@ -28,7 +28,8 @@ use jsonrpsee::{Extensions, PendingSubscriptionSink}; use parking_lot::Mutex; use sc_client_api::{AuxStore, BlockBackend}; use sc_consensus_subspace::archiver::{ - recreate_genesis_segment, ArchivedSegmentNotification, SegmentHeadersStore, + recreate_genesis_segment, ArchivedSegmentNotification, ObjectMappingNotification, + SegmentHeadersStore, }; use sc_consensus_subspace::notification::SubspaceNotificationStream; use sc_consensus_subspace::slot_worker::{ @@ -171,15 +172,15 @@ pub trait SubspaceRpcApi { #[method(name = "subspace_lastSegmentHeaders")] async fn last_segment_headers(&self, limit: u32) -> Result>, Error>; - /// Block/transaction archived object mappings subscription + /// Block/transaction object mappings subscription #[subscription( - name = "subspace_subscribeArchivedObjectMappings" => "subspace_archived_object_mappings", - unsubscribe = "subspace_unsubscribeArchivedObjectMappings", + name = "subspace_subscribeObjectMappings" => "subspace_object_mappings", + unsubscribe = "subspace_unsubscribeObjectMappings", item = GlobalObjectMapping, )] - fn subscribe_archived_object_mappings(&self); + fn subscribe_object_mappings(&self); - /// Filtered block/transaction archived object mappings subscription + /// Filtered block/transaction object mappings subscription #[subscription( name = "subspace_subscribeFilteredObjectMappings" => "subspace_filtered_object_mappings", unsubscribe = "subspace_unsubscribeFilteredObjectMappings", @@ -234,6 +235,8 @@ where pub new_slot_notification_stream: SubspaceNotificationStream, /// Reward signing notification stream pub reward_signing_notification_stream: SubspaceNotificationStream, + /// Archived mapping notification stream + pub object_mapping_notification_stream: SubspaceNotificationStream, /// Archived segment notification stream pub archived_segment_notification_stream: SubspaceNotificationStream, @@ -259,6 +262,7 @@ where subscription_executor: SubscriptionTaskExecutor, new_slot_notification_stream: SubspaceNotificationStream, reward_signing_notification_stream: SubspaceNotificationStream, + object_mapping_notification_stream: SubspaceNotificationStream, archived_segment_notification_stream: SubspaceNotificationStream, #[allow(clippy::type_complexity)] solution_response_senders: Arc>>>>, @@ -316,6 +320,7 @@ where subscription_executor: config.subscription_executor, new_slot_notification_stream: config.new_slot_notification_stream, reward_signing_notification_stream: config.reward_signing_notification_stream, + object_mapping_notification_stream: config.object_mapping_notification_stream, archived_segment_notification_stream: config.archived_segment_notification_stream, solution_response_senders: Arc::new(Mutex::new(LruMap::new(ByLength::new( solution_response_senders_capacity, @@ -841,20 +846,14 @@ where // - the number of object mappings in each segment can be very large (hundreds or thousands). // To avoid RPC connection failures, limit the number of mappings returned in each response, // or the number of in-flight responses. - fn subscribe_archived_object_mappings(&self, pending: PendingSubscriptionSink) { + fn subscribe_object_mappings(&self, pending: PendingSubscriptionSink) { // TODO: deny unsafe subscriptions? - // The genesis segment isn't included in this stream. In other methods we recreate is as the first segment, - // but there aren't any mappings in it, so we don't need to recreate it as part of this subscription. - let mapping_stream = self - .archived_segment_notification_stream + .object_mapping_notification_stream .subscribe() - .flat_map(|archived_segment_notification| { - let objects = archived_segment_notification - .archived_segment - .global_object_mappings(); - + .flat_map(|object_mapping_notification| { + let objects = object_mapping_notification.object_mapping; stream::iter(objects) }) .ready_chunks(OBJECT_MAPPING_BATCH_SIZE) @@ -902,14 +901,14 @@ where let hash_count = hashes.len(); // The genesis segment isn't included in this stream, see - // `subscribe_archived_object_mappings` for details. + // `subscribe_object_mappings` for details. let mapping_stream = self - .archived_segment_notification_stream + .object_mapping_notification_stream .subscribe() - .flat_map(move |archived_segment_notification| { - let objects = archived_segment_notification - .archived_segment - .global_object_mappings() + .flat_map(move |object_mapping_notification| { + let objects = object_mapping_notification + .object_mapping + .into_iter() .filter(|object| hashes.remove(&object.hash)) .collect::>(); diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs index 14286b04d1..4e3eabe3ac 100644 --- a/crates/sc-consensus-subspace/src/archiver.rs +++ b/crates/sc-consensus-subspace/src/archiver.rs @@ -76,7 +76,7 @@ use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; use std::time::Duration; use subspace_archiving::archiver::{Archiver, NewArchivedSegment}; -use subspace_core_primitives::objects::BlockObjectMapping; +use subspace_core_primitives::objects::{BlockObjectMapping, GlobalObject}; use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex}; use subspace_core_primitives::{BlockNumber, PublicKey}; use subspace_erasure_coding::ErasureCoding; @@ -345,6 +345,17 @@ pub struct ArchivedSegmentNotification { pub acknowledgement_sender: TracingUnboundedSender<()>, } +/// Notification with incrementally generated object mappings for a block (and any previous block +/// continuation) +#[derive(Debug, Clone)] +pub struct ObjectMappingNotification { + /// Incremental object mappings for a block (and any previous block continuation). + /// + /// The archived data won't be available in pieces until the entire segment is full and archived. + pub object_mapping: Vec, + // TODO: add an acknowledgement_sender for backpressure if needed +} + fn find_last_archived_block( client: &Client, segment_headers_store: &SegmentHeadersStore, @@ -432,8 +443,11 @@ where let encoded_block = encode_block(signed_block); - let new_archived_segment = Archiver::new(kzg, erasure_coding) - .add_block(encoded_block, block_object_mappings, false) + // There are no mappings in the genesis block, so they can be ignored + let block_outcome = + Archiver::new(kzg, erasure_coding).add_block(encoded_block, block_object_mappings, false); + let new_archived_segment = block_outcome + .archived_segments .into_iter() .next() .expect("Genesis block always results in exactly one archived segment; qed"); @@ -671,9 +685,17 @@ where encoded_block.len() as f32 / 1024.0 ); - let archived_segments = - archiver.add_block(encoded_block, block_object_mappings, false); - let new_segment_headers: Vec = archived_segments + let block_outcome = archiver.add_block(encoded_block, block_object_mappings, false); + // RPC clients only want these mappings in full mapping mode + // TODO: turn this into a command-line argument named `--full-mapping` + if cfg!(feature = "full-archive") { + send_object_mapping_notification( + &subspace_link.object_mapping_notification_sender, + block_outcome.object_mapping, + ); + } + let new_segment_headers: Vec = block_outcome + .archived_segments .iter() .map(|archived_segment| archived_segment.segment_header) .collect(); @@ -753,6 +775,9 @@ fn finalize_block( /// processing, which is necessary for ensuring that when the next block is imported, inherents will /// contain segment header of newly archived block (must happen exactly in the next block). /// +/// When a block with object mappings is produced, notification ([`SubspaceLink::object_mapping_notification_stream`]) +/// will be sent. +/// /// Once segment header is archived, notification ([`SubspaceLink::archived_segment_notification_stream`]) /// will be sent and archiver will be paused until all receivers have provided an acknowledgement /// for it. @@ -811,9 +836,6 @@ where } = archiver; let (mut best_archived_block_hash, mut best_archived_block_number) = best_archived_block; - let archived_segment_notification_sender = - subspace_link.archived_segment_notification_sender.clone(); - while let Some(block_importing_notification) = block_importing_notification_stream.next().await { @@ -842,7 +864,7 @@ where "Checking if block needs to be skipped" ); - // TODO: replace this cfg! with a CLI option + // TODO: turn this into a command-line argument named `--full-mapping` let skip_last_archived_blocks = last_archived_block_number > block_number_to_archive && !cfg!(feature = "full-archive"); if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks { @@ -902,7 +924,8 @@ where &*client, &sync_oracle, telemetry.clone(), - archived_segment_notification_sender.clone(), + subspace_link.object_mapping_notification_sender.clone(), + subspace_link.archived_segment_notification_sender.clone(), best_archived_block_hash, block_number_to_archive, ) @@ -921,6 +944,7 @@ async fn archive_block( client: &Client, sync_oracle: &SubspaceSyncOracle, telemetry: Option, + object_mapping_notification_sender: SubspaceNotificationSender, archived_segment_notification_sender: SubspaceNotificationSender, best_archived_block_hash: Block::Hash, block_number_to_archive: NumberFor, @@ -985,11 +1009,16 @@ where ); let mut new_segment_headers = Vec::new(); - for archived_segment in archiver.add_block( + let block_outcome = archiver.add_block( encoded_block, block_object_mappings, !sync_oracle.is_major_syncing(), - ) { + ); + send_object_mapping_notification( + &object_mapping_notification_sender, + block_outcome.object_mapping, + ); + for archived_segment in block_outcome.archived_segments { let segment_header = archived_segment.segment_header; segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?; @@ -1031,6 +1060,15 @@ where Ok((block_hash_to_archive, block_number_to_archive)) } +fn send_object_mapping_notification( + object_mapping_notification_sender: &SubspaceNotificationSender, + object_mapping: Vec, +) { + let object_mapping_notification = ObjectMappingNotification { object_mapping }; + + object_mapping_notification_sender.notify(move || object_mapping_notification); +} + async fn send_archived_segment_notification( archived_segment_notification_sender: &SubspaceNotificationSender, archived_segment: NewArchivedSegment, diff --git a/crates/sc-consensus-subspace/src/lib.rs b/crates/sc-consensus-subspace/src/lib.rs index 744174c44f..07c7f9d9af 100644 --- a/crates/sc-consensus-subspace/src/lib.rs +++ b/crates/sc-consensus-subspace/src/lib.rs @@ -36,7 +36,7 @@ pub mod slot_worker; mod tests; pub mod verifier; -use crate::archiver::ArchivedSegmentNotification; +use crate::archiver::{ArchivedSegmentNotification, ObjectMappingNotification}; use crate::block_import::BlockImportingNotification; use crate::notification::{SubspaceNotificationSender, SubspaceNotificationStream}; use crate::slot_worker::{NewSlotNotification, RewardSigningNotification}; @@ -52,6 +52,8 @@ pub struct SubspaceLink { new_slot_notification_stream: SubspaceNotificationStream, reward_signing_notification_sender: SubspaceNotificationSender, reward_signing_notification_stream: SubspaceNotificationStream, + object_mapping_notification_sender: SubspaceNotificationSender, + object_mapping_notification_stream: SubspaceNotificationStream, archived_segment_notification_sender: SubspaceNotificationSender, archived_segment_notification_stream: SubspaceNotificationStream, block_importing_notification_sender: @@ -70,6 +72,8 @@ impl SubspaceLink { notification::channel("subspace_new_slot_notification_stream"); let (reward_signing_notification_sender, reward_signing_notification_stream) = notification::channel("subspace_reward_signing_notification_stream"); + let (object_mapping_notification_sender, object_mapping_notification_stream) = + notification::channel("subspace_object_mapping_notification_stream"); let (archived_segment_notification_sender, archived_segment_notification_stream) = notification::channel("subspace_archived_segment_notification_stream"); let (block_importing_notification_sender, block_importing_notification_stream) = @@ -80,6 +84,8 @@ impl SubspaceLink { new_slot_notification_stream, reward_signing_notification_sender, reward_signing_notification_stream, + object_mapping_notification_sender, + object_mapping_notification_stream, archived_segment_notification_sender, archived_segment_notification_stream, block_importing_notification_sender, @@ -103,6 +109,13 @@ impl SubspaceLink { self.reward_signing_notification_stream.clone() } + /// Get stream with notifications about object mappings + pub fn object_mapping_notification_stream( + &self, + ) -> SubspaceNotificationStream { + self.object_mapping_notification_stream.clone() + } + /// Get stream with notifications about archived segment creation pub fn archived_segment_notification_stream( &self, diff --git a/crates/sc-consensus-subspace/src/tests.rs b/crates/sc-consensus-subspace/src/tests.rs index 168d4d07d7..6fc89b9823 100644 --- a/crates/sc-consensus-subspace/src/tests.rs +++ b/crates/sc-consensus-subspace/src/tests.rs @@ -453,7 +453,8 @@ // // let genesis_block = client.block(client.info().genesis_hash).unwrap().unwrap(); // archiver -// .add_block(genesis_block.encode(), BlockObjectMapping::default()) +// .add_block(genesis_block.encode(), BlockObjectMapping::default(), true) +// .archived_segments // .into_iter() // .map(|archived_segment| archived_segment.pieces) // .collect() diff --git a/crates/subspace-archiving/src/archiver.rs b/crates/subspace-archiving/src/archiver.rs index 3d906c0bb2..9199c4b2c5 100644 --- a/crates/subspace-archiving/src/archiver.rs +++ b/crates/subspace-archiving/src/archiver.rs @@ -30,9 +30,7 @@ use parity_scale_codec::{Compact, CompactLen, Decode, Encode, Input, Output}; #[cfg(feature = "parallel")] use rayon::prelude::*; use subspace_core_primitives::hashes::{blake3_254_hash_to_scalar, Blake3Hash}; -use subspace_core_primitives::objects::{ - BlockObject, BlockObjectMapping, GlobalObject, PieceObject, PieceObjectMapping, -}; +use subspace_core_primitives::objects::{BlockObject, BlockObjectMapping, GlobalObject}; use subspace_core_primitives::pieces::RawRecord; use subspace_core_primitives::segments::{ ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, RecordedHistorySegment, @@ -169,42 +167,26 @@ pub enum SegmentItem { ParentSegmentHeader(SegmentHeader), } -/// Newly archived segment as a combination of segment header hash, segment index and corresponding -/// archived history segment containing pieces +/// Newly archived segment as a combination of segment header and corresponding archived history +/// segment containing pieces #[derive(Debug, Clone, Eq, PartialEq)] pub struct NewArchivedSegment { /// Segment header pub segment_header: SegmentHeader, /// Segment of archived history containing pieces pub pieces: ArchivedHistorySegment, - /// Mappings for objects stored in corresponding pieces. - /// - /// NOTE: Only half (source pieces) will have corresponding mapping item in this `Vec`. - pub object_mapping: Vec, } -impl NewArchivedSegment { - /// Returns all the object mappings in this archived segment as a lazy iterator. - pub fn global_object_mappings(&self) -> impl Iterator + 'static { - // Save memory by only returning the necessary parts of NewArchivedSegment - let object_mapping = self.object_mapping.clone(); - let piece_indexes = self - .segment_header - .segment_index() - .segment_piece_indexes_source_first(); - - // Iterate through the object mapping vector for each piece - object_mapping.into_iter().zip(piece_indexes).flat_map( - move |(piece_mappings, piece_index)| { - // And then through each individual object mapping in the piece - let piece_mappings = piece_mappings.objects().to_vec(); - - piece_mappings - .into_iter() - .map(move |piece_object| GlobalObject::new(piece_index, &piece_object)) - }, - ) - } +/// The outcome of adding a block to the archiver. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct ArchiveBlockOutcome { + /// The new segments archived after adding the block. + /// There can be zero or more segments created after each block. + pub archived_segments: Vec, + + /// The new object mappings for those segments. + /// There can be zero or more mappings created after each block. + pub object_mapping: Vec, } /// Archiver instantiation error @@ -355,7 +337,7 @@ impl Archiver { bytes: Vec, object_mapping: BlockObjectMapping, incremental: bool, - ) -> Vec { + ) -> ArchiveBlockOutcome { // Append new block to the buffer self.buffer.push_back(SegmentItem::Block { bytes, @@ -363,12 +345,18 @@ impl Archiver { }); let mut archived_segments = Vec::new(); + let mut object_mapping = Vec::new(); while let Some(segment) = self.produce_segment(incremental) { - archived_segments.push(self.produce_archived_segment(segment)); + let (archived_segment, segment_object_mapping) = self.produce_archived_segment(segment); + archived_segments.push(archived_segment); + object_mapping.extend(segment_object_mapping); } - archived_segments + ArchiveBlockOutcome { + archived_segments, + object_mapping, + } } /// Try to slice buffer contents into segments if there is enough data, producing one segment at @@ -610,12 +598,16 @@ impl Archiver { Some(segment) } - // Take segment as an input, apply necessary transformations and produce archived segment - fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment { + /// Take segment as an input, apply necessary transformations and produce archived segment + fn produce_archived_segment( + &mut self, + segment: Segment, + ) -> (NewArchivedSegment, Vec) { // Create mappings let object_mapping = { - let mut corrected_object_mapping = - vec![PieceObjectMapping::default(); RecordedHistorySegment::NUM_RAW_RECORDS]; + let mut corrected_object_mapping = Vec::new(); + let source_piece_indexes = &self.segment_index.segment_piece_indexes_source_first() + [..RecordedHistorySegment::NUM_RAW_RECORDS]; let Segment::V0 { items } = &segment; // `+1` corresponds to enum variant encoding let mut base_offset_in_segment = 1; @@ -648,14 +640,12 @@ impl Archiver { (offset_in_segment % RawRecord::SIZE).try_into().expect( "Offset within piece should always fit in 32-bit integer; qed", ); - if let Some(piece_object_mapping) = corrected_object_mapping - .get_mut(offset_in_segment / RawRecord::SIZE) - { - piece_object_mapping.objects_mut().push(PieceObject { - hash: block_object.hash, - offset: raw_piece_offset, - }); - } + corrected_object_mapping.push(GlobalObject { + hash: block_object.hash, + piece_index: source_piece_indexes + [offset_in_segment / RawRecord::SIZE], + offset: raw_piece_offset, + }); } } SegmentItem::ParentSegmentHeader(_) => { @@ -833,10 +823,12 @@ impl Archiver { self.buffer .push_front(SegmentItem::ParentSegmentHeader(segment_header)); - NewArchivedSegment { - segment_header, - pieces: pieces.to_shared(), + ( + NewArchivedSegment { + segment_header, + pieces: pieces.to_shared(), + }, object_mapping, - } + ) } } diff --git a/crates/subspace-archiving/tests/integration/archiver.rs b/crates/subspace-archiving/tests/integration/archiver.rs index dbb2dfdb26..06c976d895 100644 --- a/crates/subspace-archiving/tests/integration/archiver.rs +++ b/crates/subspace-archiving/tests/integration/archiver.rs @@ -8,7 +8,7 @@ use std::iter; use std::num::NonZeroUsize; use subspace_archiving::archiver::{Archiver, ArchiverInstantiationError, SegmentItem}; use subspace_core_primitives::hashes::Blake3Hash; -use subspace_core_primitives::objects::{BlockObject, BlockObjectMapping, PieceObject}; +use subspace_core_primitives::objects::{BlockObject, BlockObjectMapping, GlobalObject}; use subspace_core_primitives::pieces::{Piece, Record}; use subspace_core_primitives::segments::{ ArchivedBlockProgress, ArchivedHistorySegment, LastArchivedBlock, RecordedHistorySegment, @@ -47,14 +47,14 @@ fn extract_data_from_source_record>(record: &Record, offset: O) -> } #[track_caller] -fn compare_block_objects_to_piece_objects<'a>( +fn compare_block_objects_to_global_objects<'a>( block_objects: impl Iterator, - piece_objects: impl Iterator, + global_objects: impl Iterator, ) { - block_objects.zip(piece_objects).for_each( - |((block, block_object_mapping), (piece, piece_object_mapping))| { + block_objects.zip(global_objects).for_each( + |((block, block_object_mapping), (piece, global_object_mapping))| { assert_eq!( - extract_data_from_source_record(piece.record(), piece_object_mapping.offset), + extract_data_from_source_record(piece.record(), global_object_mapping.offset), extract_data(block, block_object_mapping.offset) ); }, @@ -98,10 +98,10 @@ fn archiver() { (block, object_mapping) }; + let block_0_outcome = archiver.add_block(block_0.clone(), block_0_object_mapping.clone(), true); + let archived_segments = block_0_outcome.archived_segments; // There is not enough data to produce archived segment yet - assert!(archiver - .add_block(block_0.clone(), block_0_object_mapping.clone(), true) - .is_empty()); + assert!(archived_segments.is_empty()); let (block_1, block_1_object_mapping) = { let mut block = vec![0u8; RecordedHistorySegment::SIZE / 3 * 2]; @@ -138,11 +138,12 @@ fn archiver() { (block, object_mapping) }; // This should produce 1 archived segment - let archived_segments = - archiver.add_block(block_1.clone(), block_1_object_mapping.clone(), true); + let block_1_outcome = archiver.add_block(block_1.clone(), block_1_object_mapping.clone(), true); + let archived_segments = block_1_outcome.archived_segments; + let object_mapping = block_1_outcome.object_mapping; assert_eq!(archived_segments.len(), 1); - let first_archived_segment = archived_segments.into_iter().next().unwrap(); + let first_archived_segment = archived_segments.first().cloned().unwrap(); assert_eq!( first_archived_segment.pieces.len(), ArchivedHistorySegment::NUM_PIECES @@ -163,30 +164,22 @@ fn archiver() { assert_eq!(last_archived_block.partial_archived(), Some(65011701)); } - assert_eq!( - first_archived_segment.object_mapping.len(), - RecordedHistorySegment::NUM_RAW_RECORDS - ); // 4 objects fit into the first segment - assert_eq!( - first_archived_segment - .object_mapping - .iter() - .filter(|object_mapping| !object_mapping.objects().is_empty()) - .count(), - 4 - ); + assert_eq!(object_mapping.len(), 4); { let block_objects = iter::repeat(block_0.as_ref()) .zip(block_0_object_mapping.objects()) .chain(iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects())); - let piece_objects = first_archived_segment - .pieces - .source_pieces() - .zip(&first_archived_segment.object_mapping) - .flat_map(|(piece, object_mapping)| iter::repeat(piece).zip(object_mapping.objects())); + let global_objects = object_mapping.into_iter().map(|object_mapping| { + ( + Piece::from( + &first_archived_segment.pieces[object_mapping.piece_index.position() as usize], + ), + object_mapping, + ) + }); - compare_block_objects_to_piece_objects(block_objects, piece_objects); + compare_block_objects_to_global_objects(block_objects, global_objects); } #[cfg(not(feature = "parallel"))] @@ -216,12 +209,13 @@ fn archiver() { block }; // This should be big enough to produce two archived segments in one go - let archived_segments = - archiver.add_block(block_2.clone(), BlockObjectMapping::default(), true); + let block_2_outcome = archiver.add_block(block_2.clone(), BlockObjectMapping::default(), true); + let archived_segments = block_2_outcome.archived_segments.clone(); + let object_mapping = block_2_outcome.object_mapping.clone(); assert_eq!(archived_segments.len(), 2); // Check that initializing archiver with initial state before last block results in the same - // archived segments once last block is added + // archived segments and mappings once last block is added { let mut archiver_with_initial_state = Archiver::with_initial_state( kzg.clone(), @@ -238,46 +232,30 @@ fn archiver() { BlockObjectMapping::default(), true ), - archived_segments, + block_2_outcome, ); } - assert_eq!( - archived_segments[0].object_mapping.len(), - RecordedHistorySegment::NUM_RAW_RECORDS - ); // 1 object fits into the second segment + // There are no objects left for the third segment + assert_eq!(object_mapping.len(), 1); assert_eq!( - archived_segments[0] - .object_mapping - .iter() - .filter(|object_mapping| !object_mapping.objects().is_empty()) - .count(), - 1 - ); - assert_eq!( - archived_segments[1].object_mapping.len(), - RecordedHistorySegment::NUM_RAW_RECORDS - ); - // 0 object fits into the second segment - assert_eq!( - archived_segments[1] - .object_mapping - .iter() - .filter(|object_mapping| !object_mapping.objects().is_empty()) - .count(), - 0 + object_mapping[0].piece_index.segment_index(), + archived_segments[0].segment_header.segment_index(), ); { let block_objects = iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects().iter().skip(2)); - let piece_objects = archived_segments[0] - .pieces - .source_pieces() - .zip(&archived_segments[0].object_mapping) - .flat_map(|(piece, object_mapping)| iter::repeat(piece).zip(object_mapping.objects())); + let global_objects = object_mapping.into_iter().map(|object_mapping| { + ( + Piece::from( + &archived_segments[0].pieces[object_mapping.piece_index.position() as usize], + ), + object_mapping, + ) + }); - compare_block_objects_to_piece_objects(block_objects, piece_objects); + compare_block_objects_to_global_objects(block_objects, global_objects); } // Check archived bytes for block with index `2` in each archived segment @@ -343,12 +321,16 @@ fn archiver() { thread_rng().fill(block.as_mut_slice()); block }; - let archived_segments = - archiver.add_block(block_3.clone(), BlockObjectMapping::default(), true); + let block_3_outcome = archiver.add_block(block_3.clone(), BlockObjectMapping::default(), true); + let archived_segments = block_3_outcome.archived_segments.clone(); + let object_mapping = block_3_outcome.object_mapping.clone(); assert_eq!(archived_segments.len(), 1); + // There are no objects left for the fourth segment + assert_eq!(object_mapping.len(), 0); + // Check that initializing archiver with initial state before last block results in the same - // archived segments once last block is added + // archived segments and mappings once last block is added { let mut archiver_with_initial_state = Archiver::with_initial_state( kzg.clone(), @@ -361,11 +343,11 @@ fn archiver() { assert_eq!( archiver_with_initial_state.add_block(block_3, BlockObjectMapping::default(), true), - archived_segments, + block_3_outcome, ); } - // Archived segment should fit exactly into the last archived segment (rare case) + // Block should fit exactly into the last archived segment (rare case) { let archived_segment = archived_segments.first().unwrap(); let last_archived_block = archived_segment.segment_header.last_archived_block(); @@ -492,6 +474,7 @@ fn one_byte_smaller_segment() { assert_eq!( Archiver::new(kzg.clone(), erasure_coding.clone()) .add_block(vec![0u8; block_size], BlockObjectMapping::default(), true) + .archived_segments .len(), 1 ); @@ -503,6 +486,7 @@ fn one_byte_smaller_segment() { BlockObjectMapping::default(), true ) + .archived_segments .is_empty()); } @@ -532,13 +516,14 @@ fn spill_over_edge_case() { - 3; assert!(archiver .add_block(vec![0u8; block_size], BlockObjectMapping::default(), true) + .archived_segments .is_empty()); // Here we add one more block with internal length that takes 4 bytes in compact length // encoding + one more for enum variant, this should result in new segment being created, but // the very first segment item will not include newly added block because it would result in // subtracting with overflow when trying to slice internal bytes of the segment item - let archived_segments = archiver.add_block( + let block_outcome = archiver.add_block( vec![0u8; RecordedHistorySegment::SIZE], BlockObjectMapping::V0 { objects: vec![BlockObject { @@ -548,23 +533,15 @@ fn spill_over_edge_case() { }, true, ); + let archived_segments = block_outcome.archived_segments; + let object_mapping = block_outcome.object_mapping; assert_eq!(archived_segments.len(), 2); + // If spill over actually happened, we'll not find object mapping in the first segment + assert_eq!(object_mapping.len(), 1); assert_eq!( - archived_segments[0] - .object_mapping - .iter() - .filter(|o| !o.objects().is_empty()) - .count(), - 0 - ); - assert_eq!( - archived_segments[1] - .object_mapping - .iter() - .filter(|o| !o.objects().is_empty()) - .count(), - 1 + object_mapping[0].piece_index.segment_index(), + archived_segments[1].segment_header.segment_index(), ); } @@ -578,9 +555,13 @@ fn object_on_the_edge_of_segment() { .unwrap(); let mut archiver = Archiver::new(kzg, erasure_coding); let first_block = vec![0u8; RecordedHistorySegment::SIZE]; - let archived_segments = + let block_1_outcome = archiver.add_block(first_block.clone(), BlockObjectMapping::default(), true); + let archived_segments = block_1_outcome.archived_segments; + let object_mapping = block_1_outcome.object_mapping; assert_eq!(archived_segments.len(), 1); + assert_eq!(object_mapping.len(), 0); + let archived_segment = archived_segments.into_iter().next().unwrap(); let left_unarchived_from_first_block = first_block.len() as u32 - archived_segment @@ -632,7 +613,7 @@ fn object_on_the_edge_of_segment() { // First ensure that any smaller offset will get translated into the first archived segment, // this is a protection against code regressions { - let archived_segments = archiver.clone().add_block( + let block_2_outcome = archiver.clone().add_block( second_block.clone(), BlockObjectMapping::V0 { objects: vec![BlockObject { @@ -642,45 +623,34 @@ fn object_on_the_edge_of_segment() { }, true, ); + let archived_segments = block_2_outcome.archived_segments; + let object_mapping = block_2_outcome.object_mapping; assert_eq!(archived_segments.len(), 2); + assert_eq!(object_mapping.len(), 1); assert_eq!( - archived_segments[0] - .object_mapping - .iter() - .filter(|o| !o.objects().is_empty()) - .count(), - 1 + object_mapping[0].piece_index.segment_index(), + archived_segments[0].segment_header.segment_index(), ); } - let archived_segments = archiver.add_block( + let block_2_outcome = archiver.add_block( second_block, BlockObjectMapping::V0 { objects: vec![object_mapping], }, true, ); + let archived_segments = block_2_outcome.archived_segments; + let object_mapping = block_2_outcome.object_mapping; assert_eq!(archived_segments.len(), 2); - assert_eq!( - archived_segments[0] - .object_mapping - .iter() - .filter(|o| !o.objects().is_empty()) - .count(), - 0 - ); // Object should fall in the next archived segment + assert_eq!(object_mapping.len(), 1); assert_eq!( - archived_segments[1] - .object_mapping - .iter() - .filter(|o| !o.objects().is_empty()) - .count(), - 1 + object_mapping[0].piece_index.segment_index(), + archived_segments[1].segment_header.segment_index(), ); - assert_eq!(archived_segments[1].object_mapping[0].objects().len(), 1); // Ensure bytes are mapped correctly assert_eq!( @@ -689,7 +659,7 @@ fn object_on_the_edge_of_segment() { .to_raw_record_chunks() .flatten() .copied() - .skip(archived_segments[1].object_mapping[0].objects()[0].offset as usize) + .skip(object_mapping[0].offset as usize) .take(mapped_bytes.len()) .collect::>(), mapped_bytes diff --git a/crates/subspace-archiving/tests/integration/piece_reconstruction.rs b/crates/subspace-archiving/tests/integration/piece_reconstruction.rs index 69bc2d7e7e..41175dd743 100644 --- a/crates/subspace-archiving/tests/integration/piece_reconstruction.rs +++ b/crates/subspace-archiving/tests/integration/piece_reconstruction.rs @@ -33,7 +33,9 @@ fn segment_reconstruction_works() { let block = get_random_block(); - let archived_segments = archiver.add_block(block, BlockObjectMapping::default(), true); + let archived_segments = archiver + .add_block(block, BlockObjectMapping::default(), true) + .archived_segments; assert_eq!(archived_segments.len(), 1); @@ -79,7 +81,9 @@ fn piece_reconstruction_works() { // Block that fits into the segment fully let block = get_random_block(); - let archived_segments = archiver.add_block(block, BlockObjectMapping::default(), true); + let archived_segments = archiver + .add_block(block, BlockObjectMapping::default(), true) + .archived_segments; assert_eq!(archived_segments.len(), 1); @@ -143,7 +147,9 @@ fn segment_reconstruction_fails() { // Block that fits into the segment fully let block = get_random_block(); - let archived_segments = archiver.add_block(block, BlockObjectMapping::default(), true); + let archived_segments = archiver + .add_block(block, BlockObjectMapping::default(), true) + .archived_segments; assert_eq!(archived_segments.len(), 1); @@ -184,7 +190,9 @@ fn piece_reconstruction_fails() { // Block that fits into the segment fully let block = get_random_block(); - let archived_segments = archiver.add_block(block, BlockObjectMapping::default(), true); + let archived_segments = archiver + .add_block(block, BlockObjectMapping::default(), true) + .archived_segments; assert_eq!(archived_segments.len(), 1); diff --git a/crates/subspace-archiving/tests/integration/reconstructor.rs b/crates/subspace-archiving/tests/integration/reconstructor.rs index 05af807097..895d2a5145 100644 --- a/crates/subspace-archiving/tests/integration/reconstructor.rs +++ b/crates/subspace-archiving/tests/integration/reconstructor.rs @@ -58,11 +58,28 @@ fn basic() { }; let archived_segments = archiver .add_block(block_0.clone(), BlockObjectMapping::default(), true) + .archived_segments .into_iter() - .chain(archiver.add_block(block_1.clone(), BlockObjectMapping::default(), true)) - .chain(archiver.add_block(block_2.clone(), BlockObjectMapping::default(), true)) - .chain(archiver.add_block(block_3.clone(), BlockObjectMapping::default(), true)) - .chain(archiver.add_block(block_4, BlockObjectMapping::default(), true)) + .chain( + archiver + .add_block(block_1.clone(), BlockObjectMapping::default(), true) + .archived_segments, + ) + .chain( + archiver + .add_block(block_2.clone(), BlockObjectMapping::default(), true) + .archived_segments, + ) + .chain( + archiver + .add_block(block_3.clone(), BlockObjectMapping::default(), true) + .archived_segments, + ) + .chain( + archiver + .add_block(block_4, BlockObjectMapping::default(), true) + .archived_segments, + ) .collect::>(); assert_eq!(archived_segments.len(), 5); @@ -271,8 +288,13 @@ fn partial_data() { }; let archived_segments = archiver .add_block(block_0.clone(), BlockObjectMapping::default(), true) + .archived_segments .into_iter() - .chain(archiver.add_block(block_1, BlockObjectMapping::default(), true)) + .chain( + archiver + .add_block(block_1, BlockObjectMapping::default(), true) + .archived_segments, + ) .collect::>(); assert_eq!(archived_segments.len(), 1); @@ -347,7 +369,9 @@ fn invalid_usage() { block }; - let archived_segments = archiver.add_block(block_0, BlockObjectMapping::default(), true); + let archived_segments = archiver + .add_block(block_0, BlockObjectMapping::default(), true) + .archived_segments; assert_eq!(archived_segments.len(), 4); diff --git a/crates/subspace-farmer-components/benches/auditing.rs b/crates/subspace-farmer-components/benches/auditing.rs index 04eabe50a3..42eb8adb1b 100644 --- a/crates/subspace-farmer-components/benches/auditing.rs +++ b/crates/subspace-farmer-components/benches/auditing.rs @@ -64,6 +64,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { Default::default(), true, ) + .archived_segments .into_iter() .next() .unwrap() diff --git a/crates/subspace-farmer-components/benches/plotting.rs b/crates/subspace-farmer-components/benches/plotting.rs index afcb5ed1ae..b39e8aac28 100644 --- a/crates/subspace-farmer-components/benches/plotting.rs +++ b/crates/subspace-farmer-components/benches/plotting.rs @@ -52,6 +52,7 @@ fn criterion_benchmark(c: &mut Criterion) { Default::default(), true, ) + .archived_segments .into_iter() .next() .unwrap() diff --git a/crates/subspace-farmer-components/benches/proving.rs b/crates/subspace-farmer-components/benches/proving.rs index bd719b5805..3d5bb3d6f1 100644 --- a/crates/subspace-farmer-components/benches/proving.rs +++ b/crates/subspace-farmer-components/benches/proving.rs @@ -72,6 +72,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { Default::default(), true, ) + .archived_segments .into_iter() .next() .unwrap() diff --git a/crates/subspace-farmer-components/benches/reading.rs b/crates/subspace-farmer-components/benches/reading.rs index 1826254020..3e2f6b1133 100644 --- a/crates/subspace-farmer-components/benches/reading.rs +++ b/crates/subspace-farmer-components/benches/reading.rs @@ -63,6 +63,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { Default::default(), true, ) + .archived_segments .into_iter() .next() .unwrap() diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index dc86f7dd6e..da826734a6 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -66,7 +66,8 @@ use sc_consensus::{ }; use sc_consensus_slots::SlotProportion; use sc_consensus_subspace::archiver::{ - create_subspace_archiver, ArchivedSegmentNotification, SegmentHeadersStore, + create_subspace_archiver, ArchivedSegmentNotification, ObjectMappingNotification, + SegmentHeadersStore, }; use sc_consensus_subspace::block_import::{BlockImportingNotification, SubspaceBlockImport}; use sc_consensus_subspace::notification::SubspaceNotificationStream; @@ -713,6 +714,8 @@ where /// Stream of notifications about blocks about to be imported. pub block_importing_notification_stream: SubspaceNotificationStream>, + /// Archived object mapping stream. + pub object_mapping_notification_stream: SubspaceNotificationStream, /// Archived segment stream. pub archived_segment_notification_stream: SubspaceNotificationStream, @@ -1179,6 +1182,7 @@ where let new_slot_notification_stream = subspace_link.new_slot_notification_stream(); let reward_signing_notification_stream = subspace_link.reward_signing_notification_stream(); let block_importing_notification_stream = subspace_link.block_importing_notification_stream(); + let object_mapping_notification_stream = subspace_link.object_mapping_notification_stream(); let archived_segment_notification_stream = subspace_link.archived_segment_notification_stream(); let (pot_source_worker, pot_gossip_worker, pot_slot_info_stream) = PotSourceWorker::new( @@ -1290,6 +1294,7 @@ where let client = client.clone(); let new_slot_notification_stream = new_slot_notification_stream.clone(); let reward_signing_notification_stream = reward_signing_notification_stream.clone(); + let object_mapping_notification_stream = object_mapping_notification_stream.clone(); let archived_segment_notification_stream = archived_segment_notification_stream.clone(); let transaction_pool = transaction_pool.clone(); let backend = backend.clone(); @@ -1301,6 +1306,7 @@ where subscription_executor, new_slot_notification_stream: new_slot_notification_stream.clone(), reward_signing_notification_stream: reward_signing_notification_stream.clone(), + object_mapping_notification_stream: object_mapping_notification_stream.clone(), archived_segment_notification_stream: archived_segment_notification_stream .clone(), dsn_bootstrap_nodes: dsn_bootstrap_nodes.clone(), @@ -1337,6 +1343,7 @@ where new_slot_notification_stream, reward_signing_notification_stream, block_importing_notification_stream, + object_mapping_notification_stream, archived_segment_notification_stream, network_starter, transaction_pool, diff --git a/crates/subspace-service/src/rpc.rs b/crates/subspace-service/src/rpc.rs index e5b17a3edd..9b10d80649 100644 --- a/crates/subspace-service/src/rpc.rs +++ b/crates/subspace-service/src/rpc.rs @@ -26,7 +26,9 @@ use jsonrpsee::RpcModule; use mmr_rpc::{Mmr, MmrApiServer}; use pallet_transaction_payment_rpc::{TransactionPayment, TransactionPaymentApiServer}; use sc_client_api::{AuxStore, BlockBackend}; -use sc_consensus_subspace::archiver::{ArchivedSegmentNotification, SegmentHeadersStore}; +use sc_consensus_subspace::archiver::{ + ArchivedSegmentNotification, ObjectMappingNotification, SegmentHeadersStore, +}; use sc_consensus_subspace::notification::SubspaceNotificationStream; use sc_consensus_subspace::slot_worker::{ NewSlotNotification, RewardSigningNotification, SubspaceSyncOracle, @@ -65,6 +67,8 @@ where /// A stream with notifications about headers that need to be signed with ability to send /// signature back. pub reward_signing_notification_stream: SubspaceNotificationStream, + /// A stream with notifications about mappings. + pub object_mapping_notification_stream: SubspaceNotificationStream, /// A stream with notifications about archived segment creation. pub archived_segment_notification_stream: SubspaceNotificationStream, @@ -113,6 +117,7 @@ where subscription_executor, new_slot_notification_stream, reward_signing_notification_stream, + object_mapping_notification_stream, archived_segment_notification_stream, dsn_bootstrap_nodes, segment_headers_store, @@ -131,6 +136,7 @@ where subscription_executor, new_slot_notification_stream, reward_signing_notification_stream, + object_mapping_notification_stream, archived_segment_notification_stream, dsn_bootstrap_nodes, segment_headers_store, diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index edd8d49529..cc5cb1c1f4 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -217,6 +217,7 @@ where BlockObjectMapping::default(), true, ) + .archived_segments .into_iter() .next() .expect("First block is always producing one segment; qed"); From b3acac052d07be2753a0beb02dec37ac071f4cf2 Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 9 Oct 2024 13:15:45 +1000 Subject: [PATCH 2/4] Remove unnecessary indentation --- crates/subspace-archiving/src/archiver.rs | 98 +++++++++++------------ 1 file changed, 47 insertions(+), 51 deletions(-) diff --git a/crates/subspace-archiving/src/archiver.rs b/crates/subspace-archiving/src/archiver.rs index 9199c4b2c5..c9c0e656c9 100644 --- a/crates/subspace-archiving/src/archiver.rs +++ b/crates/subspace-archiving/src/archiver.rs @@ -604,59 +604,55 @@ impl Archiver { segment: Segment, ) -> (NewArchivedSegment, Vec) { // Create mappings - let object_mapping = { - let mut corrected_object_mapping = Vec::new(); - let source_piece_indexes = &self.segment_index.segment_piece_indexes_source_first() - [..RecordedHistorySegment::NUM_RAW_RECORDS]; - let Segment::V0 { items } = &segment; - // `+1` corresponds to enum variant encoding - let mut base_offset_in_segment = 1; - for segment_item in items { - match segment_item { - SegmentItem::Padding => { - unreachable!( - "Segment during archiving never contains SegmentItem::Padding; qed" - ); - } - SegmentItem::Block { - bytes, - object_mapping, - } - | SegmentItem::BlockStart { - bytes, - object_mapping, - } - | SegmentItem::BlockContinuation { - bytes, - object_mapping, - } => { - for block_object in object_mapping.objects() { - // `+1` corresponds to `SegmentItem::X {}` enum variant encoding - let offset_in_segment = base_offset_in_segment - + 1 - + Compact::compact_len(&(bytes.len() as u32)) - + block_object.offset as usize; - let raw_piece_offset = - (offset_in_segment % RawRecord::SIZE).try_into().expect( - "Offset within piece should always fit in 32-bit integer; qed", - ); - corrected_object_mapping.push(GlobalObject { - hash: block_object.hash, - piece_index: source_piece_indexes - [offset_in_segment / RawRecord::SIZE], - offset: raw_piece_offset, - }); - } - } - SegmentItem::ParentSegmentHeader(_) => { - // Ignore, no objects mappings here + let mut corrected_object_mapping = Vec::new(); + let source_piece_indexes = &self.segment_index.segment_piece_indexes_source_first() + [..RecordedHistorySegment::NUM_RAW_RECORDS]; + + let Segment::V0 { items } = &segment; + // `+1` corresponds to enum variant encoding + let mut base_offset_in_segment = 1; + for segment_item in items { + match segment_item { + SegmentItem::Padding => { + unreachable!( + "Segment during archiving never contains SegmentItem::Padding; qed" + ); + } + SegmentItem::Block { + bytes, + object_mapping, + } + | SegmentItem::BlockStart { + bytes, + object_mapping, + } + | SegmentItem::BlockContinuation { + bytes, + object_mapping, + } => { + for block_object in object_mapping.objects() { + // `+1` corresponds to `SegmentItem::X {}` enum variant encoding + let offset_in_segment = base_offset_in_segment + + 1 + + Compact::compact_len(&(bytes.len() as u32)) + + block_object.offset as usize; + let raw_piece_offset = (offset_in_segment % RawRecord::SIZE) + .try_into() + .expect("Offset within piece should always fit in 32-bit integer; qed"); + corrected_object_mapping.push(GlobalObject { + hash: block_object.hash, + piece_index: source_piece_indexes[offset_in_segment / RawRecord::SIZE], + offset: raw_piece_offset, + }); } } - - base_offset_in_segment += segment_item.encoded_size(); + SegmentItem::ParentSegmentHeader(_) => { + // Ignore, no objects mappings here + } } - corrected_object_mapping - }; + + base_offset_in_segment += segment_item.encoded_size(); + } let mut pieces = { // Serialize segment into concatenation of raw records @@ -828,7 +824,7 @@ impl Archiver { segment_header, pieces: pieces.to_shared(), }, - object_mapping, + corrected_object_mapping, ) } } From b53673d4ad95a082c489897729314807d048fcad Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 9 Oct 2024 13:28:53 +1000 Subject: [PATCH 3/4] Remove unused PieceObject types --- .../subspace-core-primitives/src/objects.rs | 74 +------------------ 1 file changed, 2 insertions(+), 72 deletions(-) diff --git a/crates/subspace-core-primitives/src/objects.rs b/crates/subspace-core-primitives/src/objects.rs index e78f75650c..54b312f83b 100644 --- a/crates/subspace-core-primitives/src/objects.rs +++ b/crates/subspace-core-primitives/src/objects.rs @@ -15,10 +15,9 @@ //! Data structures related to objects (useful data) stored on Subspace Network. //! -//! Mappings provided are of 3 kinds: +//! There are two kinds of mappings: //! * for objects within a block -//! * for objects within a piece -//! * for global objects in the global history of the blockchain +//! * for global objects in the global history of the blockchain (inside a piece) #[cfg(not(feature = "std"))] extern crate alloc; @@ -91,64 +90,6 @@ impl BlockObjectMapping { } } -/// Object stored inside of the piece -#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Encode, Decode, TypeInfo)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))] -pub struct PieceObject { - /// Object hash - #[cfg_attr(feature = "serde", serde(with = "hex"))] - pub hash: Blake3Hash, - /// Raw record offset of the object in that piece, for use with `Record::to_raw_record_bytes` - pub offset: u32, -} - -/// Mapping of objects stored inside of the piece -#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Encode, Decode, TypeInfo)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))] -#[cfg_attr(feature = "serde", serde(rename_all_fields = "camelCase"))] -pub enum PieceObjectMapping { - /// V0 of object mapping data structure - #[codec(index = 0)] - V0 { - /// Objects stored inside of the piece - objects: Vec, - }, -} - -impl Default for PieceObjectMapping { - fn default() -> Self { - Self::V0 { - objects: Vec::new(), - } - } -} - -impl PieceObjectMapping { - /// Returns a newly created PieceObjectMapping from a list of object mappings - #[inline] - pub fn from_objects(objects: impl IntoIterator) -> Self { - Self::V0 { - objects: objects.into_iter().collect(), - } - } - - /// Returns the object mappings as a read-only slice - pub fn objects(&self) -> &[PieceObject] { - match self { - Self::V0 { objects, .. } => objects, - } - } - - /// Returns the object mappings as a mutable slice - pub fn objects_mut(&mut self) -> &mut Vec { - match self { - Self::V0 { objects, .. } => objects, - } - } -} - /// Object stored in the history of the blockchain #[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Encode, Decode, TypeInfo)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] @@ -182,17 +123,6 @@ impl From for CompactGlobalObject { } } -impl GlobalObject { - /// Returns a newly created GlobalObject from a piece index and object. - pub fn new(piece_index: PieceIndex, piece_object: &PieceObject) -> Self { - Self { - hash: piece_object.hash, - piece_index, - offset: piece_object.offset, - } - } -} - /// Space-saving serialization of an object stored in the history of the blockchain #[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Encode, Decode, TypeInfo)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] From 411a0785859087cb48cbafb64ec0eb5d5415191b Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 9 Oct 2024 13:35:07 +1000 Subject: [PATCH 4/4] Split out Archiver::produce_object_mappings() --- crates/subspace-archiving/src/archiver.rs | 34 +++++++++++------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/crates/subspace-archiving/src/archiver.rs b/crates/subspace-archiving/src/archiver.rs index c9c0e656c9..fcde0a2489 100644 --- a/crates/subspace-archiving/src/archiver.rs +++ b/crates/subspace-archiving/src/archiver.rs @@ -348,9 +348,8 @@ impl Archiver { let mut object_mapping = Vec::new(); while let Some(segment) = self.produce_segment(incremental) { - let (archived_segment, segment_object_mapping) = self.produce_archived_segment(segment); - archived_segments.push(archived_segment); - object_mapping.extend(segment_object_mapping); + object_mapping.extend(self.produce_object_mappings(&segment)); + archived_segments.push(self.produce_archived_segment(segment)); } ArchiveBlockOutcome { @@ -598,17 +597,14 @@ impl Archiver { Some(segment) } - /// Take segment as an input, apply necessary transformations and produce archived segment - fn produce_archived_segment( - &mut self, - segment: Segment, - ) -> (NewArchivedSegment, Vec) { - // Create mappings - let mut corrected_object_mapping = Vec::new(); + /// Take segment as an input, apply necessary transformations and produce archived object mappings. + /// Must be called before `produce_archived_segment()`. + fn produce_object_mappings(&self, segment: &Segment) -> Vec { let source_piece_indexes = &self.segment_index.segment_piece_indexes_source_first() [..RecordedHistorySegment::NUM_RAW_RECORDS]; - let Segment::V0 { items } = &segment; + + let mut corrected_object_mapping = Vec::new(); // `+1` corresponds to enum variant encoding let mut base_offset_in_segment = 1; for segment_item in items { @@ -654,6 +650,11 @@ impl Archiver { base_offset_in_segment += segment_item.encoded_size(); } + corrected_object_mapping + } + + /// Take segment as an input, apply necessary transformations and produce archived segment + fn produce_archived_segment(&mut self, segment: Segment) -> NewArchivedSegment { let mut pieces = { // Serialize segment into concatenation of raw records let mut raw_record_shards = Vec::::with_capacity(RecordedHistorySegment::SIZE); @@ -819,12 +820,9 @@ impl Archiver { self.buffer .push_front(SegmentItem::ParentSegmentHeader(segment_header)); - ( - NewArchivedSegment { - segment_header, - pieces: pieces.to_shared(), - }, - corrected_object_mapping, - ) + NewArchivedSegment { + segment_header, + pieces: pieces.to_shared(), + } } }