Skip to content

Commit

Permalink
feat: Make rs-wnfs work in multithreaded contexts (#372)
Browse files Browse the repository at this point in the history
The main goal of this PR is to enable using rs-wnfs in multithreaded contexts, e.g. in axum webservers.
We have a test repo to check whether that works in an MVP: https://github.com/fabricedesre/wnfs-mtload/

Previously that wasn't possible, since the async futures from rs-wnfs weren't `Send`, so you couldn't have a multi-threaded work-stealing async runtime work on them.
The reasons they weren't sync were:
- Futures would capture an `impl BlockStore`, and it's not necessarily known to be `Send`
- Futures would capture an `impl PrivateForest` with the same problem
- Some functions would produce `LocalBoxFuture` or `LocalBoxStream`, which aren't `Send`
- We'd use `async_trait(?Send)` and `async_recursion(?Send)` which opt-in to not having `Send` bounds, since that's what we need for wasm
- Futures would capture internal WNFS data structures like `PrivateNode`, which would use `Rc` internally instead of `Arc`, see also #250 

Some of this work was already addressed in #366. This PR *should* cover the rest.

---

There's a complication with Wasm, where we're e.g. using an external type `extern "C" { type BlockStore; ... }`, which isn't `Send` or `Sync`, and as such can't ever implement a `trait BlockStore: Send + Sync`.
To fix this, we're conditionally compiling in `Send` and `Sync` bounds (and `Arc` and `Rc` and similar) based on the target (See `send_sync_poly.rs`). This is pretty much just copying what noosphere is doing: https://github.com/subconsciousnetwork/noosphere/blob/main/rust/noosphere-common/src/sync.rs

I'm hoping eventually we just fix this and thus enable multi-threaded Wasm, too. But for now this works.

---

* wip - still need to fix the SnapshotBlockStore implementation

* Fix SnapshotBlockStore impl

* Fix wnfs=hamt

* fix: `Send` bounds & `BytesToIpld` implementations

Also: fix formatting

* feat: Also make `PrivateForest` trait `Send + Sync`

Also: Remove unneeded `Sync` bounds left over from previous commits.

* feat: Use `BoxFuture` instead of `LocalBoxFuture` in `PrivateForest` fn

* feat: Remove `(?Send)` annotations everywhere

* feat: Conditionally compile `Send + Sync` bounds

This relaxes the requirement if you're not on the `wasm32` target.
The problem is that foreign types in Wasm don't implement `Sync`.

* chore: Fix all tests & doctests to use thread-safe RNGs

---------

Co-authored-by: Fabrice Desré <[email protected]>
  • Loading branch information
matheus23 and fabricedesre authored Nov 28, 2023
1 parent 30ed01b commit 98d43cb
Show file tree
Hide file tree
Showing 54 changed files with 764 additions and 491 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ That's the public filesystem, the private filesystem, on the other hand, is a bi
```rust
use anyhow::Result;
use chrono::Utc;
use rand::thread_rng;
use rand_chacha::ChaCha12Rng;
use rand_core::SeedableRng;
use wnfs::{
common::MemoryBlockStore,
private::{
Expand All @@ -230,7 +231,7 @@ async fn main() -> Result<()> {
let store = &MemoryBlockStore::default();
// A random number generator.
let rng = &mut thread_rng();
let rng = &mut ChaCha12Rng::from_entropy();
// Create a private forest.
let forest = &mut HamtForest::new_trusted_rc(rng);
Expand Down
8 changes: 5 additions & 3 deletions wnfs-bench/hamt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use criterion::{
Criterion, Throughput,
};
use proptest::{arbitrary::any, collection::vec, test_runner::TestRunner};
use std::{cmp, sync::Arc};
use std::cmp;
use wnfs_common::{
async_encode, decode, libipld::cbor::DagCborCodec, utils::Sampleable, BlockStore, Link,
MemoryBlockStore,
async_encode, decode,
libipld::cbor::DagCborCodec,
utils::{Arc, Sampleable},
BlockStore, Link, MemoryBlockStore,
};
use wnfs_hamt::{
diff, merge,
Expand Down
2 changes: 2 additions & 0 deletions wnfs-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ base64 = { version = "0.21", optional = true }
base64-serde = { version = "0.7", optional = true }
bytes = { version = "1.4", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
dashmap = "5.5.3"
futures = "0.3"
libipld = { version = "0.16", features = ["dag-cbor", "derive", "serde-codec"] }
multihash = "0.18"
once_cell = "1.16"
parking_lot = "0.12"
proptest = { version = "1.1", optional = true }
rand_core = "0.6"
serde = { version = "1.0", features = ["rc"] }
Expand Down
25 changes: 15 additions & 10 deletions wnfs-common/src/async_serialize.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::BlockStore;
use crate::{
utils::{Arc, CondSend, CondSync},
BlockStore,
};
use async_trait::async_trait;
use libipld::{error::SerdeError, serde as ipld_serde, Ipld};
use serde::{Serialize, Serializer};
use std::sync::Arc;

//--------------------------------------------------------------------------------------------------
// Macros
Expand All @@ -11,9 +13,10 @@ use std::sync::Arc;
macro_rules! impl_async_serialize {
( $( $ty:ty $( : < $( $generics:ident ),+ > )? ),+ ) => {
$(
#[async_trait(?Send)]
impl $( < $( $generics ),+ > )? AsyncSerialize for $ty $( where $( $generics: Serialize ),+ )? {
async fn async_serialize<S: Serializer, BS: BlockStore>(
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl $( < $( $generics ),+ > )? AsyncSerialize for $ty $( where $( $generics: Serialize + CondSync ),+ )? {
async fn async_serialize<S: Serializer + CondSend, BS: BlockStore>(
&self,
serializer: S,
_: &BS,
Expand All @@ -38,12 +41,13 @@ macro_rules! impl_async_serialize {
///
/// An example of this is the PublicDirectory which can contain links to other IPLD nodes.
/// These links need to be resolved to Cids during serialization if they aren't already.
#[async_trait(?Send)]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait AsyncSerialize {
/// Serializes the type.
async fn async_serialize<S, B>(&self, serializer: S, store: &B) -> Result<S::Ok, S::Error>
where
S: Serializer,
S: Serializer + CondSend,
B: BlockStore + ?Sized;

/// Serialize with an IPLD serializer.
Expand All @@ -59,11 +63,12 @@ pub trait AsyncSerialize {
// Implementations
//--------------------------------------------------------------------------------------------------

#[async_trait(?Send)]
impl<T: AsyncSerialize> AsyncSerialize for Arc<T> {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<T: AsyncSerialize + CondSync> AsyncSerialize for Arc<T> {
async fn async_serialize<S, B>(&self, serializer: S, store: &B) -> Result<S::Ok, S::Error>
where
S: Serializer,
S: Serializer + CondSend,
B: BlockStore + ?Sized,
{
self.as_ref().async_serialize(serializer, store).await
Expand Down
38 changes: 23 additions & 15 deletions wnfs-common/src/blockstore.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::{decode, encode, AsyncSerialize, BlockStoreError, MAX_BLOCK_SIZE};
use crate::{
decode, encode,
utils::{Arc, CondSend, CondSync},
AsyncSerialize, BlockStoreError, MAX_BLOCK_SIZE,
};
use anyhow::{bail, Result};
use async_trait::async_trait;
use bytes::Bytes;
Expand All @@ -8,8 +12,9 @@ use libipld::{
multihash::{Code, MultihashDigest},
serde as ipld_serde, Cid,
};
use parking_lot::Mutex;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{cell::RefCell, collections::HashMap};
use std::collections::HashMap;

//--------------------------------------------------------------------------------------------------
// Constants
Expand Down Expand Up @@ -44,10 +49,11 @@ pub const CODEC_RAW: u64 = 0x55;
//--------------------------------------------------------------------------------------------------

/// For types that implement block store operations like adding, getting content from the store.
#[async_trait(?Send)]
pub trait BlockStore: Sized {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait BlockStore: Sized + CondSync {
async fn get_block(&self, cid: &Cid) -> Result<Bytes>;
async fn put_block(&self, bytes: impl Into<Bytes>, codec: u64) -> Result<Cid>;
async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid>;

async fn get_deserializable<V>(&self, cid: &Cid) -> Result<V>
where
Expand All @@ -60,15 +66,15 @@ pub trait BlockStore: Sized {

async fn put_serializable<V>(&self, value: &V) -> Result<Cid>
where
V: Serialize,
V: Serialize + CondSync,
{
let bytes = encode(&ipld_serde::to_ipld(value)?, DagCborCodec)?;
self.put_block(bytes, CODEC_DAG_CBOR).await
}

async fn put_async_serializable<V>(&self, value: &V) -> Result<Cid>
where
V: AsyncSerialize,
V: AsyncSerialize + CondSync,
{
let ipld = value.async_serialize_ipld(self).await?;
let bytes = encode(&ipld, DagCborCodec)?;
Expand Down Expand Up @@ -99,11 +105,12 @@ pub trait BlockStore: Sized {
/// An in-memory block store to simulate IPFS.
///
/// IPFS is basically a glorified HashMap.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct MemoryBlockStore(
#[serde(serialize_with = "crate::utils::serialize_cid_map")]
#[serde(deserialize_with = "crate::utils::deserialize_cid_map")]
pub(crate) RefCell<HashMap<Cid, Bytes>>,
pub(crate) Arc<Mutex<HashMap<Cid, Bytes>>>,
);

impl MemoryBlockStore {
Expand All @@ -113,13 +120,14 @@ impl MemoryBlockStore {
}
}

#[async_trait(?Send)]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl BlockStore for MemoryBlockStore {
/// Retrieves an array of bytes from the block store with given CID.
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
let bytes = self
.0
.borrow()
.lock()
.get(cid)
.ok_or(BlockStoreError::CIDNotFound(*cid))?
.clone();
Expand All @@ -128,15 +136,15 @@ impl BlockStore for MemoryBlockStore {
}

/// Stores an array of bytes in the block store.
async fn put_block(&self, bytes: impl Into<Bytes>, codec: u64) -> Result<Cid> {
async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid> {
// Convert the bytes into a Bytes object
let bytes: Bytes = bytes.into();

// Try to build the CID from the bytes and codec
let cid = self.create_cid(&bytes, codec)?;

// Insert the bytes into the HashMap using the CID as the key
self.0.borrow_mut().insert(cid, bytes);
self.0.lock().insert(cid, bytes);

Ok(cid)
}
Expand All @@ -149,7 +157,7 @@ impl BlockStore for MemoryBlockStore {
/// Tests the retrieval property of a BlockStore-conforming type.
pub async fn bs_retrieval_test<T>(store: &T) -> Result<()>
where
T: BlockStore + Send + 'static,
T: BlockStore + 'static,
{
// Example objects to insert and remove from the blockstore
let first_bytes = vec![1, 2, 3, 4, 5];
Expand All @@ -173,7 +181,7 @@ where
/// Tests the duplication of a BlockStore-conforming type.
pub async fn bs_duplication_test<T>(store: &T) -> Result<()>
where
T: BlockStore + Send + 'static,
T: BlockStore + 'static,
{
// Example objects to insert and remove from the blockstore
let first_bytes = vec![1, 2, 3, 4, 5];
Expand Down Expand Up @@ -203,7 +211,7 @@ where
/// Tests the serialization of a BlockStore-conforming type.
pub async fn bs_serialization_test<T>(store: &T) -> Result<()>
where
T: BlockStore + Send + Serialize + 'static + for<'de> Deserialize<'de>,
T: BlockStore + Serialize + 'static + for<'de> Deserialize<'de>,
{
// Example objects to insert and remove from the blockstore
let bytes = vec![1, 2, 3, 4, 5];
Expand Down
4 changes: 2 additions & 2 deletions wnfs-common/src/encoding.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{AsyncSerialize, BlockStore};
use crate::{utils::CondSync, AsyncSerialize, BlockStore};
use anyhow::Result;
use libipld::{
codec::{Decode, Encode},
Expand All @@ -24,7 +24,7 @@ where
/// Encodes an async serializable value into DagCbor bytes.
pub async fn async_encode<V, C>(value: &V, store: &impl BlockStore, codec: C) -> Result<Vec<u8>>
where
V: AsyncSerialize,
V: AsyncSerialize + CondSync,
C: Codec,
Ipld: Encode<C>,
{
Expand Down
37 changes: 21 additions & 16 deletions wnfs-common/src/link.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::{traits::IpldEq, AsyncSerialize, BlockStore};
use crate::{
traits::IpldEq,
utils::{Arc, CondSync},
AsyncSerialize, BlockStore,
};
use anyhow::Result;
use async_once_cell::OnceCell;
use async_trait::async_trait;
use libipld::Cid;
use serde::de::DeserializeOwned;
use std::{
fmt::{self, Debug, Formatter},
sync::Arc,
};
use std::fmt::{self, Debug, Formatter};

//--------------------------------------------------------------------------------------------------
// Type Definitions
Expand All @@ -34,7 +35,7 @@ pub enum Link<T> {
// Implementations
//--------------------------------------------------------------------------------------------------

impl<T: RemembersCid> Link<T> {
impl<T: RemembersCid + CondSync> Link<T> {
/// Creates a new `Link` that starts out as a Cid.
pub fn from_cid(cid: Cid) -> Self {
Self::Encoded {
Expand Down Expand Up @@ -172,8 +173,9 @@ impl<T: RemembersCid> Link<T> {
}
}

#[async_trait(?Send)]
impl<T: PartialEq + AsyncSerialize + RemembersCid> IpldEq for Link<T> {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<T: PartialEq + AsyncSerialize + RemembersCid + CondSync> IpldEq for Link<T> {
async fn eq(&self, other: &Link<T>, store: &impl BlockStore) -> Result<bool> {
if self == other {
return Ok(true);
Expand Down Expand Up @@ -210,7 +212,7 @@ where
}
}

impl<T: RemembersCid> PartialEq for Link<T>
impl<T: RemembersCid + CondSync> PartialEq for Link<T>
where
T: PartialEq,
{
Expand Down Expand Up @@ -268,7 +270,9 @@ impl<T: RemembersCid> RemembersCid for Arc<T> {

#[cfg(test)]
mod tests {
use crate::{AsyncSerialize, BlockStore, Link, MemoryBlockStore, RemembersCid};
use crate::{
utils::CondSend, AsyncSerialize, BlockStore, Link, MemoryBlockStore, RemembersCid,
};
use ::serde::{Deserialize, Serialize};
use async_once_cell::OnceCell;
use async_trait::async_trait;
Expand All @@ -282,13 +286,14 @@ mod tests {
persisted_as: OnceCell<Cid>,
}

#[async_trait(?Send)]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl AsyncSerialize for Example {
async fn async_serialize<S: Serializer, BS: BlockStore + ?Sized>(
&self,
serializer: S,
_: &BS,
) -> Result<S::Ok, S::Error> {
async fn async_serialize<S, B>(&self, serializer: S, _store: &B) -> Result<S::Ok, S::Error>
where
S: Serializer + CondSend,
B: BlockStore + ?Sized,
{
self.serialize(serializer)
}
}
Expand Down
2 changes: 1 addition & 1 deletion wnfs-common/src/pathnodes.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use crate::utils::Arc;

//--------------------------------------------------------------------------------------------------
// Type Definitions
Expand Down
3 changes: 2 additions & 1 deletion wnfs-common/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use async_trait::async_trait;
//--------------------------------------------------------------------------------------------------

/// Implements deep equality check for two types.
#[async_trait(?Send)]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait IpldEq {
/// Checks if the two items are deeply equal.
async fn eq(&self, other: &Self, store: &impl BlockStore) -> Result<bool>;
Expand Down
12 changes: 7 additions & 5 deletions wnfs-common/src/utils/common.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use super::Arc;
use crate::HashOutput;
use anyhow::Result;
use bytes::Bytes;
use futures::{AsyncRead, AsyncReadExt};
use libipld::{Cid, IpldCodec};
use parking_lot::Mutex;
use rand_core::CryptoRngCore;
use serde::{Deserialize, Serialize, Serializer};
use std::{cell::RefCell, collections::HashMap};
use std::collections::HashMap;

//--------------------------------------------------------------------------------------------------
// Functions
Expand Down Expand Up @@ -84,14 +86,14 @@ pub fn u64_to_ipld(value: u64) -> Result<IpldCodec> {
}

pub(crate) fn serialize_cid_map<S>(
map: &RefCell<HashMap<Cid, Bytes>>,
map: &Arc<Mutex<HashMap<Cid, Bytes>>>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let map = map
.borrow()
.lock()
.iter()
.map(|(cid, bytes)| (cid.to_string(), bytes.to_vec()))
.collect::<HashMap<_, _>>();
Expand All @@ -101,7 +103,7 @@ where

pub(crate) fn deserialize_cid_map<'de, D>(
deserializer: D,
) -> Result<RefCell<HashMap<Cid, Bytes>>, D::Error>
) -> Result<Arc<Mutex<HashMap<Cid, Bytes>>>, D::Error>
where
D: serde::Deserializer<'de>,
{
Expand All @@ -114,5 +116,5 @@ where
})
.collect::<Result<_, _>>()?;

Ok(RefCell::new(map))
Ok(Arc::new(Mutex::new(map)))
}
Loading

0 comments on commit 98d43cb

Please sign in to comment.