diff --git a/pgx-tests/src/tests/mod.rs b/pgx-tests/src/tests/mod.rs index 5924c59c4..0e8674b47 100644 --- a/pgx-tests/src/tests/mod.rs +++ b/pgx-tests/src/tests/mod.rs @@ -40,6 +40,7 @@ mod shmem_tests; mod spi_tests; mod srf_tests; mod struct_type_tests; +mod subxact_tests; mod trigger_tests; mod uuid_tests; mod variadic_tests; diff --git a/pgx-tests/src/tests/subxact_tests.rs b/pgx-tests/src/tests/subxact_tests.rs new file mode 100644 index 000000000..1a600b61a --- /dev/null +++ b/pgx-tests/src/tests/subxact_tests.rs @@ -0,0 +1,103 @@ +/* +Portions Copyright 2019-2021 ZomboDB, LLC. +Portions Copyright 2021-2022 Technology Concepts & Design, Inc. + +All rights reserved. + +Use of this source code is governed by the MIT license that can be found in the LICENSE file. +*/ + +#[cfg(any(test, feature = "pg_test"))] +#[pgx::pg_schema] +mod tests { + #[allow(unused_imports)] + use crate as pgx_tests; + + use pgx::prelude::*; + use pgx::SpiClient; + + #[pg_test] + fn test_subxact_smoketest() { + Spi::execute(|c| { + c.update("CREATE TABLE a (v INTEGER)", None, None); + let c = c.sub_transaction(|xact| { + xact.update("INSERT INTO a VALUES (0)", None, None); + assert_eq!( + 0, + xact.select("SELECT v FROM a", Some(1), None) + .first() + .get_datum::(1) + .unwrap() + ); + let xact = xact.sub_transaction(|xact| { + xact.update("INSERT INTO a VALUES (1)", None, None); + assert_eq!( + 2, + xact.select("SELECT COUNT(*) FROM a", Some(1), None) + .first() + .get_datum::(1) + .unwrap() + ); + xact.rollback() + }); + xact.rollback() + }); + assert_eq!( + 0, + c.select("SELECT COUNT(*) FROM a", Some(1), None) + .first() + .get_datum::(1) + .unwrap() + ); + }) + } + + #[pg_test] + fn test_commit_on_drop() { + Spi::execute(|c| { + c.update("CREATE TABLE a (v INTEGER)", None, None); + // The type below is explicit to ensure it's commit on drop by default + c.sub_transaction(|xact: SubTransaction| { + xact.update("INSERT INTO a VALUES (0)", None, None); + // Dropped explicitly for illustration purposes + drop(xact); + }); + // Create a new client to check the state + Spi::execute(|c| { + // The above insert should have been committed + assert_eq!( + 1, + c.select("SELECT COUNT(*) FROM a", Some(1), None) + .first() + .get_datum::(1) + .unwrap() + ); + }); + }) + } + + #[pg_test] + fn test_rollback_on_drop() { + Spi::execute(|c| { + c.update("CREATE TABLE a (v INTEGER)", None, None); + // The type below is explicit to ensure it's commit on drop by default + c.sub_transaction(|xact: SubTransaction| { + xact.update("INSERT INTO a VALUES (0)", None, None); + let xact = xact.rollback_on_drop(); + // Dropped explicitly for illustration purposes + drop(xact); + }); + // Create a new client to check the state + Spi::execute(|c| { + // The above insert should NOT have been committed + assert_eq!( + 0, + c.select("SELECT COUNT(*) FROM a", Some(1), None) + .first() + .get_datum::(1) + .unwrap() + ); + }); + }) + } +} diff --git a/pgx/src/lib.rs b/pgx/src/lib.rs index f572bd4b7..84b431a0c 100644 --- a/pgx/src/lib.rs +++ b/pgx/src/lib.rs @@ -63,6 +63,7 @@ pub mod shmem; pub mod spi; pub mod spinlock; pub mod stringinfo; +pub mod subxact; pub mod trigger_support; pub mod tupdesc; pub mod varlena; diff --git a/pgx/src/prelude.rs b/pgx/src/prelude.rs index ae74f8e3d..76c8f9c65 100644 --- a/pgx/src/prelude.rs +++ b/pgx/src/prelude.rs @@ -36,3 +36,6 @@ pub use crate::pg_sys::{ check_for_interrupts, debug1, debug2, debug3, debug4, debug5, ereport, error, function_name, info, log, notice, warning, FATAL, PANIC, }; + +// Sub-transactions +pub use crate::subxact::*; diff --git a/pgx/src/subxact.rs b/pgx/src/subxact.rs new file mode 100644 index 000000000..ef91be77c --- /dev/null +++ b/pgx/src/subxact.rs @@ -0,0 +1,196 @@ +use crate::{pg_sys, PgMemoryContexts, SpiClient}; +use std::fmt::{Debug, Formatter}; +use std::ops::Deref; + +/// Sub-transaction +/// +/// Can be created by calling `SpiClient::sub_transaction`, `SubTransaction::sub_transaction` +/// or any other implementation of `SubTransactionExt` and obtaining it as an argument to the provided closure. +/// +/// Unless rolled back or committed explicitly, it'll commit if `COMMIT` generic parameter is `true` +/// (default) or roll back if it is `false`. +pub struct SubTransaction { + memory_context: pg_sys::MemoryContext, + resource_owner: pg_sys::ResourceOwner, + // Should the transaction be released, or was it already committed or rolled back? + // + // The reason we are not calling this `released` as we're also using this flag when + // we convert between commit_on_drop and rollback_on_drop to ensure it doesn't get released + // on the drop of the original value. + should_release: bool, + parent: Option, +} + +impl Debug for SubTransaction { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(std::any::type_name::()) + } +} + +impl SubTransaction { + /// Create a new sub-transaction. + fn new(parent: Parent) -> Self { + // Remember the memory context before starting the sub-transaction + let ctx = PgMemoryContexts::CurrentMemoryContext.value(); + // Remember resource owner before starting the sub-transaction + let resource_owner = unsafe { pg_sys::CurrentResourceOwner }; + unsafe { + pg_sys::BeginInternalSubTransaction(std::ptr::null()); + } + // Switch to the outer memory context so that all allocations remain + // there instead of the sub-transaction's context + PgMemoryContexts::For(ctx).set_as_current(); + Self { memory_context: ctx, should_release: true, resource_owner, parent: Some(parent) } + } + + /// Commit the transaction, returning its parent + pub fn commit(mut self) -> Parent { + self.internal_commit(); + self.should_release = false; + self.parent.take().unwrap() + } + + /// Rollback the transaction, returning its parent + pub fn rollback(mut self) -> Parent { + self.internal_rollback(); + self.should_release = false; + self.parent.take().unwrap() + } + + /// Returns the memory context this transaction is in + pub fn memory_context(&self) -> PgMemoryContexts { + PgMemoryContexts::For(self.memory_context) + } + + fn internal_rollback(&self) { + unsafe { + pg_sys::RollbackAndReleaseCurrentSubTransaction(); + pg_sys::CurrentResourceOwner = self.resource_owner; + } + PgMemoryContexts::For(self.memory_context).set_as_current(); + } + + fn internal_commit(&self) { + unsafe { + pg_sys::ReleaseCurrentSubTransaction(); + pg_sys::CurrentResourceOwner = self.resource_owner; + } + PgMemoryContexts::For(self.memory_context).set_as_current(); + } +} + +impl SubTransaction { + /// Make this sub-transaction roll back on drop + pub fn rollback_on_drop(self) -> SubTransaction { + self.into() + } +} + +impl SubTransaction { + /// Make this sub-transaction commit on drop + pub fn commit_on_drop(self) -> SubTransaction { + self.into() + } +} + +impl Into> + for SubTransaction +{ + fn into(mut self) -> SubTransaction { + let result = SubTransaction { + memory_context: self.memory_context, + resource_owner: self.resource_owner, + should_release: self.should_release, + parent: self.parent.take(), + }; + // Make sure original sub-transaction won't commit + self.should_release = false; + result + } +} + +impl Into> + for SubTransaction +{ + fn into(mut self) -> SubTransaction { + let result = SubTransaction { + memory_context: self.memory_context, + resource_owner: self.resource_owner, + should_release: self.should_release, + parent: self.parent.take(), + }; + // Make sure original sub-transaction won't roll back + self.should_release = false; + result + } +} + +impl Drop for SubTransaction { + fn drop(&mut self) { + if self.should_release { + if COMMIT { + self.internal_commit(); + } else { + self.internal_rollback(); + } + } + } +} + +// This allows SubTransaction to be de-referenced to SpiClient +impl<'conn, const COMMIT: bool> Deref for SubTransaction, COMMIT> { + type Target = SpiClient<'conn>; + + fn deref(&self) -> &Self::Target { + self.parent.as_ref().unwrap() + } +} + +// This allows a SubTransaction of a SubTransaction to be de-referenced to SpiClient +impl Deref + for SubTransaction, COMMIT> +{ + type Target = Parent; + + fn deref(&self) -> &Self::Target { + self.parent.as_ref().and_then(|p| p.parent.as_ref()).unwrap() + } +} + +/// Trait that allows creating a sub-transaction off any type +pub trait SubTransactionExt { + /// Parent's type + /// + /// In most common cases, it'll be equal to `Self`. However, in some cases + /// it may be desirable to use a different type to achieve certain goals. + type T: SubTransactionExt; + + /// Consume `self` and execute a closure with a sub-transaction + /// + /// If further use of the given sub-transaction is necessary, it must + /// be returned by the closure alongside with its intended result. Otherwise, + /// the sub-transaction be released when dropped. + fn sub_transaction) -> R, R>(self, f: F) -> R + where + Self: Sized; +} + +impl<'a> SubTransactionExt for SpiClient<'a> { + type T = Self; + fn sub_transaction) -> R, R>(self, f: F) -> R + where + Self: Sized, + { + f(SubTransaction::new(self)) + } +} + +impl SubTransactionExt for SubTransaction { + type T = Self; + fn sub_transaction) -> R, R>(self, f: F) -> R + where + Self: Sized, + { + f(SubTransaction::new(self)) + } +}