Produce object mappings incrementally
teor2345 committed Oct 11, 2024
1 parent 5bc4984 commit 9956a53
Showing 2 changed files with 100 additions and 38 deletions.
57 changes: 42 additions & 15 deletions crates/subspace-archiving/src/archiver.rs
@@ -61,6 +61,12 @@ pub enum Segment {
},
}

impl Default for Segment {
fn default() -> Self {
Segment::V0 { items: Vec::new() }
}
}

impl Encode for Segment {
fn size_hint(&self) -> usize {
RecordedHistorySegment::SIZE
@@ -278,7 +284,7 @@ impl Archiver {

/// Create a new instance of the archiver with initial state in case of restart.
///
/// `block` corresponds to `last_archived_block` and will be processed accordingly to its state.
/// `block` corresponds to `last_archived_block` and will be processed according to its state.
pub fn with_initial_state(
kzg: Kzg,
erasure_coding: ErasureCoding,
@@ -315,7 +321,7 @@ impl Archiver {
}
Ordering::Greater => {
// Take part of the encoded block that wasn't archived yet and push to the
// buffer and block continuation
// buffer as a block continuation
object_mapping
.objects_mut()
.retain_mut(|block_object: &mut BlockObject| {
@@ -346,7 +352,8 @@ impl Archiver {
}
}

/// Adds new block to internal buffer, potentially producing pieces and segment header headers.
/// Adds new block to internal buffer, potentially producing pieces, segment headers, and
/// object mappings.
///
/// Incremental archiving can be enabled if amortized block addition cost is preferred over
/// throughput.
@@ -365,11 +372,19 @@
let mut archived_segments = Vec::new();
let mut object_mapping = Vec::new();

while let Some(segment) = self.produce_segment(incremental) {
object_mapping.extend(self.produce_object_mappings(&segment));
// Add completed segments and their mappings for this block.
while let Some(mut segment) = self.produce_segment(incremental) {
// Produce any segment mappings that haven't already been produced.
object_mapping.extend(Self::produce_object_mappings(
self.segment_index,
segment.items_mut().iter_mut(),
));
archived_segments.push(self.produce_archived_segment(segment));
}

// Produce any next segment buffer mappings that haven't already been produced.
object_mapping.extend(self.produce_next_segment_mappings());

ArchiveBlockOutcome {
archived_segments,
object_mapping,
@@ -614,16 +629,29 @@ impl Archiver {
Some(segment)
}

/// Take segment as an input, apply necessary transformations and produce archived object mappings.
/// Must be called before `produce_archived_segment()`.
fn produce_object_mappings(&self, segment: &Segment) -> Vec<GlobalObject> {
let source_piece_indexes = &self.segment_index.segment_piece_indexes_source_first()
/// Produce object mappings for the buffered items for the next segment. Then remove the
/// mappings in those items.
///
/// Must only be called after all complete segments for a block have been produced. Before
/// that, the buffer can contain a `BlockContinuation` which spans multiple segments.
fn produce_next_segment_mappings(&mut self) -> Vec<GlobalObject> {
Self::produce_object_mappings(self.segment_index, self.buffer.iter_mut())
}

/// Produce object mappings for `items` in `segment_index`. Then remove the mappings from those
/// items.
///
/// This method can be called on a `Segment`’s items, or on the `Archiver`'s internal buffer.
fn produce_object_mappings<'a>(
segment_index: SegmentIndex,
items: impl Iterator<Item = &'a mut SegmentItem>,
) -> Vec<GlobalObject> {
let source_piece_indexes = &segment_index.segment_piece_indexes_source_first()
[..RecordedHistorySegment::NUM_RAW_RECORDS];

let mut corrected_object_mapping = Vec::new();
// `+1` corresponds to enum variant encoding
let mut base_offset_in_segment = 1;
for segment_item in segment.items() {
let mut base_offset_in_segment = Segment::default().encoded_size();
for segment_item in items {
match segment_item {
SegmentItem::Padding => {
unreachable!(
@@ -642,7 +670,7 @@
bytes,
object_mapping,
} => {
for block_object in object_mapping.objects() {
for block_object in object_mapping.objects_mut().drain(..) {
// `+1` corresponds to `SegmentItem::X {}` enum variant encoding
let offset_in_segment = base_offset_in_segment
+ 1
@@ -659,10 +687,9 @@
}
}
SegmentItem::ParentSegmentHeader(_) => {
// Ignore, no objects mappings here
// Ignore, no object mappings here
}
}

base_offset_in_segment += segment_item.encoded_size();
}

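A minimal caller sketch, assuming the import paths and helper name below (they are not part of this commit): it consumes the `ArchiveBlockOutcome` returned by `add_block`, where object mappings are now produced for every block rather than only when a segment completes.

// Sketch only: import paths are assumed and may differ in the actual crates.
use subspace_archiving::archiver::Archiver;
use subspace_core_primitives::objects::{BlockObjectMapping, GlobalObject};

/// Hypothetical helper: archive a batch of blocks and collect every object
/// mapping as soon as the archiver produces it.
fn archive_blocks(
    archiver: &mut Archiver,
    blocks: Vec<(Vec<u8>, BlockObjectMapping)>,
) -> Vec<GlobalObject> {
    let mut all_mappings = Vec::new();

    for (block, block_object_mapping) in blocks {
        // `true` enables incremental archiving, trading peak throughput for a
        // more even per-block cost, as the `add_block` docs above describe.
        let outcome = archiver.add_block(block, block_object_mapping, true);

        // Mappings are returned incrementally, so they can be indexed here
        // without waiting for a segment to be completed.
        all_mappings.extend(outcome.object_mapping);

        // Completed segments (if any) can be handed off for plotting or
        // distribution; their headers identify the segment index.
        for archived_segment in outcome.archived_segments {
            let _segment_index = archived_segment.segment_header.segment_index();
        }
    }

    all_mappings
}
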
81 changes: 58 additions & 23 deletions crates/subspace-archiving/tests/integration/archiver.rs
@@ -100,8 +100,11 @@ fn archiver() {
};
let block_0_outcome = archiver.add_block(block_0.clone(), block_0_object_mapping.clone(), true);
let archived_segments = block_0_outcome.archived_segments;
let object_mapping = block_0_outcome.object_mapping.clone();
// There is not enough data to produce archived segment yet
assert!(archived_segments.is_empty());
// All block mappings must appear in the global object mapping
assert_eq!(object_mapping.len(), block_0_object_mapping.objects().len());

let (block_1, block_1_object_mapping) = {
let mut block = vec![0u8; RecordedHistorySegment::SIZE / 3 * 2];
@@ -140,7 +143,7 @@ fn archiver() {
// This should produce 1 archived segment
let block_1_outcome = archiver.add_block(block_1.clone(), block_1_object_mapping.clone(), true);
let archived_segments = block_1_outcome.archived_segments;
let object_mapping = block_1_outcome.object_mapping;
let object_mapping = block_1_outcome.object_mapping.clone();
assert_eq!(archived_segments.len(), 1);

let first_archived_segment = archived_segments.first().cloned().unwrap();
@@ -164,20 +167,28 @@
assert_eq!(last_archived_block.partial_archived(), Some(65011701));
}

// 4 objects fit into the first segment
assert_eq!(object_mapping.len(), 4);
// All block mappings must appear in the global object mapping
assert_eq!(object_mapping.len(), block_1_object_mapping.objects().len());
{
// 4 objects fit into the first segment, 2 from block 0 and 2 from block 1
let block_objects = iter::repeat(block_0.as_ref())
.zip(block_0_object_mapping.objects())
.chain(iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects()));
let global_objects = object_mapping.into_iter().map(|object_mapping| {
(
Piece::from(
&first_archived_segment.pieces[object_mapping.piece_index.position() as usize],
),
object_mapping,
)
});
.chain(iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects()))
.take(4);
let global_objects = block_0_outcome
.object_mapping
.into_iter()
.chain(object_mapping)
.take(4)
.map(|object_mapping| {
(
Piece::from(
&first_archived_segment.pieces
[object_mapping.piece_index.position() as usize],
),
object_mapping,
)
});

compare_block_objects_to_global_objects(block_objects, global_objects);
}
@@ -215,7 +226,7 @@ fn archiver() {
assert_eq!(archived_segments.len(), 2);

// Check that initializing archiver with initial state before last block results in the same
// archived segments and mappings once last block is added
// archived segments once last block is added.
{
let mut archiver_with_initial_state = Archiver::with_initial_state(
kzg.clone(),
@@ -226,21 +237,35 @@
)
.unwrap();

let initial_block_2_outcome = archiver_with_initial_state.add_block(
block_2.clone(),
BlockObjectMapping::default(),
true,
);

// The rest of block 1 doesn't create any segments by itself
assert_eq!(
archiver_with_initial_state.add_block(
block_2.clone(),
BlockObjectMapping::default(),
true
),
block_2_outcome,
initial_block_2_outcome.archived_segments,
block_2_outcome.archived_segments
);

// The rest of block 1 doesn't create any segments, but it does have the final block 1
// object mapping. And there are no mappings in block 2.
assert_eq!(initial_block_2_outcome.object_mapping.len(), 1);
assert_eq!(
initial_block_2_outcome.object_mapping[0],
block_1_outcome.object_mapping[2]
);
}

// No block mappings should appear in the global object mapping
assert_eq!(object_mapping.len(), 0);
// 1 object fits into the second segment
// There are no objects left for the third segment
assert_eq!(object_mapping.len(), 1);
assert_eq!(
object_mapping[0].piece_index.segment_index(),
block_1_outcome.object_mapping[2]
.piece_index
.segment_index(),
archived_segments[0].segment_header.segment_index(),
);
{
@@ -341,9 +366,19 @@ fn archiver() {
)
.unwrap();

let initial_block_3_outcome =
archiver_with_initial_state.add_block(block_3, BlockObjectMapping::default(), true);

// The rest of block 2 doesn't create any segments by itself
assert_eq!(
initial_block_3_outcome.archived_segments,
block_3_outcome.archived_segments,
);

// The rest of block 2 doesn't have any mappings
assert_eq!(
archiver_with_initial_state.add_block(block_3, BlockObjectMapping::default(), true),
block_3_outcome,
initial_block_3_outcome.object_mapping,
block_3_outcome.object_mapping
);
}

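A minimal lookup sketch, assuming the type names and import paths below (the archived-segment type name in particular is an assumption): it resolves a `GlobalObject` back to the source piece inside an archived segment, mirroring the indexing used in the test above.

// Sketch only: type names and import paths are assumed and may differ in the
// actual crates.
use subspace_archiving::archiver::NewArchivedSegment;
use subspace_core_primitives::objects::GlobalObject;
use subspace_core_primitives::pieces::Piece;

/// Hypothetical helper: return the piece holding a mapped object, if the
/// mapping points into this archived segment.
fn piece_for_object(
    archived_segment: &NewArchivedSegment,
    mapping: &GlobalObject,
) -> Option<Piece> {
    // The piece index records both the segment and the position within it.
    if mapping.piece_index.segment_index() != archived_segment.segment_header.segment_index() {
        return None;
    }

    // Same indexing as in the test: position within the segment's pieces.
    Some(Piece::from(
        &archived_segment.pieces[mapping.piece_index.position() as usize],
    ))
}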
