diff --git a/shared/subspace-data-retrieval/src/object_fetcher.rs b/shared/subspace-data-retrieval/src/object_fetcher.rs index 17cb63be96..9ccd1213d4 100644 --- a/shared/subspace-data-retrieval/src/object_fetcher.rs +++ b/shared/subspace-data-retrieval/src/object_fetcher.rs @@ -15,6 +15,7 @@ //! Fetching objects stored in the archived history of Subspace Network. +use crate::piece_fetcher::download_pieces; use crate::piece_getter::{DsnSyncPieceGetter, PieceGetterError}; use crate::segment_fetcher::{download_segment, SegmentGetterError}; use parity_scale_codec::{Compact, CompactLen, Decode, Encode}; @@ -291,7 +292,7 @@ impl ObjectFetcher { let remaining_piece_indexes = (next_source_piece_index..) .filter(|i| i.is_source()) .take(remaining_piece_count); - self.read_pieces(remaining_piece_indexes, piece_index, piece_offset) + self.read_pieces(remaining_piece_indexes) .await? .into_iter() .for_each(|piece| { @@ -457,17 +458,14 @@ impl ObjectFetcher { .await?) } - /// Concurrently read multiple pieces by their indexes - /// - /// The mapping piece index and offset are only used for error reporting. - // TODO: replace with a refactored method that fetches pieces + /// Concurrently read multiple pieces, and return them in the supplied order. async fn read_pieces( &self, - _piece_indexes: impl IntoIterator, - _mapping_piece_index: PieceIndex, - _mapping_piece_offset: u32, + piece_indexes: impl IntoIterator, ) -> Result, Error> { - unimplemented!("read_pieces will be implemented as part of a refactoring") + download_pieces(piece_indexes, &self.piece_getter) + .await + .map_err(|source| Error::PieceGetterPermanent { source }) } /// Read and return a single piece. @@ -515,6 +513,7 @@ impl ObjectFetcher { "Temporary error fetching piece during object assembling" ); + // TODO: retry before failing Err(Error::PieceGetterTemporary { piece_index: mapping_piece_index, })? diff --git a/shared/subspace-data-retrieval/src/piece_fetcher.rs b/shared/subspace-data-retrieval/src/piece_fetcher.rs index bf1f4517bf..183ff83b73 100644 --- a/shared/subspace-data-retrieval/src/piece_fetcher.rs +++ b/shared/subspace-data-retrieval/src/piece_fetcher.rs @@ -14,5 +14,84 @@ // limitations under the License. //! Fetching pieces of the archived history of Subspace Network. -//! -//! TODO: move piece fetching here + +use crate::piece_getter::{DsnSyncPieceGetter, PieceGetterError}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use subspace_core_primitives::{Piece, PieceIndex}; +use tracing::{debug, trace}; + +/// Concurrently downloads the exact pieces in `piece_indexes`, returning them in that order. +/// Each piece index must be unique. +/// +/// If any piece can't be downloaded, returns an error. +pub async fn download_pieces( + piece_indexes: impl IntoIterator, + piece_getter: &PG, +) -> Result, PieceGetterError> +where + PG: DsnSyncPieceGetter, +{ + let piece_indexes = piece_indexes.into_iter().collect::>(); + + debug!( + count = piece_indexes.len(), + ?piece_indexes, + "Retrieving exact pieces" + ); + + let mut received_pieces = piece_indexes + .iter() + .map(|piece_index| async move { + let piece = match piece_getter.get_piece(*piece_index).await { + Ok(Some(piece)) => { + trace!(?piece_index, "Piece request succeeded",); + Ok(piece) + } + Ok(None) => { + // TODO: retry before failing + trace!(?piece_index, "Piece request temporarily failed",); + return Err(PieceGetterError::NotFound { + piece_index: *piece_index, + }); + } + Err(error) => { + trace!( + %error, + ?piece_index, + "Piece request permanently failed", + ); + return Err(PieceGetterError::NotFoundWithError { + piece_index: *piece_index, + source: error, + }); + } + }; + + piece.map(|received_piece| (piece_index, received_piece)) + }) + .collect::>(); + + let mut pieces = vec![Piece::default(); piece_indexes.len()]; + + while let Some(maybe_result) = received_pieces.next().await { + // We want exact pieces, so any errors are final. + let (piece_index, piece) = maybe_result?; + + // Each piece index is unique, so the positions will also be unique. + let position = piece_indexes + .iter() + .position(|index| index == piece_index) + .expect("All piece indexes are from this vec; qed"); + + pieces[position] = piece; + } + + trace!( + count = piece_indexes.len(), + ?piece_indexes, + "Successfully retrieved exact pieces" + ); + + Ok(pieces) +} diff --git a/shared/subspace-data-retrieval/src/piece_getter.rs b/shared/subspace-data-retrieval/src/piece_getter.rs index 647d12ab7a..cb7aeeb953 100644 --- a/shared/subspace-data-retrieval/src/piece_getter.rs +++ b/shared/subspace-data-retrieval/src/piece_getter.rs @@ -37,13 +37,6 @@ pub enum PieceGetterError { piece_index: PieceIndex, source: BoxError, }, - - /// Piece decoding error - #[error("Piece data decoding error: {source:?}")] - PieceDecoding { - #[from] - source: parity_scale_codec::Error, - }, } /// Trait representing a way to get pieces for DSN sync purposes