Skip to content

Commit

Permalink
Merge pull request #3100 from autonomys/incremental-map
Browse files Browse the repository at this point in the history
Generate object mappings incrementally
  • Loading branch information
teor2345 authored Oct 11, 2024
2 parents a3fd8a5 + 38d9547 commit f8b6abf
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 43 deletions.
78 changes: 61 additions & 17 deletions crates/subspace-archiving/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ pub enum Segment {
},
}

impl Default for Segment {
fn default() -> Self {
Segment::V0 { items: Vec::new() }
}
}

impl Encode for Segment {
fn size_hint(&self) -> usize {
RecordedHistorySegment::SIZE
Expand Down Expand Up @@ -124,6 +130,24 @@ impl Segment {
let Self::V0 { items } = self;
items.push(segment_item);
}

pub fn items(&self) -> &[SegmentItem] {
match self {
Segment::V0 { items } => items,
}
}

pub(crate) fn items_mut(&mut self) -> &mut Vec<SegmentItem> {
match self {
Segment::V0 { items } => items,
}
}

pub fn into_items(self) -> Vec<SegmentItem> {
match self {
Segment::V0 { items } => items,
}
}
}

/// Kinds of items that are contained within a segment
Expand Down Expand Up @@ -260,7 +284,7 @@ impl Archiver {

/// Create a new instance of the archiver with initial state in case of restart.
///
/// `block` corresponds to `last_archived_block` and will be processed accordingly to its state.
/// `block` corresponds to `last_archived_block` and will be processed according to its state.
pub fn with_initial_state(
kzg: Kzg,
erasure_coding: ErasureCoding,
Expand Down Expand Up @@ -297,7 +321,7 @@ impl Archiver {
}
Ordering::Greater => {
// Take part of the encoded block that wasn't archived yet and push to the
// buffer and block continuation
// buffer as a block continuation
object_mapping
.objects_mut()
.retain_mut(|block_object: &mut BlockObject| {
Expand Down Expand Up @@ -328,7 +352,8 @@ impl Archiver {
}
}

/// Adds new block to internal buffer, potentially producing pieces and segment header headers.
/// Adds new block to internal buffer, potentially producing pieces, segment headers, and
/// object mappings.
///
/// Incremental archiving can be enabled if amortized block addition cost is preferred over
/// throughput.
Expand All @@ -347,11 +372,19 @@ impl Archiver {
let mut archived_segments = Vec::new();
let mut object_mapping = Vec::new();

while let Some(segment) = self.produce_segment(incremental) {
object_mapping.extend(self.produce_object_mappings(&segment));
// Add completed segments and their mappings for this block.
while let Some(mut segment) = self.produce_segment(incremental) {
// Produce any segment mappings that haven't already been produced.
object_mapping.extend(Self::produce_object_mappings(
self.segment_index,
segment.items_mut().iter_mut(),
));
archived_segments.push(self.produce_archived_segment(segment));
}

// Produce any next segment buffer mappings that haven't already been produced.
object_mapping.extend(self.produce_next_segment_mappings());

ArchiveBlockOutcome {
archived_segments,
object_mapping,
Expand Down Expand Up @@ -387,9 +420,8 @@ impl Archiver {
);
}

let Segment::V0 { items } = segment;
// Push all of the items back into the buffer, we don't have enough data yet
for segment_item in items.into_iter().rev() {
for segment_item in segment.into_items().into_iter().rev() {
self.buffer.push_front(segment_item);
}

Expand Down Expand Up @@ -478,7 +510,7 @@ impl Archiver {
.unwrap_or_default();

if spill_over > 0 {
let Segment::V0 { items } = &mut segment;
let items = segment.items_mut();
let segment_item = items
.pop()
.expect("Segment over segment size always has at least one item; qed");
Expand Down Expand Up @@ -597,16 +629,28 @@ impl Archiver {
Some(segment)
}

/// Take segment as an input, apply necessary transformations and produce archived object mappings.
/// Must be called before `produce_archived_segment()`.
fn produce_object_mappings(&self, segment: &Segment) -> Vec<GlobalObject> {
let source_piece_indexes = &self.segment_index.segment_piece_indexes_source_first()
/// Produce object mappings for the buffered items for the next segment. Then remove the
/// mappings in those items.
///
/// Must only be called after all complete segments for a block have been produced. Before
/// that, the buffer can contain a `BlockContinuation` which spans multiple segments.
fn produce_next_segment_mappings(&mut self) -> Vec<GlobalObject> {
Self::produce_object_mappings(self.segment_index, self.buffer.iter_mut())
}

/// Produce object mappings for `items` in `segment_index`. Then remove the mappings from those
/// items.
///
/// This method can be called on a `Segment`'s items, or on the `Archiver`'s internal buffer.
fn produce_object_mappings<'a>(
segment_index: SegmentIndex,
items: impl Iterator<Item = &'a mut SegmentItem>,
) -> Vec<GlobalObject> {
let source_piece_indexes = &segment_index.segment_piece_indexes_source_first()
[..RecordedHistorySegment::NUM_RAW_RECORDS];
let Segment::V0 { items } = &segment;

let mut corrected_object_mapping = Vec::new();
// `+1` corresponds to enum variant encoding
let mut base_offset_in_segment = 1;
let mut base_offset_in_segment = Segment::default().encoded_size();
for segment_item in items {
match segment_item {
SegmentItem::Padding => {
Expand All @@ -626,7 +670,7 @@ impl Archiver {
bytes,
object_mapping,
} => {
for block_object in object_mapping.objects() {
for block_object in object_mapping.objects_mut().drain(..) {
// `+1` corresponds to `SegmentItem::X {}` enum variant encoding
let offset_in_segment = base_offset_in_segment
+ 1
Expand All @@ -643,7 +687,7 @@ impl Archiver {
}
}
SegmentItem::ParentSegmentHeader(_) => {
// Ignore, no objects mappings here
// Ignore, no object mappings here
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-archiving/src/reconstructor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl Reconstructor {
&mut self,
segment_pieces: &[Option<Piece>],
) -> Result<ReconstructedContents, ReconstructorError> {
let Segment::V0 { items } = self.reconstruct_segment(segment_pieces)?;
let items = self.reconstruct_segment(segment_pieces)?.into_items();

let mut reconstructed_contents = ReconstructedContents::default();
let mut next_block_number = 0;
Expand Down
81 changes: 58 additions & 23 deletions crates/subspace-archiving/tests/integration/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,11 @@ fn archiver() {
};
let block_0_outcome = archiver.add_block(block_0.clone(), block_0_object_mapping.clone(), true);
let archived_segments = block_0_outcome.archived_segments;
let object_mapping = block_0_outcome.object_mapping.clone();
// There is not enough data to produce archived segment yet
assert!(archived_segments.is_empty());
// All block mappings must appear in the global object mapping
assert_eq!(object_mapping.len(), block_0_object_mapping.objects().len());

let (block_1, block_1_object_mapping) = {
let mut block = vec![0u8; RecordedHistorySegment::SIZE / 3 * 2];
Expand Down Expand Up @@ -140,7 +143,7 @@ fn archiver() {
// This should produce 1 archived segment
let block_1_outcome = archiver.add_block(block_1.clone(), block_1_object_mapping.clone(), true);
let archived_segments = block_1_outcome.archived_segments;
let object_mapping = block_1_outcome.object_mapping;
let object_mapping = block_1_outcome.object_mapping.clone();
assert_eq!(archived_segments.len(), 1);

let first_archived_segment = archived_segments.first().cloned().unwrap();
Expand All @@ -164,20 +167,28 @@ fn archiver() {
assert_eq!(last_archived_block.partial_archived(), Some(65011701));
}

// 4 objects fit into the first segment
assert_eq!(object_mapping.len(), 4);
// All block mappings must appear in the global object mapping
assert_eq!(object_mapping.len(), block_1_object_mapping.objects().len());
{
// 4 objects fit into the first segment, 2 from block 0 and 2 from block 1
let block_objects = iter::repeat(block_0.as_ref())
.zip(block_0_object_mapping.objects())
.chain(iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects()));
let global_objects = object_mapping.into_iter().map(|object_mapping| {
(
Piece::from(
&first_archived_segment.pieces[object_mapping.piece_index.position() as usize],
),
object_mapping,
)
});
.chain(iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects()))
.take(4);
let global_objects = block_0_outcome
.object_mapping
.into_iter()
.chain(object_mapping)
.take(4)
.map(|object_mapping| {
(
Piece::from(
&first_archived_segment.pieces
[object_mapping.piece_index.position() as usize],
),
object_mapping,
)
});

compare_block_objects_to_global_objects(block_objects, global_objects);
}
Expand Down Expand Up @@ -215,7 +226,7 @@ fn archiver() {
assert_eq!(archived_segments.len(), 2);

// Check that initializing archiver with initial state before last block results in the same
// archived segments and mappings once last block is added
// archived segments once last block is added.
{
let mut archiver_with_initial_state = Archiver::with_initial_state(
kzg.clone(),
Expand All @@ -226,21 +237,35 @@ fn archiver() {
)
.unwrap();

let initial_block_2_outcome = archiver_with_initial_state.add_block(
block_2.clone(),
BlockObjectMapping::default(),
true,
);

// The rest of block 1 doesn't create any segments by itself
assert_eq!(
archiver_with_initial_state.add_block(
block_2.clone(),
BlockObjectMapping::default(),
true
),
block_2_outcome,
initial_block_2_outcome.archived_segments,
block_2_outcome.archived_segments
);

// The rest of block 1 doesn't create any segments, but it does have the final block 1
// object mapping. And there are no mappings in block 2.
assert_eq!(initial_block_2_outcome.object_mapping.len(), 1);
assert_eq!(
initial_block_2_outcome.object_mapping[0],
block_1_outcome.object_mapping[2]
);
}

// No block mappings should appear in the global object mapping
assert_eq!(object_mapping.len(), 0);
// 1 object fits into the second segment
// There are no objects left for the third segment
assert_eq!(object_mapping.len(), 1);
assert_eq!(
object_mapping[0].piece_index.segment_index(),
block_1_outcome.object_mapping[2]
.piece_index
.segment_index(),
archived_segments[0].segment_header.segment_index(),
);
{
Expand Down Expand Up @@ -341,9 +366,19 @@ fn archiver() {
)
.unwrap();

let initial_block_3_outcome =
archiver_with_initial_state.add_block(block_3, BlockObjectMapping::default(), true);

// The rest of block 2 doesn't create any segments by itself
assert_eq!(
initial_block_3_outcome.archived_segments,
block_3_outcome.archived_segments,
);

// The rest of block 2 doesn't have any mappings
assert_eq!(
archiver_with_initial_state.add_block(block_3, BlockObjectMapping::default(), true),
block_3_outcome,
initial_block_3_outcome.object_mapping,
block_3_outcome.object_mapping
);
}

Expand Down
4 changes: 2 additions & 2 deletions shared/subspace-data-retrieval/src/object_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ impl ObjectFetcher {
);

let mut data = {
let Segment::V0 { items } = self.read_segment(segment_index).await?;
let items = self.read_segment(segment_index).await?.into_items();
// Go through the segment until we reach the offset.
// Unconditional progress is enum variant + compact encoding of number of elements
let mut progress = 1 + Compact::compact_len(&(items.len() as u64));
Expand Down Expand Up @@ -495,7 +495,7 @@ impl ObjectFetcher {
// headers and optional padding.
loop {
segment_index += SegmentIndex::ONE;
let Segment::V0 { items } = self.read_segment(segment_index).await?;
let items = self.read_segment(segment_index).await?.into_items();
for segment_item in items {
match segment_item {
SegmentItem::BlockContinuation { bytes, .. } => {
Expand Down

0 comments on commit f8b6abf

Please sign in to comment.