Skip to content

Commit

Permalink
Use KeyCode for MVCC keys.
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Sep 3, 2023
1 parent 99c9a64 commit 722f766
Showing 1 changed file with 69 additions and 82 deletions.
151 changes: 69 additions & 82 deletions src/storage/mvcc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{encoding, Engine};
use super::{keycode, Engine};
use crate::error::{Error, Result};

use serde::{Deserialize, Serialize};
Expand All @@ -8,6 +8,49 @@ use std::iter::Peekable;
use std::ops::{Bound, RangeBounds};
use std::sync::{Arc, Mutex, MutexGuard};

/// MVCC keys, using the KeyCode encoding which preserves the ordering and
/// grouping of keys. Cow byte slices allow encoding borrowed values and
/// decoding into owned values.
#[derive(Debug, Deserialize, Serialize)]
enum Key<'a> {
/// The next available transaction ID.
TxnNext,
/// Markers for active (uncommitted) transactions by ID, storing the mode.
TxnActive(u64),
/// Transaction snapshot by ID, storing concurrent active transaction IDs.
TxnSnapshot(u64),
/// Update marker for a txn ID and key, used for rollback.
TxnUpdate(
u64,
#[serde(with = "serde_bytes")]
#[serde(borrow)]
Cow<'a, [u8]>,
),
/// A record for a key/version pair.
Record(
#[serde(with = "serde_bytes")]
#[serde(borrow)]
Cow<'a, [u8]>,
u64,
),
/// Arbitrary unversioned metadata.
Metadata(
#[serde(with = "serde_bytes")]
#[serde(borrow)]
Cow<'a, [u8]>,
),
}

impl<'a> Key<'a> {
fn decode(bytes: &'a [u8]) -> Result<Self> {
keycode::deserialize(bytes)
}

fn encode(&self) -> Result<Vec<u8>> {
keycode::serialize(&self)
}
}

/// MVCC status
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Status {
Expand Down Expand Up @@ -53,13 +96,13 @@ impl<E: Engine> MVCC<E> {
/// Fetches an unversioned metadata value
pub fn get_metadata(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let mut session = self.engine.lock()?;
session.get(&Key::Metadata(key.into()).encode())
session.get(&Key::Metadata(key.into()).encode()?)
}

/// Sets an unversioned metadata value
pub fn set_metadata(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
let mut session = self.engine.lock()?;
session.set(&Key::Metadata(key.into()).encode(), value)
session.set(&Key::Metadata(key.into()).encode()?, value)
}

/// Returns engine status
Expand All @@ -71,12 +114,12 @@ impl<E: Engine> MVCC<E> {
let mut engine = self.engine.lock()?;
return Ok(Status {
storage: engine.to_string(),
txns: match engine.get(&Key::TxnNext.encode())? {
txns: match engine.get(&Key::TxnNext.encode()?)? {
Some(ref v) => deserialize(v)?,
None => 1,
} - 1,
txns_active: engine
.scan(Key::TxnActive(0).encode()..Key::TxnActive(std::u64::MAX).encode())
.scan(Key::TxnActive(0).encode()?..Key::TxnActive(std::u64::MAX).encode()?)
.try_fold(0, |count, r| r.map(|_| count + 1))?,
});
}
Expand Down Expand Up @@ -109,12 +152,12 @@ impl<E: Engine> Transaction<E> {
fn begin(engine: Arc<Mutex<E>>, mode: Mode) -> Result<Self> {
let mut session = engine.lock()?;

let id = match session.get(&Key::TxnNext.encode())? {
let id = match session.get(&Key::TxnNext.encode()?)? {
Some(ref v) => deserialize(v)?,
None => 1,
};
session.set(&Key::TxnNext.encode(), serialize(&(id + 1))?)?;
session.set(&Key::TxnActive(id).encode(), serialize(&mode)?)?;
session.set(&Key::TxnNext.encode()?, serialize(&(id + 1))?)?;
session.set(&Key::TxnActive(id).encode()?, serialize(&mode)?)?;

// We always take a new snapshot, even for snapshot transactions, because all transactions
// increment the transaction ID and we need to properly record currently active transactions
Expand All @@ -131,7 +174,7 @@ impl<E: Engine> Transaction<E> {
/// Resumes an active transaction with the given ID. Errors if the transaction is not active.
fn resume(engine: Arc<Mutex<E>>, id: u64) -> Result<Self> {
let mut session = engine.lock()?;
let mode = match session.get(&Key::TxnActive(id).encode())? {
let mode = match session.get(&Key::TxnActive(id).encode()?)? {
Some(v) => deserialize(&v)?,
None => return Err(Error::Value(format!("No active transaction {}", id))),
};
Expand All @@ -156,7 +199,7 @@ impl<E: Engine> Transaction<E> {
/// Commits the transaction, by removing the txn from the active set.
pub fn commit(self) -> Result<()> {
let mut session = self.engine.lock()?;
session.delete(&Key::TxnActive(self.id).encode())?;
session.delete(&Key::TxnActive(self.id).encode()?)?;
session.flush()
}

Expand All @@ -166,8 +209,8 @@ impl<E: Engine> Transaction<E> {
if self.mode.mutable() {
let mut rollback = Vec::new();
let mut scan = session.scan(
Key::TxnUpdate(self.id, vec![].into()).encode()
..Key::TxnUpdate(self.id + 1, vec![].into()).encode(),
Key::TxnUpdate(self.id, vec![].into()).encode()?
..Key::TxnUpdate(self.id + 1, vec![].into()).encode()?,
);
while let Some((key, _)) = scan.next().transpose()? {
match Key::decode(&key)? {
Expand All @@ -181,7 +224,7 @@ impl<E: Engine> Transaction<E> {
session.delete(&key)?;
}
}
session.delete(&Key::TxnActive(self.id).encode())
session.delete(&Key::TxnActive(self.id).encode()?)
}

/// Deletes a key.
Expand All @@ -193,7 +236,7 @@ impl<E: Engine> Transaction<E> {
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let mut session = self.engine.lock()?;
let mut scan = session
.scan(Key::Record(key.into(), 0).encode()..=Key::Record(key.into(), self.id).encode())
.scan(Key::Record(key.into(), 0).encode()?..=Key::Record(key.into(), self.id).encode()?)
.rev();
while let Some((k, v)) = scan.next().transpose()? {
match Key::decode(&k)? {
Expand All @@ -211,13 +254,13 @@ impl<E: Engine> Transaction<E> {
/// Scans a key range.
pub fn scan(&self, range: impl RangeBounds<Vec<u8>>) -> Result<ScanIterator> {
let start = match range.start_bound() {
Bound::Excluded(k) => Bound::Excluded(Key::Record(k.into(), std::u64::MAX).encode()),
Bound::Included(k) => Bound::Included(Key::Record(k.into(), 0).encode()),
Bound::Unbounded => Bound::Included(Key::Record(vec![].into(), 0).encode()),
Bound::Excluded(k) => Bound::Excluded(Key::Record(k.into(), std::u64::MAX).encode()?),
Bound::Included(k) => Bound::Included(Key::Record(k.into(), 0).encode()?),
Bound::Unbounded => Bound::Included(Key::Record(vec![].into(), 0).encode()?),
};
let end = match range.end_bound() {
Bound::Excluded(k) => Bound::Excluded(Key::Record(k.into(), 0).encode()),
Bound::Included(k) => Bound::Included(Key::Record(k.into(), std::u64::MAX).encode()),
Bound::Excluded(k) => Bound::Excluded(Key::Record(k.into(), 0).encode()?),
Bound::Included(k) => Bound::Included(Key::Record(k.into(), std::u64::MAX).encode()?),
Bound::Unbounded => Bound::Unbounded,
};
// TODO: For now, collect results from the engine to not have to deal with lifetimes.
Expand Down Expand Up @@ -266,8 +309,8 @@ impl<E: Engine> Transaction<E> {
let min = self.snapshot.invisible.iter().min().cloned().unwrap_or(self.id + 1);
let mut scan = session
.scan(
Key::Record(key.into(), min).encode()
..=Key::Record(key.into(), std::u64::MAX).encode(),
Key::Record(key.into(), min).encode()?
..=Key::Record(key.into(), std::u64::MAX).encode()?,
)
.rev();
while let Some((k, _)) = scan.next().transpose()? {
Expand All @@ -283,8 +326,8 @@ impl<E: Engine> Transaction<E> {
std::mem::drop(scan);

// Write the key and its update record.
let key = Key::Record(key.into(), self.id).encode();
let update = Key::TxnUpdate(self.id, (&key).into()).encode();
let key = Key::Record(key.into(), self.id).encode()?;
let update = Key::TxnUpdate(self.id, (&key).into()).encode()?;
session.set(&update, vec![])?;
session.set(&key, serialize(&value)?)
}
Expand Down Expand Up @@ -340,21 +383,21 @@ impl Snapshot {
/// Takes a new snapshot, persisting it as `Key::TxnSnapshot(version)`.
fn take<E: Engine>(session: &mut MutexGuard<E>, version: u64) -> Result<Self> {
let mut snapshot = Self { version, invisible: HashSet::new() };
let mut scan = session.scan(Key::TxnActive(0).encode()..Key::TxnActive(version).encode());
let mut scan = session.scan(Key::TxnActive(0).encode()?..Key::TxnActive(version).encode()?);
while let Some((key, _)) = scan.next().transpose()? {
match Key::decode(&key)? {
Key::TxnActive(id) => snapshot.invisible.insert(id),
k => return Err(Error::Internal(format!("Expected TxnActive, got {:?}", k))),
};
}
std::mem::drop(scan);
session.set(&Key::TxnSnapshot(version).encode(), serialize(&snapshot.invisible)?)?;
session.set(&Key::TxnSnapshot(version).encode()?, serialize(&snapshot.invisible)?)?;
Ok(snapshot)
}

/// Reengines an existing snapshot from `Key::TxnSnapshot(version)`, or errors if not found.
fn reengine<E: Engine>(session: &mut MutexGuard<E>, version: u64) -> Result<Self> {
match session.get(&Key::TxnSnapshot(version).encode())? {
match session.get(&Key::TxnSnapshot(version).encode()?)? {
Some(ref v) => Ok(Self { version, invisible: deserialize(v)? }),
None => Err(Error::Value(format!("Snapshot not found for version {}", version))),
}
Expand All @@ -366,62 +409,6 @@ impl Snapshot {
}
}

/// MVCC keys. The encoding preserves the grouping and ordering of keys. Uses a Cow since we want
/// to take borrows when encoding and return owned when decoding.
#[derive(Debug)]
enum Key<'a> {
/// The next available txn ID. Used when starting new txns.
TxnNext,
/// Active txn markers, containing the mode. Used to detect concurrent txns, and to resume.
TxnActive(u64),
/// Txn snapshot, containing concurrent active txns at start of txn.
TxnSnapshot(u64),
/// Update marker for a txn ID and key, used for rollback.
TxnUpdate(u64, Cow<'a, [u8]>),
/// A record for a key/version pair.
Record(Cow<'a, [u8]>, u64),
/// Arbitrary unversioned metadata.
Metadata(Cow<'a, [u8]>),
}

impl<'a> Key<'a> {
/// Encodes a key into a byte vector.
fn encode(self) -> Vec<u8> {
use encoding::*;
match self {
Self::TxnNext => vec![0x01],
Self::TxnActive(id) => [&[0x02][..], &encode_u64(id)].concat(),
Self::TxnSnapshot(version) => [&[0x03][..], &encode_u64(version)].concat(),
Self::TxnUpdate(id, key) => {
[&[0x04][..], &encode_u64(id), &encode_bytes(&key)].concat()
}
Self::Metadata(key) => [&[0x05][..], &encode_bytes(&key)].concat(),
Self::Record(key, version) => {
[&[0xff][..], &encode_bytes(&key), &encode_u64(version)].concat()
}
}
}

/// Decodes a key from a byte representation.
fn decode(mut bytes: &[u8]) -> Result<Self> {
use encoding::*;
let bytes = &mut bytes;
let key = match take_byte(bytes)? {
0x01 => Self::TxnNext,
0x02 => Self::TxnActive(take_u64(bytes)?),
0x03 => Self::TxnSnapshot(take_u64(bytes)?),
0x04 => Self::TxnUpdate(take_u64(bytes)?, take_bytes(bytes)?.into()),
0x05 => Self::Metadata(take_bytes(bytes)?.into()),
0xff => Self::Record(take_bytes(bytes)?.into(), take_u64(bytes)?),
b => return Err(Error::Internal(format!("Unknown MVCC key prefix {:x?}", b))),
};
if !bytes.is_empty() {
return Err(Error::Internal("Unexpected data remaining at end of key".into()));
}
Ok(key)
}
}

pub type ScanIterator<'a> =
Box<dyn DoubleEndedIterator<Item = Result<(Vec<u8>, Vec<u8>)>> + Send + 'a>;

Expand Down

0 comments on commit 722f766

Please sign in to comment.