Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Documents and Views to better utilize Nebari #250

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ members = [
# actionable = { path = "../actionable/actionable", version = "0.2" }
# circulate = { path = "../circulate", version = "0.2" }
# circulate = { git = "https://github.com/khonsulabs/circulate.git", branch = "main" }
# nebari = { path = "../nebari/nebari", version = "0.3" }
# nebari = { git = "https://github.com/khonsulabs/nebari.git", branch = "main" }
# nebari = { path = "../nebari/nebari", version = "0.5.3" }
nebari = { git = "https://github.com/khonsulabs/nebari.git", branch = "main" }
# arc-bytes = { path = "../shared-buffer" }

# [patch."https://github.com/khonsulabs/custodian.git"]
Expand Down
28 changes: 17 additions & 11 deletions benchmarks/benches/commerce/bonsai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use bonsaidb::{
schema::{
view::map::Mappings, Collection, CollectionName, CollectionViewSchema,
DefaultSerialization, InsertError, NamedCollection, Qualified, ReduceResult, Schema,
Schematic, SerializedCollection, View, ViewMapResult, ViewMappedValue,
Schematic, SerializedCollection, View, ViewMapResult,
},
transaction::{self, Transaction},
Error,
Expand Down Expand Up @@ -325,18 +325,24 @@ impl Operator<Checkout, u32> for BonsaiOperator {
};

let measurement = measurements.begin(self.label, Metric::Checkout);

let cart = Cart::get_async(&cart, &self.database)
.await
.unwrap()
.unwrap();
cart.delete_async(&self.database).await.unwrap();
Order {
customer_id: operation.customer_id,
product_ids: cart.contents.product_ids,
}
.push_into_async(&self.database)
.await
.unwrap();
let mut tx = Transaction::default();
tx.push(transaction::Operation::delete(
Cart::collection_name(),
cart.header.try_into().unwrap(),
));
tx.push(
transaction::Operation::push_serialized::<Order>(&Order {
customer_id: operation.customer_id,
product_ids: cart.contents.product_ids,
})
.unwrap(),
);
tx.apply_async(&self.database).await.unwrap();
measurement.finish();

OperationResult::Ok
Expand Down Expand Up @@ -485,12 +491,12 @@ impl CollectionViewSchema for ProductReviewsByProduct {

fn reduce(
&self,
mappings: &[ViewMappedValue<Self::View>],
mappings: &[<Self::View as View>::Value],
_rereduce: bool,
) -> ReduceResult<Self::View> {
Ok(mappings
.iter()
.map(|mapping| mapping.value.clone())
.cloned()
.reduce(|a, b| ProductRatings {
total_score: a.total_score + b.total_score,
ratings: a.ratings + b.ratings,
Expand Down
3 changes: 2 additions & 1 deletion benchmarks/benches/commerce/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ async fn agent<B: Backend>(
while let Ok(plan) = plan_receiver.recv_async().await {
let mut results = Vec::with_capacity(plan.operations.len());
for step in &plan.operations {
results.push(operator.operate(step, &results, &measurements).await)
let result = operator.operate(step, &results, &measurements).await;
results.push(result)
}
}
}
Expand Down
9 changes: 3 additions & 6 deletions book/book-examples/tests/view-example-enum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use bonsaidb::{
connection::Connection,
document::{BorrowedDocument, Emit},
key::Key,
schema::{
view::map::ViewMappedValue, Collection, ReduceResult, SerializedCollection, View,
ViewMapResult, ViewSchema,
},
schema::{Collection, ReduceResult, SerializedCollection, View, ViewMapResult, ViewSchema},
Error,
},
local::{
Expand Down Expand Up @@ -50,10 +47,10 @@ impl ViewSchema for BlogPostsByCategory {

fn reduce(
&self,
mappings: &[ViewMappedValue<Self::View>],
mappings: &[<Self::View as View>::Value],
_rereduce: bool,
) -> ReduceResult<Self::View> {
Ok(mappings.iter().map(|mapping| mapping.value).sum())
Ok(mappings.iter().sum())
}
}
// ANCHOR_END: view
Expand Down
9 changes: 3 additions & 6 deletions book/book-examples/tests/view-example-string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ use bonsaidb::{
core::{
connection::Connection,
document::{BorrowedDocument, Emit},
schema::{
view::map::ViewMappedValue, Collection, ReduceResult, SerializedCollection, View,
ViewMapResult, ViewSchema,
},
schema::{Collection, ReduceResult, SerializedCollection, View, ViewMapResult, ViewSchema},
Error,
},
local::{
Expand Down Expand Up @@ -40,10 +37,10 @@ impl ViewSchema for BlogPostsByCategory {

fn reduce(
&self,
mappings: &[ViewMappedValue<Self::View>],
mappings: &[<Self::View as View>::Value],
_rereduce: bool,
) -> ReduceResult<Self::View> {
Ok(mappings.iter().map(|mapping| mapping.value).sum())
Ok(mappings.iter().sum())
}
}
// ANCHOR_END: view
Expand Down
11 changes: 8 additions & 3 deletions crates/bonsaidb-core/src/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,13 @@ impl AsRef<[u8]> for OwnedDocument {

impl<'a> BorrowedDocument<'a> {
/// Returns a new instance with the id and content bytes.
pub fn new<Contents: Into<CowBytes<'a>>>(id: DocumentId, contents: Contents) -> Self {
pub fn new<Contents: Into<CowBytes<'a>>>(
id: DocumentId,
sequence_id: u64,
contents: Contents,
) -> Self {
let contents = contents.into();
let revision = Revision::new(&contents);
let revision = Revision::with_id(sequence_id, &contents);
Self {
header: Header { id, revision },
contents,
Expand All @@ -213,14 +217,15 @@ impl<'a> BorrowedDocument<'a> {
/// Returns a new instance with `contents`, after serializing.
pub fn with_contents<C, PrimaryKey>(
id: &PrimaryKey,
sequence_id: u64,
contents: &C::Contents,
) -> Result<Self, crate::Error>
where
C: SerializedCollection,
PrimaryKey: for<'k> KeyEncoding<'k, C::PrimaryKey> + ?Sized,
{
let contents = <C as SerializedCollection>::serialize(contents)?;
Ok(Self::new(DocumentId::new(id)?, contents))
Ok(Self::new(DocumentId::new(id)?, sequence_id, contents))
}

/// Converts this document to an owned document.
Expand Down
6 changes: 3 additions & 3 deletions crates/bonsaidb-core/src/document/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ where
fn emissions_tests() -> Result<(), crate::Error> {
use crate::{schema::Map, test_util::Basic};

let doc = BorrowedDocument::with_contents::<Basic, _>(&1, &Basic::default())?;
let doc = BorrowedDocument::with_contents::<Basic, _>(&1, 0, &Basic::default())?;

assert_eq!(
doc.header.emit()?,
Expand Down Expand Up @@ -211,7 +211,7 @@ fn emissions_tests() -> Result<(), crate::Error> {
fn chained_mappings_test() -> Result<(), crate::Error> {
use crate::{schema::Map, test_util::Basic};

let doc = BorrowedDocument::with_contents::<Basic, _>(&1, &Basic::default())?;
let doc = BorrowedDocument::with_contents::<Basic, _>(&1, 0, &Basic::default())?;

assert_eq!(
doc.header.emit()?.and(doc.header.emit()?),
Expand All @@ -227,7 +227,7 @@ fn chained_mappings_test() -> Result<(), crate::Error> {
#[test]
fn header_display_test() {
let original_contents = b"one";
let revision = Revision::new(original_contents);
let revision = Revision::with_id(0, original_contents);
let header = Header {
id: DocumentId::new(&42_u64).unwrap(),
revision,
Expand Down
31 changes: 11 additions & 20 deletions crates/bonsaidb-core/src/document/revision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,16 @@ use sha2::{Digest, Sha256};
#[derive(Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub struct Revision {
/// The current revision id of the document. This value is sequentially incremented on each document update.
pub id: u32,
pub id: u64,

/// The SHA256 digest of the bytes contained within the `Document`.
pub sha256: [u8; 32],
}

impl Revision {
/// Creates the first revision for a document with the SHA256 digest of the passed bytes.
#[must_use]
pub fn new(contents: &[u8]) -> Self {
Self::with_id(0, contents)
}

/// Creates a revision with `id` for a document with the SHA256 digest of the passed bytes.
#[must_use]
pub fn with_id(id: u32, contents: &[u8]) -> Self {
pub fn with_id(id: u64, contents: &[u8]) -> Self {
Self {
id,
sha256: digest(contents),
Expand All @@ -35,16 +29,13 @@ impl Revision {
///
/// Panics if `id` overflows.
#[must_use]
pub fn next_revision(&self, new_contents: &[u8]) -> Option<Self> {
pub fn next_revision(&self, sequence_id: u64, new_contents: &[u8]) -> Option<Self> {
let sha256 = digest(new_contents);
if sha256 == self.sha256 {
None
} else {
Some(Self {
id: self
.id
.checked_add(1)
.expect("need to implement revision id wrapping or increase revision id size"),
id: sequence_id,
sha256,
})
}
Expand All @@ -59,7 +50,7 @@ impl Debug for Revision {

impl Display for Revision {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
<u32 as Display>::fmt(&self.id, f)?;
Display::fmt(&self.id, f)?;
f.write_char('-')?;
for byte in self.sha256 {
f.write_fmt(format_args!("{:02x}", byte))?;
Expand All @@ -77,7 +68,7 @@ fn digest(payload: &[u8]) -> [u8; 32] {
#[test]
fn revision_tests() {
let original_contents = b"one";
let first_revision = Revision::new(original_contents);
let first_revision = Revision::with_id(0, original_contents);
let original_digest =
hex_literal::hex!("7692c3ad3540bb803c020b3aee66cd8887123234ea0c6e7143c0add73ff431ed");
assert_eq!(
Expand All @@ -87,11 +78,11 @@ fn revision_tests() {
sha256: original_digest
}
);
assert!(first_revision.next_revision(original_contents).is_none());
assert!(first_revision.next_revision(1, original_contents).is_none());

let updated_contents = b"two";
let next_revision = first_revision
.next_revision(updated_contents)
.next_revision(1, updated_contents)
.expect("new contents should create a new revision");
assert_eq!(
next_revision,
Expand All @@ -102,10 +93,10 @@ fn revision_tests() {
)
}
);
assert!(next_revision.next_revision(updated_contents).is_none());
assert!(next_revision.next_revision(2, updated_contents).is_none());

assert_eq!(
next_revision.next_revision(original_contents),
next_revision.next_revision(2, original_contents),
Some(Revision {
id: 2,
sha256: original_digest
Expand All @@ -116,7 +107,7 @@ fn revision_tests() {
#[test]
fn revision_display_test() {
let original_contents = b"one";
let first_revision = Revision::new(original_contents);
let first_revision = Revision::with_id(0, original_contents);
assert_eq!(
first_revision.to_string(),
"0-7692c3ad3540bb803c020b3aee66cd8887123234ea0c6e7143c0add73ff431ed"
Expand Down
2 changes: 1 addition & 1 deletion crates/bonsaidb-core/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub use self::{
},
schematic::Schematic,
view::{
map::{Map, MappedValue, ViewMappedValue},
map::{Map, MappedValue},
CollectionViewSchema, DefaultViewSerialization, ReduceResult, SerializedView, View,
ViewMapResult, ViewSchema,
},
Expand Down
Loading