Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: adjust types to allow dynamic usage #306

Merged
merged 5 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 67 additions & 23 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,45 +115,89 @@ impl TryFrom<models::Interest> for ValidatedInterest {
}

#[async_trait]
pub trait AccessInterestStore: Clone + Send + Sync {
type Key: Key;
type Hash: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>;

pub trait AccessInterestStore: Send + Sync {
/// Returns true if the key was newly inserted, false if it already existed.
async fn insert(&self, key: Self::Key) -> Result<bool>;
async fn insert(&self, key: Interest) -> Result<bool>;
async fn range(
&self,
start: Self::Key,
end: Self::Key,
start: &Interest,
end: &Interest,
offset: usize,
limit: usize,
) -> Result<Vec<Self::Key>>;
) -> Result<Vec<Interest>>;
}

#[async_trait]
pub trait AccessModelStore: Clone + Send + Sync {
type Key: Key;
type Hash: AssociativeHash + std::fmt::Debug + Serialize + for<'de> Deserialize<'de>;
impl<S: AccessInterestStore> AccessInterestStore for Arc<S> {
async fn insert(&self, key: Interest) -> Result<bool> {
self.as_ref().insert(key).await
}

async fn range(
&self,
start: &Interest,
end: &Interest,
offset: usize,
limit: usize,
) -> Result<Vec<Interest>> {
self.as_ref().range(start, end, offset, limit).await
}
}

#[async_trait]
pub trait AccessModelStore: Send + Sync {
/// Returns (new_key, new_value) where true if was newly inserted, false if it already existed.
async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<(bool, bool)>;
async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)>;
async fn range_with_values(
&self,
start: Self::Key,
end: Self::Key,
start: &EventId,
end: &EventId,
offset: usize,
limit: usize,
) -> Result<Vec<(Self::Key, Vec<u8>)>>;
) -> Result<Vec<(EventId, Vec<u8>)>>;

async fn value_for_key(&self, key: Self::Key) -> Result<Option<Vec<u8>>>;
async fn value_for_key(&self, key: &EventId) -> Result<Option<Vec<u8>>>;

// it's likely `highwater` will be a string or struct when we have alternative storage for now we
// keep it simple to allow easier error propagation. This isn't currently public outside of this repo.
async fn keys_since_highwater_mark(
&self,
highwater: i64,
limit: i64,
) -> anyhow::Result<(i64, Vec<Self::Key>)>;
) -> Result<(i64, Vec<EventId>)>;
}

#[async_trait::async_trait]
impl<S: AccessModelStore> AccessModelStore for Arc<S> {
async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)> {
self.as_ref().insert(key, value).await
}

async fn range_with_values(
&self,
start: &EventId,
end: &EventId,
offset: usize,
limit: usize,
) -> Result<Vec<(EventId, Vec<u8>)>> {
self.as_ref()
.range_with_values(start, end, offset, limit)
.await
}

async fn value_for_key(&self, key: &EventId) -> Result<Option<Vec<u8>>> {
self.as_ref().value_for_key(key).await
}

async fn keys_since_highwater_mark(
&self,
highwater: i64,
limit: i64,
) -> Result<(i64, Vec<EventId>)> {
self.as_ref()
.keys_since_highwater_mark(highwater, limit)
.await
}
}

#[derive(Clone)]
Expand All @@ -167,8 +211,8 @@ pub struct Server<C, I, M> {

impl<C, I, M> Server<C, I, M>
where
I: AccessInterestStore<Key = Interest>,
M: AccessModelStore<Key = EventId>,
I: AccessInterestStore,
M: AccessModelStore,
{
pub fn new(peer_id: PeerId, network: Network, interest: I, model: M) -> Self {
Server {
Expand Down Expand Up @@ -232,7 +276,7 @@ where

let events = self
.model
.range_with_values(start, stop, offset, limit)
.range_with_values(&start, &stop, offset, limit)
.await
.map_err(|err| ErrorResponse::new(format!("failed to get keys: {err}")))?
.into_iter()
Expand Down Expand Up @@ -281,7 +325,7 @@ where
event_id: String,
) -> Result<EventsEventIdGetResponse, ErrorResponse> {
let decoded_event_id = decode_event_id(&event_id)?;
match self.model.value_for_key(decoded_event_id.clone()).await {
match self.model.value_for_key(&decoded_event_id).await {
Ok(Some(data)) => {
let event = BuildResponse::event(decoded_event_id, data);
Ok(EventsEventIdGetResponse::Success(event))
Expand Down Expand Up @@ -385,8 +429,8 @@ pub(crate) fn decode_event_data(value: &str) -> Result<Vec<u8>, ErrorResponse> {
impl<C, I, M> Api<C> for Server<C, I, M>
where
C: Send + Sync,
I: AccessInterestStore<Key = Interest> + Sync,
M: AccessModelStore<Key = EventId> + Sync,
I: AccessInterestStore + Sync,
M: AccessModelStore + Sync,
{
#[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))]
async fn liveness_get(
Expand Down
38 changes: 17 additions & 21 deletions api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use ceramic_core::{Cid, Interest};
use ceramic_core::{EventId, Network, PeerId, StreamId};
use mockall::{mock, predicate};
use multibase::Base;
use recon::{Key, Sha256a};
use recon::Key;
use std::str::FromStr;
use tracing_test::traced_test;

Expand All @@ -26,8 +26,8 @@ mock! {
fn insert(&self, key: Interest, value: Option<Vec<u8>>) -> Result<bool>;
fn range_with_values(
&self,
start: Interest,
end: Interest,
start: &Interest,
end: &Interest,
offset: usize,
limit: usize,
) -> Result<Vec<(Interest, Vec<u8>)>>;
Expand All @@ -38,18 +38,16 @@ mock! {
}
#[async_trait]
impl AccessInterestStore for MockReconInterestTest {
type Key = Interest;
type Hash = Sha256a;
async fn insert(&self, key: Self::Key) -> Result<bool> {
async fn insert(&self, key: Interest) -> Result<bool> {
self.insert(key, None)
}
async fn range(
&self,
start: Self::Key,
end: Self::Key,
start: &Interest,
end: &Interest,
offset: usize,
limit: usize,
) -> Result<Vec<Self::Key>> {
) -> Result<Vec<Interest>> {
let res = self.range_with_values(start, end, offset, limit)?;
Ok(res.into_iter().map(|(k, _)| k).collect())
}
Expand All @@ -59,41 +57,39 @@ mock! {
fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)>;
fn range_with_values(
&self,
start: EventId,
end: EventId,
start: &EventId,
end: &EventId,
offset: usize,
limit: usize,
) -> Result<Vec<(EventId,Vec<u8>)>>;
fn value_for_key(&self, key: EventId) -> Result<Option<Vec<u8>>>;
fn value_for_key(&self, key: &EventId) -> Result<Option<Vec<u8>>>;
}
impl Clone for ReconModelTest {
fn clone(&self) -> Self;
}
}
#[async_trait]
impl AccessModelStore for MockReconModelTest {
type Key = EventId;
type Hash = Sha256a;
async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<(bool, bool)> {
async fn insert(&self, key: EventId, value: Option<Vec<u8>>) -> Result<(bool, bool)> {
self.insert(key, value)
}
async fn range_with_values(
&self,
start: Self::Key,
end: Self::Key,
start: &EventId,
end: &EventId,
offset: usize,
limit: usize,
) -> Result<Vec<(Self::Key, Vec<u8>)>> {
) -> Result<Vec<(EventId, Vec<u8>)>> {
self.range_with_values(start, end, offset, limit)
}
async fn value_for_key(&self, key: Self::Key) -> Result<Option<Vec<u8>>> {
async fn value_for_key(&self, key: &EventId) -> Result<Option<Vec<u8>>> {
self.value_for_key(key)
}
async fn keys_since_highwater_mark(
&self,
_highwater: i64,
_limit: i64,
) -> anyhow::Result<(i64, Vec<Self::Key>)> {
) -> anyhow::Result<(i64, Vec<EventId>)> {
Ok((0, vec![]))
}
}
Expand Down Expand Up @@ -358,7 +354,7 @@ async fn get_events_for_interest_range() {
predicate::eq(1),
)
.times(1)
.returning(|s, _, _, _| Ok(vec![(s, vec![])]));
.returning(|s, _, _, _| Ok(vec![(s.clone(), vec![])]));
let server = Server::new(peer_id, network, mock_interest, mock_model);
let resp = server
.events_sort_key_sort_value_get(
Expand Down
20 changes: 17 additions & 3 deletions beetle/iroh-bitswap/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,9 @@ pub struct Stat {

#[derive(Derivative)]
#[derivative(Debug)]
#[derive(Clone)]
pub struct Client<S: Store> {
network: Network,
store: S,
store: Arc<S>,
session_manager: SessionManager,
provider_search_delay: Duration,
rebroadcast_delay: Duration,
Expand All @@ -70,13 +69,28 @@ pub struct Client<S: Store> {
notify: async_broadcast::Sender<Block>,
}

impl<S: Store> Clone for Client<S> {
fn clone(&self) -> Self {
Client {
network: self.network.clone(),
store: self.store.clone(),
session_manager: self.session_manager.clone(),
provider_search_delay: self.provider_search_delay,
rebroadcast_delay: self.rebroadcast_delay,
simulate_dont_haves_on_timeout: self.simulate_dont_haves_on_timeout,
blocks_received_cb: self.blocks_received_cb.clone(),
notify: self.notify.clone(),
}
}
}

pub type BlocksReceivedCb =
dyn Fn(PeerId, Vec<Block>) -> BoxFuture<'static, ()> + 'static + Send + Sync;

impl<S: Store> Client<S> {
pub async fn new(
network: Network,
store: S,
store: Arc<S>,
blocks_received_cb: Option<Box<BlocksReceivedCb>>,
config: Config,
) -> Self {
Expand Down
25 changes: 20 additions & 5 deletions beetle/iroh-bitswap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,32 @@ impl Default for Config {
}

#[async_trait]
pub trait Store: Debug + Clone + Send + Sync + 'static {
pub trait Store: Send + Sync + 'static {
async fn get_size(&self, cid: &Cid) -> Result<usize>;
async fn get(&self, cid: &Cid) -> Result<Block>;
async fn has(&self, cid: &Cid) -> Result<bool>;
}

#[async_trait::async_trait]
impl<S: Store> Store for Arc<S> {
async fn get_size(&self, cid: &Cid) -> Result<usize> {
self.as_ref().get_size(cid).await
}

async fn get(&self, cid: &Cid) -> Result<Block> {
self.as_ref().get(cid).await
}

async fn has(&self, cid: &Cid) -> Result<bool> {
self.as_ref().has(cid).await
}
}

impl<S: Store> Bitswap<S> {
pub async fn new(self_id: PeerId, store: S, config: Config) -> Self {
pub async fn new(self_id: PeerId, store: Arc<S>, config: Config) -> Self {
let network = Network::new(self_id);
let (server, cb) = if let Some(config) = config.server {
let server = Server::new(network.clone(), store.clone(), config).await;
let server = Server::new(network.clone(), Arc::clone(&store), config).await;
let cb = server.received_blocks_cb();
(Some(server), Some(cb))
} else {
Expand Down Expand Up @@ -739,7 +754,7 @@ mod tests {
async fn get_block<const N: usize>() {
info!("get_block");
let (peer1_id, trans) = mk_transport();
let store1 = TestStore::default();
let store1 = Arc::new(TestStore::default());
let bs1 = Bitswap::new(peer1_id, store1.clone(), Config::default()).await;

let config = swarm::Config::with_tokio_executor()
Expand Down Expand Up @@ -774,7 +789,7 @@ mod tests {

info!("peer2: startup");
let (peer2_id, trans) = mk_transport();
let store2 = TestStore::default();
let store2 = Arc::new(TestStore::default());
let bs2 = Bitswap::new(peer2_id, store2.clone(), Config::default()).await;

let config = swarm::Config::with_tokio_executor()
Expand Down
13 changes: 11 additions & 2 deletions beetle/iroh-bitswap/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct Stat {
pub data_sent: u64,
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Server<S: Store> {
// sent_histogram -> iroh-metrics
// send_time_histogram -> iroh-metric
Expand All @@ -65,6 +65,15 @@ pub struct Server<S: Store> {
inner: Arc<Inner>,
}

impl<S: Store> Clone for Server<S> {
fn clone(&self) -> Self {
Server {
engine: self.engine.clone(),
inner: self.inner.clone(),
}
}
}

#[derive(Debug)]
struct Inner {
/// Counters for various statistics.
Expand All @@ -81,7 +90,7 @@ struct Inner {
}

impl<S: Store> Server<S> {
pub async fn new(network: Network, store: S, config: Config) -> Self {
pub async fn new(network: Network, store: Arc<S>, config: Config) -> Self {
let engine = DecisionEngine::new(store, *network.self_id(), config.decision_config).await;
let provide_keys = mpsc::channel(PROVIDE_KEYS_BUFFER_SIZE);
let new_blocks = mpsc::channel(config.has_block_buffer_size);
Expand Down
Loading
Loading