diff --git a/pgx-tests/src/tests/mod.rs b/pgx-tests/src/tests/mod.rs index 881c0491a..fd1f6a239 100644 --- a/pgx-tests/src/tests/mod.rs +++ b/pgx-tests/src/tests/mod.rs @@ -45,6 +45,7 @@ mod shmem_tests; mod spi_tests; mod srf_tests; mod struct_type_tests; +mod subxact_tests; mod trigger_tests; mod uuid_tests; mod variadic_tests; diff --git a/pgx-tests/src/tests/subxact_tests.rs b/pgx-tests/src/tests/subxact_tests.rs new file mode 100644 index 000000000..5d5818546 --- /dev/null +++ b/pgx-tests/src/tests/subxact_tests.rs @@ -0,0 +1,114 @@ +/* +Portions Copyright 2019-2021 ZomboDB, LLC. +Portions Copyright 2021-2022 Technology Concepts & Design, Inc. + +All rights reserved. + +Use of this source code is governed by the MIT license that can be found in the LICENSE file. +*/ + +#[cfg(any(test, feature = "pg_test"))] +#[pgx::pg_schema] +mod tests { + #[allow(unused_imports)] + use crate as pgx_tests; + + use pgx::prelude::*; + use pgx::spi::SpiClient; + use pgx::subxact::CommitOnDrop; + + #[pg_test] + fn test_subxact_smoketest() { + Spi::connect(|mut c| { + c.update("CREATE TABLE a (v INTEGER)", None, None).unwrap(); + let c = c.sub_transaction(|mut xact| { + xact.update("INSERT INTO a VALUES (0)", None, None).unwrap(); + assert_eq!( + 0, + xact.select("SELECT v FROM a", Some(1), None) + .unwrap() + .first() + .get::(1) + .unwrap() + .unwrap() + ); + let xact = xact.sub_transaction(|mut xact| { + xact.update("INSERT INTO a VALUES (1)", None, None).unwrap(); + assert_eq!( + 2, + xact.select("SELECT COUNT(*) FROM a", Some(1), None) + .unwrap() + .first() + .get::(1) + .unwrap() + .unwrap() + ); + xact.rollback() + }); + xact.rollback() + }); + assert_eq!( + 0, + c.select("SELECT COUNT(*) FROM a", Some(1), None) + .unwrap() + .first() + .get::(1) + .unwrap() + .unwrap() + ); + }) + } + + #[pg_test] + fn test_commit_on_drop() { + Spi::connect(|mut c| { + c.update("CREATE TABLE a (v INTEGER)", None, None).unwrap(); + // The type below is explicit to ensure it's commit on drop by default + c.sub_transaction(|mut xact: SubTransaction| { + xact.update("INSERT INTO a VALUES (0)", None, None).unwrap(); + // Dropped explicitly for illustration purposes + drop(xact); + }); + // Create a new client to check the state + Spi::connect(|c| { + // The above insert should have been committed + assert_eq!( + 1, + c.select("SELECT COUNT(*) FROM a", Some(1), None) + .unwrap() + .first() + .get::(1) + .unwrap() + .unwrap() + ); + }); + }) + } + + #[pg_test] + fn test_rollback_on_drop() { + Spi::connect(|mut c| { + c.update("CREATE TABLE a (v INTEGER)", None, None).unwrap(); + // The type below is explicit to ensure it's commit on drop by default + c.sub_transaction(|mut xact: SubTransaction| { + xact.update("INSERT INTO a VALUES (0)", None, None).unwrap(); + let xact = xact.rollback_on_drop(); + // Dropped explicitly for illustration purposes + drop(xact); + }); + // Create a new client to check the state + Spi::connect(|c| { + // The above insert should NOT have been committed + assert_eq!( + 0, + c.select("SELECT COUNT(*) FROM a", Some(1), None) + .unwrap() + .first() + .get::(1) + .unwrap() + .unwrap() + ); + }); + }) + } +} diff --git a/pgx/src/lib.rs b/pgx/src/lib.rs index d9fe16b79..ef8edfb61 100644 --- a/pgx/src/lib.rs +++ b/pgx/src/lib.rs @@ -68,6 +68,7 @@ pub mod spi; pub mod spinlock; pub mod srf; pub mod stringinfo; +pub mod subxact; pub mod trigger_support; pub mod tupdesc; pub mod varlena; diff --git a/pgx/src/prelude.rs b/pgx/src/prelude.rs index 32d04047c..ff71d840b 100644 --- a/pgx/src/prelude.rs +++ b/pgx/src/prelude.rs @@ -48,3 +48,6 @@ pub use crate::pg_sys::{ check_for_interrupts, debug1, debug2, debug3, debug4, debug5, ereport, error, function_name, info, log, notice, warning, FATAL, PANIC, }; + +// Sub-transactions +pub use crate::subxact::{SubTransaction, SubTransactionExt}; diff --git a/pgx/src/subxact.rs b/pgx/src/subxact.rs new file mode 100644 index 000000000..46542789d --- /dev/null +++ b/pgx/src/subxact.rs @@ -0,0 +1,254 @@ +use crate::{pg_sys, spi::SpiClient, PgMemoryContexts}; +use std::fmt::Debug; +use std::ops::{Deref, DerefMut}; + +/// Releases a sub-transaction on Drop +pub trait ReleaseOnDrop {} + +/// Sub-transaction's contextual information +#[derive(Clone, Copy)] +pub struct Context { + memory_context: pg_sys::MemoryContext, + // Resource ownership before the transaction + // + // Based on information from src/backend/utils/resowner/README + // as well as practical use of it in src/pl/plpython/plpy_spi.c + resource_owner: pg_sys::ResourceOwner, +} + +impl Context { + /// Captures the context + fn capture() -> Self { + // Remember the memory context before starting the sub-transaction + let memory_context = PgMemoryContexts::CurrentMemoryContext.value(); + // Remember resource owner before starting the sub-transaction + let resource_owner = unsafe { pg_sys::CurrentResourceOwner }; + Self { memory_context, resource_owner } + } +} + +impl From for CommitOnDrop { + fn from(context: Context) -> Self { + CommitOnDrop(context) + } +} + +impl From for RollbackOnDrop { + fn from(context: Context) -> Self { + RollbackOnDrop(context) + } +} + +/// Commits a sub-transaction on Drop +pub struct CommitOnDrop(Context); + +impl Drop for CommitOnDrop { + fn drop(&mut self) { + unsafe { + pg_sys::ReleaseCurrentSubTransaction(); + pg_sys::CurrentResourceOwner = self.0.resource_owner; + } + PgMemoryContexts::For(self.0.memory_context).set_as_current(); + } +} + +impl ReleaseOnDrop for CommitOnDrop {} + +/// Rolls back a sub-transaction on Drop +pub struct RollbackOnDrop(Context); + +impl Drop for RollbackOnDrop { + fn drop(&mut self) { + unsafe { + pg_sys::RollbackAndReleaseCurrentSubTransaction(); + pg_sys::CurrentResourceOwner = self.0.resource_owner; + } + PgMemoryContexts::For(self.0.memory_context).set_as_current(); + } +} + +impl ReleaseOnDrop for RollbackOnDrop {} + +impl Into for CommitOnDrop { + fn into(self) -> RollbackOnDrop { + let result = RollbackOnDrop(self.0); + // IMPORTANT: avoid running Drop (that would commit) + std::mem::forget(self); + result + } +} + +impl Into for RollbackOnDrop { + fn into(self) -> CommitOnDrop { + let result = CommitOnDrop(self.0); + // IMPORTANT: avoid running Drop (that would roll back) + std::mem::forget(self); + result + } +} + +struct NoOpOnDrop; + +impl ReleaseOnDrop for NoOpOnDrop {} + +/// Sub-transaction +/// +/// Can be created by calling `SpiClient::sub_transaction`, `SubTransaction::sub_transaction` +/// or any other implementation of `SubTransactionExt` and obtaining it as an argument to the provided closure. +/// +/// Unless rolled back or committed explicitly, it'll commit if `Release` generic parameter is `CommitOnDrop` +/// (default) or roll back if it is `RollbackOnDrop`. +#[derive(Debug)] +pub struct SubTransaction { + // Transaction release mechanism (commit, drop) + release: Release, + // Transaction parent + parent: Parent, +} + +impl SubTransaction +where + Release: From, +{ + /// Create a new sub-transaction. + fn new(parent: Parent) -> Self { + let context = Context::capture(); + let memory_context = context.memory_context; + let release = context.into(); + unsafe { + pg_sys::BeginInternalSubTransaction(std::ptr::null() /* [no] transaction name */); + } + // Switch to the outer memory context so that all allocations remain + // there instead of the sub-transaction's context + PgMemoryContexts::For(memory_context).set_as_current(); + Self { release, parent } + } +} + +impl SubTransaction { + /// Commit the transaction, returning its parent + pub fn commit(self) -> Parent { + // `Self::do_nothing_on_drop()` will commit as `Release` is `CommitOnDrop` + self.do_nothing_on_drop().parent + } +} + +impl SubTransaction { + /// Commit the transaction, returning its parent + pub fn commit(self) -> Parent { + // Make sub-transaction commit on drop and then use `commit` + self.commit_on_drop().commit() + } +} + +impl SubTransaction { + /// Rollback the transaction, returning its parent + pub fn rollback(self) -> Parent { + // `Self::do_nothing_on_drop()` will roll back as `Release` is `RollbackOnDrop` + self.do_nothing_on_drop().parent + } +} + +impl SubTransaction { + /// Rollback the transaction, returning its parent + pub fn rollback(self) -> Parent { + // Make sub-transaction roll back on drop and then use `rollback` + self.rollback_on_drop().rollback() + } +} + +impl SubTransaction { + /// Make this sub-transaction roll back on drop + pub fn rollback_on_drop(self) -> SubTransaction { + SubTransaction { parent: self.parent, release: self.release.into() } + } +} + +impl SubTransaction { + /// Make this sub-transaction commit on drop + pub fn commit_on_drop(self) -> SubTransaction { + SubTransaction { parent: self.parent, release: self.release.into() } + } +} + +impl SubTransaction { + /// Make this sub-transaction do nothing on drop + /// + /// Releases the sub-transaction based on `Release` generic parameter. Further + /// dropping of the sub-transaction will not do anything. + fn do_nothing_on_drop(self) -> SubTransaction { + SubTransaction { parent: self.parent, release: NoOpOnDrop } + } +} + +// This allows SubTransaction to be de-referenced to SpiClient +impl<'conn, Release: ReleaseOnDrop> Deref for SubTransaction, Release> { + type Target = SpiClient<'conn>; + + fn deref(&self) -> &Self::Target { + &self.parent + } +} + +impl<'conn, Release: ReleaseOnDrop> DerefMut for SubTransaction, Release> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.parent + } +} + +// This allows a SubTransaction of a SubTransaction to be de-referenced to SpiClient +impl Deref + for SubTransaction, Release> +{ + type Target = Parent; + + fn deref(&self) -> &Self::Target { + &self.parent.parent + } +} + +impl DerefMut + for SubTransaction, Release> +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.parent.parent + } +} + +/// Trait that allows creating a sub-transaction off any type +pub trait SubTransactionExt { + /// Parent's type + /// + /// In most common cases, it'll be equal to `Self`. However, in some cases + /// it may be desirable to use a different type to achieve certain goals. + type Parent: SubTransactionExt; + + /// Consume `self` and execute a closure with a sub-transaction + /// + /// If further use of the given sub-transaction is necessary, it must + /// be returned by the closure alongside with its intended result. Otherwise, + /// the sub-transaction be released when dropped. + fn sub_transaction) -> R, R>(self, f: F) -> R + where + Self: Sized; +} + +impl<'a> SubTransactionExt for SpiClient<'a> { + type Parent = Self; + fn sub_transaction) -> R, R>(self, f: F) -> R + where + Self: Sized, + { + f(SubTransaction::new(self)) + } +} + +impl SubTransactionExt for SubTransaction { + type Parent = Self; + fn sub_transaction) -> R, R>(self, f: F) -> R + where + Self: Sized, + { + f(SubTransaction::new(self)) + } +}