Skip to content

Commit

Permalink
Redesign how read-only transactions work.
Browse files Browse the repository at this point in the history
This patch redesigns how read-only transactions work. Previously,
read-only transactions would allocate a new transaction ID and store the
active set, just like read-write transactions. This was necessary to
resume the transaction after traversing the Raft boundary.

Now that the transaction state can be serialized and passed across Raft,
this is no longer necessary, making read-only transaction truly
read-only at the MVCC level. New versions are only allocated for
read-write transactions (since these are the only ones that can mutate
the MVCC state), and only these store their active set.
  • Loading branch information
erikgrinaker committed Sep 5, 2023
1 parent 629ffde commit dc192a0
Show file tree
Hide file tree
Showing 14 changed files with 354 additions and 342 deletions.
20 changes: 7 additions & 13 deletions src/bin/toysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use rustyline::validate::{ValidationContext, ValidationResult, Validator};
use rustyline::{error::ReadlineError, Editor, Modifiers};
use rustyline_derive::{Completer, Helper, Highlighter, Hinter};
use toydb::error::{Error, Result};
use toydb::sql::engine::Mode;
use toydb::sql::execution::ResultSet;
use toydb::sql::parser::{Lexer, Token};
use toydb::Client;
Expand Down Expand Up @@ -165,16 +164,12 @@ SQL txns: {txns_active} active, {txns} total ({sql_storage} storage)
/// Runs a query and displays the results
async fn execute_query(&mut self, query: &str) -> Result<()> {
match self.client.execute(query).await? {
ResultSet::Begin { id, mode } => match mode {
Mode::ReadWrite => println!("Began transaction {}", id),
Mode::ReadOnly => println!("Began read-only transaction {}", id),
Mode::Snapshot { version, .. } => println!(
"Began read-only transaction {} in snapshot at version {}",
id, version
),
ResultSet::Begin { version, read_only } => match read_only {
true => println!("Began transaction at new version {}", version),
false => println!("Began read-only transaction at version {}", version),
},
ResultSet::Commit { id } => println!("Committed transaction {}", id),
ResultSet::Rollback { id } => println!("Rolled back transaction {}", id),
ResultSet::Commit { version: id } => println!("Committed transaction {}", id),
ResultSet::Rollback { version: id } => println!("Rolled back transaction {}", id),
ResultSet::Create { count } => println!("Created {} rows", count),
ResultSet::Delete { count } => println!("Deleted {} rows", count),
ResultSet::Update { count } => println!("Updated {} rows", count),
Expand Down Expand Up @@ -206,9 +201,8 @@ SQL txns: {txns_active} active, {txns} total ({sql_storage} storage)
/// Prompts the user for input
fn prompt(&mut self) -> Result<Option<String>> {
let prompt = match self.client.txn() {
Some((id, Mode::ReadWrite)) => format!("toydb:{}> ", id),
Some((id, Mode::ReadOnly)) => format!("toydb:{}> ", id),
Some((_, Mode::Snapshot { version })) => format!("toydb@{}> ", version),
Some((version, false)) => format!("toydb:{}> ", version),
Some((version, true)) => format!("toydb@{}> ", version),
None => "toydb> ".into(),
};
match self.editor.readline(&prompt) {
Expand Down
10 changes: 5 additions & 5 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::error::{Error, Result};
use crate::server::{Request, Response};
use crate::sql::engine::{Mode, Status};
use crate::sql::engine::Status;
use crate::sql::execution::ResultSet;
use crate::sql::schema::Table;

Expand Down Expand Up @@ -30,7 +30,7 @@ const WITH_TXN_RETRIES: u8 = 8;
#[derive(Clone)]
pub struct Client {
conn: Arc<Mutex<Connection>>,
txn: Cell<Option<(u64, Mode)>>,
txn: Cell<Option<(u64, bool)>>,
}

impl Client {
Expand Down Expand Up @@ -87,7 +87,7 @@ impl Client {
resultset = ResultSet::Query { columns, rows: Box::new(rows.into_iter().map(Ok)) }
};
match &resultset {
ResultSet::Begin { id, mode } => self.txn.set(Some((*id, *mode))),
ResultSet::Begin { version, read_only } => self.txn.set(Some((*version, *read_only))),
ResultSet::Commit { .. } => self.txn.set(None),
ResultSet::Rollback { .. } => self.txn.set(None),
_ => {}
Expand Down Expand Up @@ -119,8 +119,8 @@ impl Client {
}
}

/// Returns the transaction status of the client
pub fn txn(&self) -> Option<(u64, Mode)> {
/// Returns the version and read-only state of the txn
pub fn txn(&self) -> Option<(u64, bool)> {
self.txn.get()
}

Expand Down
14 changes: 6 additions & 8 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::error::{Error, Result};
use crate::raft;
use crate::sql;
use crate::sql::engine::{Engine as _, Mode};
use crate::sql::engine::Engine as _;
use crate::sql::execution::ResultSet;
use crate::sql::schema::{Catalog as _, Table};
use crate::sql::types::Row;
Expand Down Expand Up @@ -155,14 +155,12 @@ impl Session {
debug!("Processing request {:?}", request);
let response = match request {
Request::Execute(query) => Response::Execute(self.sql.execute(&query)?),
Request::GetTable(table) => Response::GetTable(
self.sql.with_txn(Mode::ReadOnly, |txn| txn.must_read_table(&table))?,
),
Request::ListTables => {
Response::ListTables(self.sql.with_txn(Mode::ReadOnly, |txn| {
Ok(txn.scan_tables()?.map(|t| t.name).collect())
})?)
Request::GetTable(table) => {
Response::GetTable(self.sql.read_with_txn(|txn| txn.must_read_table(&table))?)
}
Request::ListTables => Response::ListTables(
self.sql.read_with_txn(|txn| Ok(txn.scan_tables()?.map(|t| t.name).collect()))?,
),
Request::Status => Response::Status(self.engine.status()?),
};
debug!("Returning response {:?}", response);
Expand Down
20 changes: 14 additions & 6 deletions src/sql/engine/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,16 @@ impl<E: storage::Engine> KV<E> {
impl<E: storage::Engine> super::Engine for KV<E> {
type Transaction = Transaction<E>;

fn begin(&self, mode: super::Mode) -> Result<Self::Transaction> {
Ok(Self::Transaction::new(self.kv.begin_with_mode(mode)?))
fn begin(&self) -> Result<Self::Transaction> {
Ok(Self::Transaction::new(self.kv.begin()?))
}

fn begin_read_only(&self) -> Result<Self::Transaction> {
Ok(Self::Transaction::new(self.kv.begin_read_only()?))
}

fn begin_as_of(&self, version: u64) -> Result<Self::Transaction> {
Ok(Self::Transaction::new(self.kv.begin_as_of(version)?))
}
}

Expand Down Expand Up @@ -109,12 +117,12 @@ impl<E: storage::Engine> Transaction<E> {
}

impl<E: storage::Engine> super::Transaction for Transaction<E> {
fn id(&self) -> u64 {
self.txn.id()
fn version(&self) -> u64 {
self.txn.version()
}

fn mode(&self) -> super::Mode {
self.txn.mode()
fn read_only(&self) -> bool {
self.txn.read_only()
}

fn commit(self) -> Result<()> {
Expand Down
70 changes: 36 additions & 34 deletions src/sql/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ pub trait Engine: Clone {
/// The transaction type
type Transaction: Transaction;

/// Begins a transaction in the given mode
fn begin(&self, mode: Mode) -> Result<Self::Transaction>;
/// Begins a read-write transaction.
fn begin(&self) -> Result<Self::Transaction>;

/// Begins a read-only transaction.
fn begin_read_only(&self) -> Result<Self::Transaction>;

/// Begins a read-only transaction as of a historical version.
fn begin_as_of(&self, version: u64) -> Result<Self::Transaction>;

/// Begins a session for executing individual statements
fn session(&self) -> Result<Session<Self>> {
Expand All @@ -29,10 +35,11 @@ pub trait Engine: Clone {

/// An SQL transaction
pub trait Transaction: Catalog {
/// The transaction ID
fn id(&self) -> u64;
/// The transaction mode
fn mode(&self) -> Mode;
/// The transaction's version
fn version(&self) -> u64;
/// Whether the transaction is read-only
fn read_only(&self) -> bool;

/// Commits the transaction
fn commit(self) -> Result<()>;
/// Rolls back the transaction
Expand Down Expand Up @@ -72,24 +79,24 @@ impl<E: Engine + 'static> Session<E> {
ast::Statement::Begin { .. } if self.txn.is_some() => {
Err(Error::Value("Already in a transaction".into()))
}
ast::Statement::Begin { readonly: true, version: None } => {
let txn = self.engine.begin(Mode::ReadOnly)?;
let result = ResultSet::Begin { id: txn.id(), mode: txn.mode() };
ast::Statement::Begin { read_only: true, as_of: None } => {
let txn = self.engine.begin_read_only()?;
let result = ResultSet::Begin { version: txn.version(), read_only: true };
self.txn = Some(txn);
Ok(result)
}
ast::Statement::Begin { readonly: true, version: Some(version) } => {
let txn = self.engine.begin(Mode::Snapshot { version })?;
let result = ResultSet::Begin { id: txn.id(), mode: txn.mode() };
ast::Statement::Begin { read_only: true, as_of: Some(version) } => {
let txn = self.engine.begin_as_of(version)?;
let result = ResultSet::Begin { version, read_only: true };
self.txn = Some(txn);
Ok(result)
}
ast::Statement::Begin { readonly: false, version: Some(_) } => {
ast::Statement::Begin { read_only: false, as_of: Some(_) } => {
Err(Error::Value("Can't start read-write transaction in a given version".into()))
}
ast::Statement::Begin { readonly: false, version: None } => {
let txn = self.engine.begin(Mode::ReadWrite)?;
let result = ResultSet::Begin { id: txn.id(), mode: txn.mode() };
ast::Statement::Begin { read_only: false, as_of: None } => {
let txn = self.engine.begin()?;
let result = ResultSet::Begin { version: txn.version(), read_only: false };
self.txn = Some(txn);
Ok(result)
}
Expand All @@ -98,31 +105,31 @@ impl<E: Engine + 'static> Session<E> {
}
ast::Statement::Commit => {
let txn = self.txn.take().unwrap();
let id = txn.id();
let version = txn.version();
txn.commit()?;
Ok(ResultSet::Commit { id })
Ok(ResultSet::Commit { version })
}
ast::Statement::Rollback => {
let txn = self.txn.take().unwrap();
let id = txn.id();
let version = txn.version();
txn.rollback()?;
Ok(ResultSet::Rollback { id })
Ok(ResultSet::Rollback { version })
}
ast::Statement::Explain(statement) => self.with_txn(Mode::ReadOnly, |txn| {
ast::Statement::Explain(statement) => self.read_with_txn(|txn| {
Ok(ResultSet::Explain(Plan::build(*statement, txn)?.optimize(txn)?.0))
}),
statement if self.txn.is_some() => Plan::build(statement, self.txn.as_mut().unwrap())?
.optimize(self.txn.as_mut().unwrap())?
.execute(self.txn.as_mut().unwrap()),
statement @ ast::Statement::Select { .. } => {
let mut txn = self.engine.begin(Mode::ReadOnly)?;
let mut txn = self.engine.begin_read_only()?;
let result =
Plan::build(statement, &mut txn)?.optimize(&mut txn)?.execute(&mut txn);
txn.rollback()?;
result
}
statement => {
let mut txn = self.engine.begin(Mode::ReadWrite)?;
let mut txn = self.engine.begin()?;
match Plan::build(statement, &mut txn)?.optimize(&mut txn)?.execute(&mut txn) {
Ok(result) => {
txn.commit()?;
Expand All @@ -137,29 +144,24 @@ impl<E: Engine + 'static> Session<E> {
}
}

/// Runs a closure in the session's transaction, or a new transaction if none is active.
pub fn with_txn<R, F>(&mut self, mode: Mode, f: F) -> Result<R>
/// Runs a read-only closure in the session's transaction, or a new
/// transaction if none is active.
///
/// TODO: reconsider this
pub fn read_with_txn<R, F>(&mut self, f: F) -> Result<R>
where
F: FnOnce(&mut E::Transaction) -> Result<R>,
{
if let Some(ref mut txn) = self.txn {
if !txn.mode().satisfies(&mode) {
return Err(Error::Value(
"The operation cannot run in the current transaction".into(),
));
}
return f(txn);
}
let mut txn = self.engine.begin(mode)?;
let mut txn = self.engine.begin_read_only()?;
let result = f(&mut txn);
txn.rollback()?;
result
}
}

/// The transaction mode
pub type Mode = crate::storage::mvcc::Mode;

/// A row scan iterator
pub type Scan = Box<dyn DoubleEndedIterator<Item = Result<Row>> + Send>;

Expand Down
40 changes: 27 additions & 13 deletions src/sql/engine/raft.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::super::schema::{Catalog, Table, Tables};
use super::super::types::{Expression, Row, Value};
use super::{Engine as _, IndexScan, Mode, Scan, Transaction as _};
use super::{Engine as _, IndexScan, Scan, Transaction as _};
use crate::error::{Error, Result};
use crate::raft;
use crate::storage::{self, mvcc::TransactionState};
Expand All @@ -13,8 +13,8 @@ use std::collections::HashSet;
/// TODO: use Cows for these.
#[derive(Clone, Serialize, Deserialize)]
enum Mutation {
/// Begins a transaction in the given mode
Begin(Mode),
/// Begins a transaction
Begin { read_only: bool, as_of: Option<u64> },
/// Commits the given transaction
Commit(TransactionState),
/// Rolls back the given transaction
Expand Down Expand Up @@ -104,8 +104,16 @@ impl Raft {
impl super::Engine for Raft {
type Transaction = Transaction;

fn begin(&self, mode: Mode) -> Result<Self::Transaction> {
Transaction::begin(self.client.clone(), mode)
fn begin(&self) -> Result<Self::Transaction> {
Transaction::begin(self.client.clone(), false, None)
}

fn begin_read_only(&self) -> Result<Self::Transaction> {
Transaction::begin(self.client.clone(), true, None)
}

fn begin_as_of(&self, version: u64) -> Result<Self::Transaction> {
Transaction::begin(self.client.clone(), true, Some(version))
}
}

Expand All @@ -120,9 +128,9 @@ pub struct Transaction {

impl Transaction {
/// Starts a transaction in the given mode
fn begin(client: raft::Client, mode: Mode) -> Result<Self> {
fn begin(client: raft::Client, read_only: bool, as_of: Option<u64>) -> Result<Self> {
let state = Raft::deserialize(&futures::executor::block_on(
client.mutate(Raft::serialize(&Mutation::Begin(mode))?),
client.mutate(Raft::serialize(&Mutation::Begin { read_only, as_of })?),
)?)?;
Ok(Self { client, state })
}
Expand All @@ -139,12 +147,12 @@ impl Transaction {
}

impl super::Transaction for Transaction {
fn id(&self) -> u64 {
self.state.id
fn version(&self) -> u64 {
self.state.version
}

fn mode(&self) -> Mode {
self.state.mode
fn read_only(&self) -> bool {
self.state.read_only
}

fn commit(self) -> Result<()> {
Expand Down Expand Up @@ -276,8 +284,14 @@ impl<E: storage::Engine> State<E> {
/// Applies a state machine mutation
fn apply(&mut self, mutation: Mutation) -> Result<Vec<u8>> {
match mutation {
Mutation::Begin(mode) => {
let txn = self.engine.begin(mode)?;
Mutation::Begin { read_only, as_of } => {
let txn = if !read_only {
self.engine.begin()?
} else if let Some(version) = as_of {
self.engine.begin_as_of(version)?
} else {
self.engine.begin_read_only()?
};
Raft::serialize(&txn.state())
}
Mutation::Commit(txn) => Raft::serialize(&self.engine.resume(txn)?.commit()?),
Expand Down
Loading

0 comments on commit dc192a0

Please sign in to comment.