Skip to content

Commit

Permalink
chore: Write docs for pull module & clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Feb 21, 2024
1 parent aba89c8 commit 3736d46
Show file tree
Hide file tree
Showing 5 changed files with 265 additions and 54 deletions.
2 changes: 1 addition & 1 deletion car-mirror/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ impl std::fmt::Debug for ReceiverState {
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;
use crate::{cache::NoCache, test_utils::assert_cond_send_sync};
use testresult::TestResult;
Expand Down
206 changes: 198 additions & 8 deletions car-mirror/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ pub mod incremental_verification;
/// Data types that are sent over-the-wire and relevant serialization code.
pub mod messages;
/// The CAR mirror pull protocol. Meant to be used qualified, i.e. `pull::request` and `pull::response`.
pub mod pull;
/// The CAR mirror push protocol. Meant to be used qualified, i.e. `push::request` and `push::response`.
///
/// This library exposes both streaming and non-streaming variants. It's recommended to use
/// the streaming variants if possible.
Expand All @@ -53,8 +51,202 @@ pub mod pull;
/// use car_mirror::cache::InMemoryCache;
/// use wnfs_common::MemoryBlockStore;
/// use wnfs_unixfs_file::builder::FileBuilder;
///
/// # #[async_std::main]
/// # async fn main() -> anyhow::Result<()> {
/// // We simulate peers having separate data stores
/// let client_store = MemoryBlockStore::new();
/// let server_store = MemoryBlockStore::new();
///
/// // Give both peers ~1MB of cache space for speeding up computations.
/// // These are available under the `quick_cache` feature.
/// // (You can also implement your own, or disable caches using `NoCache`)
/// let client_cache = InMemoryCache::new(100_000);
/// let server_cache = InMemoryCache::new(100_000);
///
/// let file_bytes = async_std::fs::read("../Cargo.lock").await?;
///
/// // Load some data onto the client
/// let root = FileBuilder::new()
/// .content_bytes(file_bytes.clone())
/// .fixed_chunker(1024) // Generate lots of small blocks
/// .degree(4)
/// .build()?
/// .store(&client_store)
/// .await?;
///
/// // The server may already have a subset of the data
/// FileBuilder::new()
/// .content_bytes(file_bytes[0..10_000].to_vec())
/// .fixed_chunker(1024) // Generate lots of small blocks
/// .degree(4)
/// .build()?
/// .store(&server_store)
/// .await?;
/// # Ok(())
/// # }
/// ```
///
/// ### With Streaming
///
/// This simulates a pull protocol run between two peers locally:
///
/// ```
/// use car_mirror::{pull, common::Config};
/// use futures::TryStreamExt;
/// use tokio_util::io::StreamReader;
/// # use car_mirror::cache::InMemoryCache;
/// # use wnfs_common::MemoryBlockStore;
/// # use wnfs_unixfs_file::builder::FileBuilder;
/// #
/// # #[async_std::main]
/// # async fn main() -> anyhow::Result<()> {
/// # let client_store = MemoryBlockStore::new();
/// # let server_store = MemoryBlockStore::new();
/// #
/// # let client_cache = InMemoryCache::new(100_000);
/// # let server_cache = InMemoryCache::new(100_000);
/// #
/// # let file_bytes = async_std::fs::read("../Cargo.lock").await?;
/// #
/// # let root = FileBuilder::new()
/// # .content_bytes(file_bytes.clone())
/// # .fixed_chunker(1024) // Generate lots of small blocks
/// # .degree(4)
/// # .build()?
/// # .store(&client_store)
/// # .await?;
/// #
/// # FileBuilder::new()
/// # .content_bytes(file_bytes[0..10_000].to_vec())
/// # .fixed_chunker(1024) // Generate lots of small blocks
/// # .degree(4)
/// # .build()?
/// # .store(&server_store)
/// # .await?;
///
/// // We set up some protocol configurations (allowed maximum block sizes etc.)
/// let config = &Config::default();
///
/// // The client generates a request of what data still needs to be fetched
/// let mut request =
/// pull::request(root, None, config, &client_store, &client_cache).await?;
///
/// // The request contains information about which blocks still need to be
/// // fetched, so we can use it to find out whether we need to fetch any
/// // blocks at all.
/// while !request.indicates_finished() {
/// // The server answers with a stream of data
/// let chunk_stream = pull::response_streaming(
/// root,
/// request,
/// &server_store,
/// &server_cache
/// ).await?;
///
/// let byte_stream = StreamReader::new(
/// chunk_stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
/// );
///
/// // The client verifies & stores the streamed data and possibly
/// // interrupts the stream to produce a new request with more precise
/// // information on what to pull.
/// request = pull::handle_response_streaming(
/// root,
/// byte_stream,
/// config,
/// &client_store,
/// &client_cache,
/// ).await?;
/// }
/// # Ok(())
/// # }
/// ```
///
///
/// ### Without Streaming
///
/// This simulates a pull protocol run between two peers locally, without streaming:
///
/// ```
/// use car_mirror::{pull, common::Config};
/// # use car_mirror::cache::InMemoryCache;
/// # use wnfs_common::MemoryBlockStore;
/// # use wnfs_unixfs_file::builder::FileBuilder;
/// #
/// # #[async_std::main]
/// # async fn main() -> anyhow::Result<()> {
/// # let client_store = MemoryBlockStore::new();
/// # let server_store = MemoryBlockStore::new();
/// #
/// # let client_cache = InMemoryCache::new(100_000);
/// # let server_cache = InMemoryCache::new(100_000);
/// #
/// # let file_bytes = async_std::fs::read("../Cargo.lock").await?;
/// #
/// # let root = FileBuilder::new()
/// # .content_bytes(file_bytes.clone())
/// # .fixed_chunker(1024) // Generate lots of small blocks
/// # .degree(4)
/// # .build()?
/// # .store(&client_store)
/// # .await?;
/// #
/// # FileBuilder::new()
/// # .content_bytes(file_bytes[0..10_000].to_vec())
/// # .fixed_chunker(1024) // Generate lots of small blocks
/// # .degree(4)
/// # .build()?
/// # .store(&server_store)
/// # .await?;
///
/// // We set up some protocol configurations (allowed maximum block sizes etc.)
/// let config = &Config::default();
///
/// let mut last_car = None;
/// loop {
/// // The client handles a possible previous response and produces a request
/// let request = pull::request(
/// root,
/// last_car,
/// config,
/// &client_store,
/// &client_cache
/// ).await?;
///
/// if request.indicates_finished() {
/// break; // No need to fetch more, we already have all data
/// }
///
/// // The server responds with a car file containing a batch of the
/// // blocks the client requested
/// last_car = Some(pull::response(
/// root,
/// request,
/// config,
/// &server_store,
/// &server_cache
/// ).await?);
/// }
/// # Ok(())
/// # }
/// ```
pub mod pull;
/// The CAR mirror push protocol. Meant to be used qualified, i.e. `push::request` and `push::response`.
///
/// This library exposes both streaming and non-streaming variants. It's recommended to use
/// the streaming variants if possible.
///
/// ## Examples
///
/// ### Test Data
///
/// We'll set up some test data to simulate the protocol like this:
///
/// ```no_run
/// use car_mirror::cache::InMemoryCache;
/// use wnfs_common::MemoryBlockStore;
/// use wnfs_unixfs_file::builder::FileBuilder;
///
/// # #[async_std::main]
/// # async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -97,11 +289,11 @@ pub mod pull;
///
/// ```
/// use car_mirror::{push, common::Config};
/// use futures::TryStreamExt;
/// use tokio_util::io::StreamReader;
/// # use car_mirror::cache::InMemoryCache;
/// # use wnfs_common::MemoryBlockStore;
/// # use wnfs_unixfs_file::builder::FileBuilder;
/// # use futures::TryStreamExt;
/// # use tokio_util::io::StreamReader;
/// #
/// # #[async_std::main]
/// # async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -135,15 +327,15 @@ pub mod pull;
/// let mut last_response = None;
/// loop {
/// // The client generates a request that streams the data to the server
/// let stream = push::request_streaming(
/// let chunk_stream = push::request_streaming(
/// root,
/// last_response,
/// &client_store,
/// &client_cache
/// ).await?;
///
/// let byte_stream = StreamReader::new(
/// stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
/// chunk_stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
/// );
///
/// // The server consumes the streaming request & interrupts with new
Expand Down Expand Up @@ -177,8 +369,6 @@ pub mod pull;
/// # use car_mirror::cache::InMemoryCache;
/// # use wnfs_common::MemoryBlockStore;
/// # use wnfs_unixfs_file::builder::FileBuilder;
/// # use futures::TryStreamExt;
/// # use tokio_util::io::StreamReader;
/// #
/// # #[async_std::main]
/// # async fn main() -> anyhow::Result<()> {
Expand Down
63 changes: 48 additions & 15 deletions car-mirror/src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,18 @@ pub async fn response_streaming<'a>(
#[cfg(test)]
mod tests {
use crate::{
cache::NoCache,
cache::{InMemoryCache, NoCache},
common::Config,
dag_walk::DagWalk,
test_utils::{setup_random_dag, Metrics},
pull,
test_utils::{setup_random_dag, store_test_unixfs, Metrics},
};
use anyhow::Result;
use futures::TryStreamExt;
use libipld::Cid;
use std::collections::HashSet;
use testresult::TestResult;
use tokio_util::io::StreamReader;
use wnfs_common::{BlockStore, MemoryBlockStore};

pub(crate) async fn simulate_protocol(
Expand All @@ -99,20 +101,18 @@ mod tests {
server_store: &impl BlockStore,
) -> Result<Vec<Metrics>> {
let mut metrics = Vec::new();
let mut request = crate::pull::request(root, None, config, client_store, &NoCache).await?;
let mut request = pull::request(root, None, config, client_store, &NoCache).await?;
while !request.indicates_finished() {
let request_bytes = serde_ipld_dagcbor::to_vec(&request)?.len();
let response =
crate::pull::response(root, request, config, server_store, NoCache).await?;
let response = pull::response(root, request, config, server_store, NoCache).await?;
let response_bytes = response.bytes.len();

metrics.push(Metrics {
request_bytes,
response_bytes,
});

request =
crate::pull::request(root, Some(response), config, client_store, &NoCache).await?;
request = pull::request(root, Some(response), config, client_store, &NoCache).await?;
}

Ok(metrics)
Expand Down Expand Up @@ -141,6 +141,43 @@ mod tests {

Ok(())
}

#[test_log::test(async_std::test)]
async fn test_streaming_transfer() -> TestResult {
    // Two separate stores simulate two distinct peers.
    let client_store = MemoryBlockStore::new();
    let server_store = MemoryBlockStore::new();

    // ~1MB of cache space per peer to speed up repeated DAG computations.
    let client_cache = InMemoryCache::new(100_000);
    let server_cache = InMemoryCache::new(100_000);

    let file_bytes = async_std::fs::read("../Cargo.lock").await?;

    // The client holds the whole file; the server starts out with
    // only the first 10KB, so the protocol has real work to do.
    let root = store_test_unixfs(file_bytes.clone(), &client_store).await?;
    store_test_unixfs(file_bytes[0..10_000].to_vec(), &server_store).await?;

    let config = &Config::default();

    // Initial request: the client states what it still needs.
    let mut request = pull::request(root, None, config, &client_store, &client_cache).await?;

    loop {
        if request.indicates_finished() {
            // The client has verified it holds the full DAG.
            break;
        }

        // The server answers with a stream of chunks.
        let chunk_stream =
            pull::response_streaming(root, request, &server_store, &server_cache).await?;

        // Adapt the chunk stream into an `AsyncRead` for the handler.
        let reader = StreamReader::new(
            chunk_stream.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
        );

        // The client verifies & stores the streamed blocks and produces
        // the next, more precise request.
        request =
            pull::handle_response_streaming(root, reader, config, &client_store, &client_cache)
                .await?;
    }

    Ok(())
}
}

#[cfg(test)]
Expand All @@ -149,6 +186,7 @@ mod proptests {
cache::NoCache,
common::Config,
dag_walk::DagWalk,
pull,
test_utils::{setup_blockstore, variable_blocksize_dag},
};
use futures::TryStreamExt;
Expand All @@ -164,14 +202,9 @@ mod proptests {
let server_store = &setup_blockstore(blocks).await.unwrap();
let client_store = &MemoryBlockStore::new();

crate::pull::tests::simulate_protocol(
root,
&Config::default(),
client_store,
server_store,
)
.await
.unwrap();
pull::tests::simulate_protocol(root, &Config::default(), client_store, server_store)
.await
.unwrap();

// client should have all data
let client_cids = DagWalk::breadth_first([root])
Expand Down
Loading

0 comments on commit 3736d46

Please sign in to comment.