Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ecton committed Feb 22, 2022
1 parent 2acbded commit 463be41
Show file tree
Hide file tree
Showing 17 changed files with 1,033 additions and 25 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"crates/bonsaidb-server",
"crates/bonsaidb-keystorage-s3",
"crates/bonsaidb-utils",
"crates/bonsaidb-jobs",
"examples/*",
"book/book-examples",
"xtask",
Expand All @@ -27,6 +28,7 @@ members = [
# nebari = { path = "../nebari/nebari", version = "0.3" }
# nebari = { git = "https://github.com/khonsulabs/nebari.git", branch = "main" }
# arc-bytes = { path = "../shared-buffer" }
circulate = { path = "../circulate" }

# [patch."https://github.com/khonsulabs/custodian.git"]
# custodian-password = { path = "../custodian/password" }
Expand Down
3 changes: 2 additions & 1 deletion crates/bonsaidb-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use async_trait::async_trait;
#[cfg(feature = "password-hashing")]
use bonsaidb_core::connection::{Authenticated, Authentication};
use bonsaidb_core::{
arc_bytes::OwnedBytes,
connection::{Database, StorageConnection},
custom_api::{CustomApi, CustomApiResult},
networking::{
Expand Down Expand Up @@ -828,7 +829,7 @@ async fn process_response_payload<A: CustomApi>(
if sender
.send(std::sync::Arc::new(bonsaidb_core::circulate::Message {
topic,
payload: payload.into_vec(),
payload: OwnedBytes::from(payload.0),
}))
.is_err()
{
Expand Down
33 changes: 24 additions & 9 deletions crates/bonsaidb-client/src/client/remote_database/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use bonsaidb_core::{
arc_bytes::serde::Bytes,
arc_bytes::OwnedBytes,
circulate::Message,
custom_api::CustomApi,
networking::{DatabaseRequest, DatabaseResponse, Request, Response},
Expand Down Expand Up @@ -51,13 +51,31 @@ where
payload: &P,
) -> Result<(), bonsaidb_core::Error> {
let payload = pot::to_vec(&payload)?;
self.publish_raw(topic, payload).await
}

async fn publish_to_all<P: Serialize + Sync>(
&self,
topics: Vec<String>,
payload: &P,
) -> Result<(), bonsaidb_core::Error> {
let payload = pot::to_vec(&payload)?;
self.publish_raw_to_all(topics, payload).await
}

async fn publish_raw<S: Into<String> + Send, P: Into<OwnedBytes> + Send>(
&self,
topic: S,
payload: P,
) -> Result<(), bonsaidb_core::Error> {
let payload = payload.into();
match self
.client
.send_request(Request::Database {
database: self.name.to_string(),
request: DatabaseRequest::Publish {
topic: topic.into(),
payload: Bytes::from(payload),
payload,
},
})
.await?
Expand All @@ -70,20 +88,17 @@ where
}
}

async fn publish_to_all<P: Serialize + Sync>(
async fn publish_raw_to_all<P: Into<OwnedBytes> + Send>(
&self,
topics: Vec<String>,
payload: &P,
payload: P,
) -> Result<(), bonsaidb_core::Error> {
let payload = pot::to_vec(&payload)?;
let payload = payload.into();
match self
.client
.send_request(Request::Database {
database: self.name.to_string(),
request: DatabaseRequest::PublishToAll {
topics,
payload: Bytes::from(payload),
},
request: DatabaseRequest::PublishToAll { topics, payload },
})
.await?
{
Expand Down
6 changes: 3 additions & 3 deletions crates/bonsaidb-core/src/networking.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arc_bytes::serde::Bytes;
use arc_bytes::{serde::Bytes, OwnedBytes};
use derive_where::derive_where;
use schema::SchemaName;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -230,15 +230,15 @@ pub enum DatabaseRequest {
/// The topics to publish to.
topic: String,
/// The payload to publish.
payload: Bytes,
payload: OwnedBytes,
},
/// Publishes `payload` to all subscribers of all `topics`.
#[cfg_attr(feature = "actionable-traits", actionable(protection = "custom"))]
PublishToAll {
/// The topics to publish to.
topics: Vec<String>,
/// The payload to publish.
payload: Bytes,
payload: OwnedBytes,
},
/// Subscribes `subscriber_id` to messages for `topic`.
#[cfg_attr(feature = "actionable-traits", actionable(protection = "simple"))]
Expand Down
33 changes: 33 additions & 0 deletions crates/bonsaidb-core/src/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use arc_bytes::OwnedBytes;
use async_trait::async_trait;
use circulate::{flume, Message, Relay};
use serde::Serialize;
Expand All @@ -22,12 +23,26 @@ pub trait PubSub {
payload: &P,
) -> Result<(), Error>;

/// Publishes a `payload` without any extra processing to all subscribers of `topic`.
async fn publish_raw<S: Into<String> + Send, P: Into<OwnedBytes> + Send>(
&self,
topic: S,
payload: P,
) -> Result<(), Error>;

/// Publishes a `payload` to all subscribers of all `topics`.
async fn publish_to_all<P: Serialize + Sync>(
&self,
topics: Vec<String>,
payload: &P,
) -> Result<(), Error>;

/// Publishes a `payload` without any extra processing to all subscribers of `topic`.
async fn publish_raw_to_all<P: Into<OwnedBytes> + Send>(
&self,
topics: Vec<String>,
payload: P,
) -> Result<(), Error>;
}

/// A subscriber to one or more topics.
Expand Down Expand Up @@ -69,6 +84,24 @@ impl PubSub for Relay {
self.publish_to_all(topics, payload).await?;
Ok(())
}

async fn publish_raw<S: Into<String> + Send, P: Into<OwnedBytes> + Send>(
&self,
topic: S,
payload: P,
) -> Result<(), Error> {
self.publish_raw(topic, payload).await;
Ok(())
}

async fn publish_raw_to_all<P: Into<OwnedBytes> + Send>(
&self,
topics: Vec<String>,
payload: P,
) -> Result<(), Error> {
self.publish_raw_to_all(topics, payload).await;
Ok(())
}
}

#[async_trait]
Expand Down
26 changes: 26 additions & 0 deletions crates/bonsaidb-jobs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "bonsaidb-jobs"
version = "0.2.0"
authors = ["Jonathan Johnson <[email protected]>"]
edition = "2021"
description = "Persistent job queueing and scheduling for BonsaiDb."
repository = "https://github.com/khonsulabs/bonsaidb"
license = "MIT OR Apache-2.0"
keywords = ["bonsaidb", "job", "queue"]
categories = ["config"]
readme = "./README.md"
homepage = "https://bonsaidb.io/"
rust-version = "1.58"

[dependencies]
bonsaidb-core = { version = "0.2.0", path = "../bonsaidb-core" }
serde = { version = "1", features = ["derive"] }
thiserror = "1"
tokio = { version = "=1.16.1", default-features = false, features = ["sync"] }
flume = "0.10"

[dev-dependencies]
tokio = { version = "=1.16.1", features = ["full"] }
bonsaidb-core = { version = "0.2.0", path = "../bonsaidb-core", features = [
"test-util",
] }
Loading

0 comments on commit 463be41

Please sign in to comment.