diff --git a/crates/subspace-archiving/src/archiver.rs b/crates/subspace-archiving/src/archiver.rs index fcde0a2489..0ce001b46b 100644 --- a/crates/subspace-archiving/src/archiver.rs +++ b/crates/subspace-archiving/src/archiver.rs @@ -61,6 +61,12 @@ pub enum Segment { }, } +impl Default for Segment { + fn default() -> Self { + Segment::V0 { items: Vec::new() } + } +} + impl Encode for Segment { fn size_hint(&self) -> usize { RecordedHistorySegment::SIZE @@ -124,6 +130,24 @@ impl Segment { let Self::V0 { items } = self; items.push(segment_item); } + + pub fn items(&self) -> &[SegmentItem] { + match self { + Segment::V0 { items } => items, + } + } + + pub(crate) fn items_mut(&mut self) -> &mut Vec { + match self { + Segment::V0 { items } => items, + } + } + + pub fn into_items(self) -> Vec { + match self { + Segment::V0 { items } => items, + } + } } /// Kinds of items that are contained within a segment @@ -260,7 +284,7 @@ impl Archiver { /// Create a new instance of the archiver with initial state in case of restart. /// - /// `block` corresponds to `last_archived_block` and will be processed accordingly to its state. + /// `block` corresponds to `last_archived_block` and will be processed according to its state. pub fn with_initial_state( kzg: Kzg, erasure_coding: ErasureCoding, @@ -297,7 +321,7 @@ impl Archiver { } Ordering::Greater => { // Take part of the encoded block that wasn't archived yet and push to the - // buffer and block continuation + // buffer as a block continuation object_mapping .objects_mut() .retain_mut(|block_object: &mut BlockObject| { @@ -328,7 +352,8 @@ impl Archiver { } } - /// Adds new block to internal buffer, potentially producing pieces and segment header headers. + /// Adds new block to internal buffer, potentially producing pieces, segment headers, and + /// object mappings. /// /// Incremental archiving can be enabled if amortized block addition cost is preferred over /// throughput. @@ -347,11 +372,19 @@ impl Archiver { let mut archived_segments = Vec::new(); let mut object_mapping = Vec::new(); - while let Some(segment) = self.produce_segment(incremental) { - object_mapping.extend(self.produce_object_mappings(&segment)); + // Add completed segments and their mappings for this block. + while let Some(mut segment) = self.produce_segment(incremental) { + // Produce any segment mappings that haven't already been produced. + object_mapping.extend(Self::produce_object_mappings( + self.segment_index, + segment.items_mut().iter_mut(), + )); archived_segments.push(self.produce_archived_segment(segment)); } + // Produce any next segment buffer mappings that haven't already been produced. + object_mapping.extend(self.produce_next_segment_mappings()); + ArchiveBlockOutcome { archived_segments, object_mapping, @@ -387,9 +420,8 @@ impl Archiver { ); } - let Segment::V0 { items } = segment; // Push all of the items back into the buffer, we don't have enough data yet - for segment_item in items.into_iter().rev() { + for segment_item in segment.into_items().into_iter().rev() { self.buffer.push_front(segment_item); } @@ -478,7 +510,7 @@ impl Archiver { .unwrap_or_default(); if spill_over > 0 { - let Segment::V0 { items } = &mut segment; + let items = segment.items_mut(); let segment_item = items .pop() .expect("Segment over segment size always has at least one item; qed"); @@ -597,16 +629,28 @@ impl Archiver { Some(segment) } - /// Take segment as an input, apply necessary transformations and produce archived object mappings. - /// Must be called before `produce_archived_segment()`. - fn produce_object_mappings(&self, segment: &Segment) -> Vec { - let source_piece_indexes = &self.segment_index.segment_piece_indexes_source_first() + /// Produce object mappings for the buffered items for the next segment. Then remove the + /// mappings in those items. + /// + /// Must only be called after all complete segments for a block have been produced. Before + /// that, the buffer can contain a `BlockContinuation` which spans multiple segments. + fn produce_next_segment_mappings(&mut self) -> Vec { + Self::produce_object_mappings(self.segment_index, self.buffer.iter_mut()) + } + + /// Produce object mappings for `items` in `segment_index`. Then remove the mappings from those + /// items. + /// + /// This method can be called on a `Segment`’s items, or on the `Archiver`'s internal buffer. + fn produce_object_mappings<'a>( + segment_index: SegmentIndex, + items: impl Iterator, + ) -> Vec { + let source_piece_indexes = &segment_index.segment_piece_indexes_source_first() [..RecordedHistorySegment::NUM_RAW_RECORDS]; - let Segment::V0 { items } = &segment; let mut corrected_object_mapping = Vec::new(); - // `+1` corresponds to enum variant encoding - let mut base_offset_in_segment = 1; + let mut base_offset_in_segment = Segment::default().encoded_size(); for segment_item in items { match segment_item { SegmentItem::Padding => { @@ -626,7 +670,7 @@ impl Archiver { bytes, object_mapping, } => { - for block_object in object_mapping.objects() { + for block_object in object_mapping.objects_mut().drain(..) { // `+1` corresponds to `SegmentItem::X {}` enum variant encoding let offset_in_segment = base_offset_in_segment + 1 @@ -643,7 +687,7 @@ impl Archiver { } } SegmentItem::ParentSegmentHeader(_) => { - // Ignore, no objects mappings here + // Ignore, no object mappings here } } diff --git a/crates/subspace-archiving/src/reconstructor.rs b/crates/subspace-archiving/src/reconstructor.rs index a16625e385..f6e1c10fe9 100644 --- a/crates/subspace-archiving/src/reconstructor.rs +++ b/crates/subspace-archiving/src/reconstructor.rs @@ -168,7 +168,7 @@ impl Reconstructor { &mut self, segment_pieces: &[Option], ) -> Result { - let Segment::V0 { items } = self.reconstruct_segment(segment_pieces)?; + let items = self.reconstruct_segment(segment_pieces)?.into_items(); let mut reconstructed_contents = ReconstructedContents::default(); let mut next_block_number = 0; diff --git a/crates/subspace-archiving/tests/integration/archiver.rs b/crates/subspace-archiving/tests/integration/archiver.rs index 06c976d895..5790d94b3f 100644 --- a/crates/subspace-archiving/tests/integration/archiver.rs +++ b/crates/subspace-archiving/tests/integration/archiver.rs @@ -100,8 +100,11 @@ fn archiver() { }; let block_0_outcome = archiver.add_block(block_0.clone(), block_0_object_mapping.clone(), true); let archived_segments = block_0_outcome.archived_segments; + let object_mapping = block_0_outcome.object_mapping.clone(); // There is not enough data to produce archived segment yet assert!(archived_segments.is_empty()); + // All block mappings must appear in the global object mapping + assert_eq!(object_mapping.len(), block_0_object_mapping.objects().len()); let (block_1, block_1_object_mapping) = { let mut block = vec![0u8; RecordedHistorySegment::SIZE / 3 * 2]; @@ -140,7 +143,7 @@ fn archiver() { // This should produce 1 archived segment let block_1_outcome = archiver.add_block(block_1.clone(), block_1_object_mapping.clone(), true); let archived_segments = block_1_outcome.archived_segments; - let object_mapping = block_1_outcome.object_mapping; + let object_mapping = block_1_outcome.object_mapping.clone(); assert_eq!(archived_segments.len(), 1); let first_archived_segment = archived_segments.first().cloned().unwrap(); @@ -164,20 +167,28 @@ fn archiver() { assert_eq!(last_archived_block.partial_archived(), Some(65011701)); } - // 4 objects fit into the first segment - assert_eq!(object_mapping.len(), 4); + // All block mappings must appear in the global object mapping + assert_eq!(object_mapping.len(), block_1_object_mapping.objects().len()); { + // 4 objects fit into the first segment, 2 from block 0 and 2 from block 1 let block_objects = iter::repeat(block_0.as_ref()) .zip(block_0_object_mapping.objects()) - .chain(iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects())); - let global_objects = object_mapping.into_iter().map(|object_mapping| { - ( - Piece::from( - &first_archived_segment.pieces[object_mapping.piece_index.position() as usize], - ), - object_mapping, - ) - }); + .chain(iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects())) + .take(4); + let global_objects = block_0_outcome + .object_mapping + .into_iter() + .chain(object_mapping) + .take(4) + .map(|object_mapping| { + ( + Piece::from( + &first_archived_segment.pieces + [object_mapping.piece_index.position() as usize], + ), + object_mapping, + ) + }); compare_block_objects_to_global_objects(block_objects, global_objects); } @@ -215,7 +226,7 @@ fn archiver() { assert_eq!(archived_segments.len(), 2); // Check that initializing archiver with initial state before last block results in the same - // archived segments and mappings once last block is added + // archived segments once last block is added. { let mut archiver_with_initial_state = Archiver::with_initial_state( kzg.clone(), @@ -226,21 +237,35 @@ fn archiver() { ) .unwrap(); + let initial_block_2_outcome = archiver_with_initial_state.add_block( + block_2.clone(), + BlockObjectMapping::default(), + true, + ); + + // The rest of block 1 doesn't create any segments by itself assert_eq!( - archiver_with_initial_state.add_block( - block_2.clone(), - BlockObjectMapping::default(), - true - ), - block_2_outcome, + initial_block_2_outcome.archived_segments, + block_2_outcome.archived_segments + ); + + // The rest of block 1 doesn't create any segments, but it does have the final block 1 + // object mapping. And there are no mappings in block 2. + assert_eq!(initial_block_2_outcome.object_mapping.len(), 1); + assert_eq!( + initial_block_2_outcome.object_mapping[0], + block_1_outcome.object_mapping[2] ); } + // No block mappings should appear in the global object mapping + assert_eq!(object_mapping.len(), 0); // 1 object fits into the second segment // There are no objects left for the third segment - assert_eq!(object_mapping.len(), 1); assert_eq!( - object_mapping[0].piece_index.segment_index(), + block_1_outcome.object_mapping[2] + .piece_index + .segment_index(), archived_segments[0].segment_header.segment_index(), ); { @@ -341,9 +366,19 @@ fn archiver() { ) .unwrap(); + let initial_block_3_outcome = + archiver_with_initial_state.add_block(block_3, BlockObjectMapping::default(), true); + + // The rest of block 2 doesn't create any segments by itself + assert_eq!( + initial_block_3_outcome.archived_segments, + block_3_outcome.archived_segments, + ); + + // The rest of block 2 doesn't have any mappings assert_eq!( - archiver_with_initial_state.add_block(block_3, BlockObjectMapping::default(), true), - block_3_outcome, + initial_block_3_outcome.object_mapping, + block_3_outcome.object_mapping ); } diff --git a/shared/subspace-data-retrieval/src/object_fetcher.rs b/shared/subspace-data-retrieval/src/object_fetcher.rs index 9a1feed12a..ebfb575497 100644 --- a/shared/subspace-data-retrieval/src/object_fetcher.rs +++ b/shared/subspace-data-retrieval/src/object_fetcher.rs @@ -389,7 +389,7 @@ impl ObjectFetcher { ); let mut data = { - let Segment::V0 { items } = self.read_segment(segment_index).await?; + let items = self.read_segment(segment_index).await?.into_items(); // Go through the segment until we reach the offset. // Unconditional progress is enum variant + compact encoding of number of elements let mut progress = 1 + Compact::compact_len(&(items.len() as u64)); @@ -495,7 +495,7 @@ impl ObjectFetcher { // headers and optional padding. loop { segment_index += SegmentIndex::ONE; - let Segment::V0 { items } = self.read_segment(segment_index).await?; + let items = self.read_segment(segment_index).await?.into_items(); for segment_item in items { match segment_item { SegmentItem::BlockContinuation { bytes, .. } => {