Skip to content

Commit

Permalink
Implement piece fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 committed Sep 26, 2024
1 parent 718411f commit 71b5cfb
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 18 deletions.
17 changes: 8 additions & 9 deletions shared/subspace-data-retrieval/src/object_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

//! Fetching objects stored in the archived history of Subspace Network.

use crate::piece_fetcher::download_pieces;
use crate::piece_getter::{ObjectPieceGetter, PieceGetterError};
use crate::segment_fetcher::{download_segment, SegmentGetterError};
use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
Expand Down Expand Up @@ -291,7 +292,7 @@ impl ObjectFetcher {
let remaining_piece_indexes = (next_source_piece_index..)
.filter(|i| i.is_source())
.take(remaining_piece_count);
self.read_pieces(remaining_piece_indexes, piece_index, piece_offset)
self.read_pieces(remaining_piece_indexes)
.await?
.into_iter()
.for_each(|piece| {
Expand Down Expand Up @@ -457,17 +458,14 @@ impl ObjectFetcher {
.await?)
}

/// Concurrently read multiple pieces by their indexes
///
/// The mapping piece index and offset are only used for error reporting.
// TODO: replace with a refactored method that fetches pieces
/// Concurrently read multiple pieces, and return them in the supplied order.
async fn read_pieces(
&self,
_piece_indexes: impl IntoIterator<Item = PieceIndex>,
_mapping_piece_index: PieceIndex,
_mapping_piece_offset: u32,
piece_indexes: impl IntoIterator<Item = PieceIndex>,
) -> Result<Vec<Piece>, Error> {
unimplemented!("read_pieces will be implemented as part of a refactoring")
download_pieces(piece_indexes, &self.piece_getter)
.await
.map_err(|source| Error::PieceGetterPermanent { source })
}

/// Read and return a single piece.
Expand Down Expand Up @@ -515,6 +513,7 @@ impl ObjectFetcher {
"Temporary error fetching piece during object assembling"
);

// TODO: retry before failing
Err(Error::PieceGetterTemporary {
piece_index: mapping_piece_index,
})?
Expand Down
91 changes: 89 additions & 2 deletions shared/subspace-data-retrieval/src/piece_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,92 @@
// limitations under the License.

//! Fetching pieces of the archived history of Subspace Network.
//!
//! TODO: move piece fetching here

use crate::piece_getter::{ObjectPieceGetter, PieceGetterError};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use subspace_core_primitives::{Piece, PieceIndex};
use tracing::{debug, trace};

/// Concurrently downloads the exact pieces in `piece_indexes`, returning them in that order.
///
/// Duplicate piece indexes are allowed: each occurrence is requested separately, and its piece is
/// returned at the matching position in the result.
///
/// # Errors
///
/// If any piece can't be downloaded, returns the error from the first failed request to complete:
/// - [`PieceGetterError::NotFound`] if the piece getter returned `Ok(None)` for a piece,
/// - [`PieceGetterError::NotFoundWithError`] if the piece getter returned an error.
// This code was copied and modified from subspace_service::sync_from_dsn::download_and_reconstruct_blocks():
// <https://github.com/autonomys/subspace/blob/d71ca47e45e1b53cd2e472413caa23472a91cd74/crates/subspace-service/src/sync_from_dsn/import_blocks.rs#L236-L322>
pub async fn download_pieces<PG>(
    piece_indexes: impl IntoIterator<Item = PieceIndex>,
    piece_getter: &PG,
) -> Result<Vec<Piece>, PieceGetterError>
where
    PG: ObjectPieceGetter,
{
    let piece_indexes = piece_indexes.into_iter().collect::<Vec<_>>();

    debug!(
        count = piece_indexes.len(),
        ?piece_indexes,
        "Retrieving exact pieces"
    );

    // TODO:
    // - consider using a semaphore to limit the number of concurrent requests, like
    //   download_segment_pieces()
    // - if we're close to the number of pieces in a segment, use segment downloading and piece
    //   reconstruction instead
    // Currently most objects are limited to 4 pieces, so this isn't needed yet.
    //
    // Each future carries its own output position, because `FuturesUnordered` yields results in
    // completion order rather than in request order.
    let mut received_pieces = piece_indexes
        .iter()
        .enumerate()
        .map(|(position, piece_index)| async move {
            match piece_getter.get_piece(*piece_index).await {
                Ok(Some(piece)) => {
                    trace!(?piece_index, "Piece request succeeded",);
                    Ok((position, piece))
                }
                Ok(None) => {
                    // TODO: retry before failing
                    trace!(?piece_index, "Piece request temporarily failed",);
                    Err(PieceGetterError::NotFound {
                        piece_index: *piece_index,
                    })
                }
                Err(error) => {
                    trace!(
                        %error,
                        ?piece_index,
                        "Piece request permanently failed",
                    );
                    Err(PieceGetterError::NotFoundWithError {
                        piece_index: *piece_index,
                        source: error,
                    })
                }
            }
        })
        .collect::<FuturesUnordered<_>>();

    // Placeholder pieces, each one is overwritten by exactly one successful request below.
    let mut pieces = vec![Piece::default(); piece_indexes.len()];

    while let Some(maybe_result) = received_pieces.next().await {
        // We want exact pieces, so any errors are final.
        let (position, piece) = maybe_result?;

        // Positions come from `enumerate()` over `piece_indexes`, so they are unique and always
        // within bounds of `pieces`, which has the same length.
        pieces[position] = piece;
    }

    trace!(
        count = piece_indexes.len(),
        ?piece_indexes,
        "Successfully retrieved exact pieces"
    );

    Ok(pieces)
}
7 changes: 0 additions & 7 deletions shared/subspace-data-retrieval/src/piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,6 @@ pub enum PieceGetterError {
piece_index: PieceIndex,
source: BoxError,
},

/// Piece decoding error
#[error("Piece data decoding error: {source:?}")]
PieceDecoding {
#[from]
source: parity_scale_codec::Error,
},
}

/// Trait representing a way to get pieces from the DSN for object reconstruction
Expand Down

0 comments on commit 71b5cfb

Please sign in to comment.