Skip to content

Commit

Permalink
feat: secondary indexes use redb multimap table
Browse files Browse the repository at this point in the history
  • Loading branch information
vincent-herlemont committed Aug 24, 2024
1 parent 82ebe53 commit 003d9e0
Show file tree
Hide file tree
Showing 24 changed files with 648 additions and 100 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ itertools = "0.13"
include_dir = "0.7"

[features]
default = [ "upgrade_0_5_x" ]
default = [ "upgrade_0_5_x", "upgrade_0_7_x" ]
upgrade_0_5_x = [ "redb1" ]
upgrade_0_7_x = [ ]

[[bench]]
name = "overhead_data_size"
Expand Down
8 changes: 4 additions & 4 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::transaction::RTransaction;
use crate::transaction::RwTransaction;
use crate::watch::query::{InternalWatch, Watch};
use crate::{watch, Metadata};
use redb::{ReadableTableMetadata, TableHandle};
use redb::{MultimapTableHandle, ReadableTableMetadata, TableHandle};
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
Expand Down Expand Up @@ -125,9 +125,9 @@ impl<'a> Database<'a> {
for secondary_key in model_builder.model.secondary_keys.iter() {
primary_table_definition.secondary_tables.insert(
secondary_key.clone(),
redb::TableDefinition::new(secondary_key.unique_table_name.as_str()).into(),
redb::MultimapTableDefinition::new(secondary_key.unique_table_name.as_str()).into(),
);
rw.open_table(
rw.open_multimap_table(
primary_table_definition.secondary_tables[&secondary_key]
.redb
.clone(),
Expand Down Expand Up @@ -226,7 +226,7 @@ impl<'a> Database<'a> {
let mut stats_secondary_tables = vec![];
for primary_table in self.primary_table_definitions.values() {
for secondary_table in primary_table.secondary_tables.values() {
let result_table_open = rx.open_table(secondary_table.redb.clone());
let result_table_open = rx.open_multimap_table(secondary_table.redb.clone());
let stats_table = match result_table_open {
Err(redb::TableError::TableDoesNotExist(_)) => StatsTable {
name: secondary_table.redb.name().to_string(),
Expand Down
3 changes: 2 additions & 1 deletion src/database_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,12 @@ impl Builder {
let builder = self.database_configuration.new_rdb_builder();
let database_instance = match DatabaseInstance::open_on_disk(builder, &path) {
Err(Error::RedbDatabaseError(redb::DatabaseError::UpgradeRequired(_))) => {
upgrade::upgrade(&self.database_configuration, &path, &models.models_builder)
upgrade::upgrade_redb(&self.database_configuration, &path, &models.models_builder)
}
Err(error) => return Err(error),
Ok(database_instance) => Ok(database_instance),
}?;
upgrade::upgrade_underlying_database(&database_instance, &models.models_builder)?;
self.init(database_instance, models)
}

Expand Down
14 changes: 8 additions & 6 deletions src/db_type/input.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::db_type::{composite_key, Error, Key, KeyDefinition, KeyEntry, KeyOptions, Result};
use crate::db_type::{Error, Key, KeyDefinition, KeyEntry, KeyOptions, Result};

#[derive(Debug)]
pub struct Input {
Expand All @@ -21,13 +21,15 @@ impl Input {
let out = if !secondary_key_def.options.unique {
match secondary_key {
KeyEntry::Default(value) => {
KeyEntry::Default(composite_key(value, &self.primary_key))
// KeyEntry::Default(composite_key(value, &self.primary_key))
KeyEntry::Default(value.to_owned())
}
KeyEntry::Optional(value) => {
let value = value
.as_ref()
.map(|value| composite_key(value, &self.primary_key));
KeyEntry::Optional(value)
// let value = value
// .as_ref()
// .map(|value| composite_key(value, &self.primary_key));
// KeyEntry::Optional(value)
KeyEntry::Optional(value.as_ref().map(|value| value.to_owned()))
}
}
} else {
Expand Down
18 changes: 16 additions & 2 deletions src/db_type/key/key.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use redb::{Key as RedbKey, TypeName, Value as RedbValue};
use std::fmt::Debug;
use std::ops::{Bound, Range, RangeBounds, RangeFrom, RangeInclusive, RangeTo, RangeToInclusive};
use std::u8;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Key(Vec<u8>);
Expand All @@ -14,6 +15,11 @@ impl Key {
self.0.extend(data.0.iter());
}

pub fn extend_with_delimiter(&mut self, delimiter: u8, data: &Key) {
self.0.push(delimiter);
self.0.extend(data.0.iter());
}

pub(crate) fn as_slice(&self) -> &[u8] {
self.0.as_slice()
}
Expand Down Expand Up @@ -334,7 +340,11 @@ impl KeyRange {
{
match (bounds.start_bound(), bounds.end_bound()) {
(Bound::Included(start), Bound::Included(end)) => {
KeyRange::RangeInclusive(start.to_key()..=end.to_key())
let start = start.to_key();
let mut end = end.to_key();
// Add 255 to the end key to include the last key
end.extend(&Key::new(vec![255]));
KeyRange::RangeInclusive(start..=end)
}
(Bound::Included(start), Bound::Excluded(end)) => {
KeyRange::Range(start.to_key()..end.to_key())
Expand All @@ -343,7 +353,11 @@ impl KeyRange {
start: start.to_key(),
}),
(Bound::Excluded(start), Bound::Included(end)) => {
KeyRange::RangeInclusive(start.to_key()..=end.to_key())
let start = start.to_key();
let mut end = end.to_key();
// Add 255 to the end key to include the last key
end.extend(&Key::new(vec![255]));
KeyRange::RangeInclusive(start..=end)
}
(Bound::Excluded(start), Bound::Excluded(end)) => {
KeyRange::Range(start.to_key()..end.to_key())
Expand Down
6 changes: 5 additions & 1 deletion src/db_type/key/key_definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ pub struct KeyOptions {

pub fn composite_key(secondary_key: &Key, primary_key: &Key) -> Key {
let mut secondary_key = secondary_key.clone();
secondary_key.extend(primary_key);
// The addition of a delimiter (a byte set to `0`) used between the concatenation
// of secondary keys and primary keys ensures that there is always a byte smaller
// than the value of the `end` of an inclusive range, which always ends with a byte
// set to `255`. See `KeyRange` the inclusive range defined with `start..=end`.
secondary_key.extend_with_delimiter(0, primary_key);
secondary_key
}
2 changes: 0 additions & 2 deletions src/metadata/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub fn load_or_create_metadata(database_instance: &DatabaseInstance) -> Result<M
let read_thx = database.begin_read()?;

if let Ok(table) = read_thx.open_table(TABLE) {
println!("Metadata table found");
let current_version = table
.get(VERSION_NATIVE_DB_NAME)?
.expect("Fatal error: current_version not found");
Expand All @@ -42,7 +41,6 @@ pub fn load_or_create_metadata(database_instance: &DatabaseInstance) -> Result<M
current_native_model_version.value().to_string(),
));
} else {
println!("Metadata table not found");
// Create the metadata table if it does not exist
let metadata = Metadata::default();
save_metadata(database_instance, &metadata)?;
Expand Down
12 changes: 8 additions & 4 deletions src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::db_type::Result;
use crate::{Builder, Database, Models};
use redb::ReadableMultimapTable;
use redb::ReadableTable;
use std::path::Path;

Expand All @@ -20,11 +21,14 @@ impl Database<'_> {

// Copy secondary tables
for (_, secondary_table_definition) in &primary_table_definition.secondary_tables {
let table = r.open_table(secondary_table_definition.redb)?;
let mut new_table = w.open_table(secondary_table_definition.redb)?;
let table = r.open_multimap_table(secondary_table_definition.redb)?;
let mut new_table = w.open_multimap_table(secondary_table_definition.redb)?;
for result in table.iter()? {
let (key, value) = result?;
new_table.insert(key.value(), value.value())?;
let (secondary_key, primary_keys) = result?;
for primary_key in primary_keys {
let primary_key = primary_key?;
new_table.insert(secondary_key.value(), primary_key.value())?;
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/table_definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::HashMap;
use std::fmt::Debug;

pub(crate) type RedbPrimaryTableDefinition<'a> = redb::TableDefinition<'a, Key, &'static [u8]>;
pub(crate) type RedbSecondaryTableDefinition<'a> = redb::TableDefinition<'a, Key, Key>;
pub(crate) type RedbSecondaryTableDefinition<'a> = redb::MultimapTableDefinition<'a, Key, Key>;

pub struct PrimaryTableDefinition<'a> {
pub(crate) model: crate::Model,
Expand Down
11 changes: 6 additions & 5 deletions src/transaction/internal/private_readable_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use crate::db_type::{
};
use crate::table_definition::PrimaryTableDefinition;
use crate::Model;
use redb::ReadableTable;
use redb::ReadableTableMetadata;
use redb::{ReadableMultimapTable, ReadableTable};
use std::collections::HashMap;

pub trait PrivateReadableTransaction<'db, 'txn> {
type RedbPrimaryTable: ReadableTable<Key, &'static [u8]>;
type RedbSecondaryTable: ReadableTable<Key, Key>;
type RedbSecondaryTable: ReadableMultimapTable<Key, Key>;

type RedbTransaction<'db_bis>
where
Expand Down Expand Up @@ -44,9 +44,10 @@ pub trait PrivateReadableTransaction<'db, 'txn> {

let table = self.get_secondary_table(&model, &secondary_key)?;

let value = table.get(key.to_key())?;
let primary_key = if let Some(value) = value {
value.value().to_owned()
let mut primary_keys = table.get(key.to_key())?;
let primary_key = if let Some(primary_key) = primary_keys.next() {
let primary_key = primary_key?;
primary_key.value().to_owned()
} else {
return Ok(None);
};
Expand Down
4 changes: 2 additions & 2 deletions src/transaction/internal/r_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ where
Self: 'db,
{
type RedbPrimaryTable = redb::ReadOnlyTable<Key, &'static [u8]>;
type RedbSecondaryTable = redb::ReadOnlyTable<Key, Key>;
type RedbSecondaryTable = redb::ReadOnlyMultimapTable<Key, Key>;

type RedbTransaction<'db_bis> = redb::ReadTransaction where Self: 'db_bis;

Expand Down Expand Up @@ -53,7 +53,7 @@ where
})?;
let table = self
.redb_transaction
.open_table(secondary_table_definition.redb)?;
.open_multimap_table(secondary_table_definition.redb)?;
Ok(table)
}
}
80 changes: 46 additions & 34 deletions src/transaction/internal/rw_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::table_definition::PrimaryTableDefinition;
use crate::transaction::internal::private_readable_transaction::PrivateReadableTransaction;
use crate::watch::WatcherRequest;
use crate::{db_type::ToInput, Model};
use redb::ReadableTable;
use redb::ReadableMultimapTable;
use redb::ReadableTableMetadata;
use redb::TableHandle;
use std::collections::{HashMap, HashSet};
Expand All @@ -20,7 +20,7 @@ where
Self: 'db,
{
type RedbPrimaryTable = redb::Table<'txn, Key, &'static [u8]>;
type RedbSecondaryTable = redb::Table<'txn, Key, Key>;
type RedbSecondaryTable = redb::MultimapTable<'txn, Key, Key>;

type RedbTransaction<'db_bis> = redb::WriteTransaction where Self: 'db_bis;

Expand Down Expand Up @@ -58,7 +58,7 @@ where
})?;
let table = self
.redb_transaction
.open_table(secondary_table_definition.redb)?;
.open_multimap_table(secondary_table_definition.redb)?;
Ok(table)
}
}
Expand All @@ -74,32 +74,38 @@ impl<'db> InternalRwTransaction<'db> {
model: Model,
item: Input,
) -> Result<(WatcherRequest, Output)> {
let already_exists;
{
let mut table = self.get_primary_table(&model)?;
already_exists = table
.insert(&item.primary_key, item.value.as_slice())?
.is_some();
}
table.insert(&item.primary_key, item.value.as_slice())?;
};

for (secondary_key_def, _value) in &item.secondary_keys {
let mut secondary_table = self.get_secondary_table(&model, secondary_key_def)?;
let result = match item.secondary_key_value(secondary_key_def)? {
KeyEntry::Default(value) => secondary_table.insert(value, &item.primary_key)?,
KeyEntry::Optional(value) => {
if let Some(value) = value {
secondary_table.insert(value, &item.primary_key)?
let secondary_key = match item.secondary_key_value(secondary_key_def)? {
KeyEntry::Default(secondary_key) => secondary_key,
KeyEntry::Optional(secondary_key) => {
if let Some(secondary_key) = secondary_key {
secondary_key
} else {
None
continue;
}
}
};
if result.is_some() && !already_exists {
return Err(Error::DuplicateKey {
key_name: secondary_key_def.unique_table_name.to_string(),

if secondary_key_def.options.unique {
let check = {
let primary_keys = secondary_table.get(&secondary_key)?;
primary_keys.len() > 0
};
if check {
return Err(Error::DuplicateKey {
key_name: secondary_key_def.unique_table_name.to_string(),
}
.into());
}
.into());
}

secondary_table.insert(secondary_key, &item.primary_key)?;
}

Ok((
Expand All @@ -126,12 +132,12 @@ impl<'db> InternalRwTransaction<'db> {
for (secondary_key_def, _value) in keys {
let mut secondary_table = self.get_secondary_table(&model, secondary_key_def)?;
match &item.secondary_key_value(secondary_key_def)? {
KeyEntry::Default(value) => {
secondary_table.remove(value)?;
KeyEntry::Default(secondary_key) => {
secondary_table.remove(secondary_key, &item.primary_key)?;
}
KeyEntry::Optional(value) => {
if let Some(value) = value {
secondary_table.remove(value)?;
KeyEntry::Optional(secondary_key) => {
if let Some(value) = secondary_key {
secondary_table.remove(value, &item.primary_key)?;
}
}
}
Expand Down Expand Up @@ -192,21 +198,27 @@ impl<'db> InternalRwTransaction<'db> {
let mut secondary_keys_to_delete = vec![];
let mut number_detected_key_to_delete = key_items.len();
for secondary_items in secondary_table.iter()? {
// Ta avoid to iter on all secondary keys if we have already detected all keys to delete
if number_detected_key_to_delete == 0 {
break;
}
let (secondary_key, primary_key) = secondary_items?;
if key_items.contains(&primary_key.value().to_owned()) {
// TODO remove owned
secondary_keys_to_delete.push(secondary_key.value().to_owned());
number_detected_key_to_delete -= 1;
let (secondary_key, primary_keys) = secondary_items?;
for primary_key in primary_keys {
let primary_key = primary_key?;
// Ta avoid to iter on all secondary keys if we have already detected all keys to delete
if number_detected_key_to_delete == 0 {
break;
}
if key_items.contains(&primary_key.value().to_owned()) {
// TODO remove owned
secondary_keys_to_delete.push((
secondary_key.value().to_owned(),
primary_key.value().to_owned(),
));
number_detected_key_to_delete -= 1;
}
}
}

// Delete secondary keys
for secondary_key in secondary_keys_to_delete {
secondary_table.remove(secondary_key)?;
for (secondary_key, primary_key) in secondary_keys_to_delete {
secondary_table.remove(secondary_key, primary_key)?;
}
}

Expand Down
Loading

0 comments on commit 003d9e0

Please sign in to comment.