Skip to content

Commit

Permalink
Problem: inability to fine-tune transaction boundaries easily
Browse files Browse the repository at this point in the history
Solution: implement an interface for sub-transactions

It's a fairly simplistic interface at this point and it allows choosing
a mode of release on drop (commit by default).

This is an improved version of the original implementation in
pgx-contrib-spiext
(https://github.com/supabase/pgx-contrib-spiext/blob/main/src/subtxn.rs)
  • Loading branch information
yrashk committed Dec 5, 2022
1 parent e460d80 commit f7455b1
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 0 deletions.
1 change: 1 addition & 0 deletions pgx-tests/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod shmem_tests;
mod spi_tests;
mod srf_tests;
mod struct_type_tests;
mod subxact_tests;
mod trigger_tests;
mod uuid_tests;
mod variadic_tests;
Expand Down
103 changes: 103 additions & 0 deletions pgx-tests/src/tests/subxact_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
Portions Copyright 2019-2021 ZomboDB, LLC.
Portions Copyright 2021-2022 Technology Concepts & Design, Inc. <[email protected]>
All rights reserved.
Use of this source code is governed by the MIT license that can be found in the LICENSE file.
*/

#[cfg(any(test, feature = "pg_test"))]
#[pgx::pg_schema]
mod tests {
#[allow(unused_imports)]
use crate as pgx_tests;

use pgx::prelude::*;
use pgx::SpiClient;

#[pg_test]
fn test_subxact_smoketest() {
Spi::execute(|c| {
c.update("CREATE TABLE a (v INTEGER)", None, None);
let c = c.sub_transaction(|xact| {
xact.update("INSERT INTO a VALUES (0)", None, None);
assert_eq!(
0,
xact.select("SELECT v FROM a", Some(1), None)
.first()
.get_datum::<i32>(1)
.unwrap()
);
let xact = xact.sub_transaction(|xact| {
xact.update("INSERT INTO a VALUES (1)", None, None);
assert_eq!(
2,
xact.select("SELECT COUNT(*) FROM a", Some(1), None)
.first()
.get_datum::<i32>(1)
.unwrap()
);
xact.rollback()
});
xact.rollback()
});
assert_eq!(
0,
c.select("SELECT COUNT(*) FROM a", Some(1), None)
.first()
.get_datum::<i32>(1)
.unwrap()
);
})
}

#[pg_test]
fn test_commit_on_drop() {
Spi::execute(|c| {
c.update("CREATE TABLE a (v INTEGER)", None, None);
// The type below is explicit to ensure it's commit on drop by default
c.sub_transaction(|xact: SubTransaction<SpiClient, true>| {
xact.update("INSERT INTO a VALUES (0)", None, None);
// Dropped explicitly for illustration purposes
drop(xact);
});
// Create a new client to check the state
Spi::execute(|c| {
// The above insert should have been committed
assert_eq!(
1,
c.select("SELECT COUNT(*) FROM a", Some(1), None)
.first()
.get_datum::<i32>(1)
.unwrap()
);
});
})
}

#[pg_test]
fn test_rollback_on_drop() {
Spi::execute(|c| {
c.update("CREATE TABLE a (v INTEGER)", None, None);
// The type below is explicit to ensure it's commit on drop by default
c.sub_transaction(|xact: SubTransaction<SpiClient, true>| {
xact.update("INSERT INTO a VALUES (0)", None, None);
let xact = xact.rollback_on_drop();
// Dropped explicitly for illustration purposes
drop(xact);
});
// Create a new client to check the state
Spi::execute(|c| {
// The above insert should NOT have been committed
assert_eq!(
0,
c.select("SELECT COUNT(*) FROM a", Some(1), None)
.first()
.get_datum::<i32>(1)
.unwrap()
);
});
})
}
}
1 change: 1 addition & 0 deletions pgx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub mod shmem;
pub mod spi;
pub mod spinlock;
pub mod stringinfo;
pub mod subxact;
pub mod trigger_support;
pub mod tupdesc;
pub mod varlena;
Expand Down
3 changes: 3 additions & 0 deletions pgx/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ pub use crate::pg_sys::{
check_for_interrupts, debug1, debug2, debug3, debug4, debug5, ereport, error, function_name,
info, log, notice, warning, FATAL, PANIC,
};

// Sub-transactions
pub use crate::subxact::*;
196 changes: 196 additions & 0 deletions pgx/src/subxact.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
use crate::{pg_sys, PgMemoryContexts, SpiClient};
use std::fmt::{Debug, Formatter};
use std::ops::Deref;

/// Sub-transaction
///
/// Can be created by calling `SpiClient::sub_transaction`, `SubTransaction<Parent>::sub_transaction`
/// or any other implementation of `SubTransactionExt` and obtaining it as an argument to the provided closure.
///
/// Unless rolled back or committed explicitly, it'll commit if `COMMIT` generic parameter is `true`
/// (default) or roll back if it is `false`.
pub struct SubTransaction<Parent: SubTransactionExt, const COMMIT: bool = true> {
memory_context: pg_sys::MemoryContext,
resource_owner: pg_sys::ResourceOwner,
// Should the transaction be released, or was it already committed or rolled back?
//
// The reason we are not calling this `released` as we're also using this flag when
// we convert between commit_on_drop and rollback_on_drop to ensure it doesn't get released
// on the drop of the original value.
should_release: bool,
parent: Option<Parent>,
}

impl<Parent: SubTransactionExt, const COMMIT: bool> Debug for SubTransaction<Parent, COMMIT> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(std::any::type_name::<Self>())
}
}

impl<Parent: SubTransactionExt, const COMMIT: bool> SubTransaction<Parent, COMMIT> {
/// Create a new sub-transaction.
fn new(parent: Parent) -> Self {
// Remember the memory context before starting the sub-transaction
let ctx = PgMemoryContexts::CurrentMemoryContext.value();
// Remember resource owner before starting the sub-transaction
let resource_owner = unsafe { pg_sys::CurrentResourceOwner };
unsafe {
pg_sys::BeginInternalSubTransaction(std::ptr::null());
}
// Switch to the outer memory context so that all allocations remain
// there instead of the sub-transaction's context
PgMemoryContexts::For(ctx).set_as_current();
Self { memory_context: ctx, should_release: true, resource_owner, parent: Some(parent) }
}

/// Commit the transaction, returning its parent
pub fn commit(mut self) -> Parent {
self.internal_commit();
self.should_release = false;
self.parent.take().unwrap()
}

/// Rollback the transaction, returning its parent
pub fn rollback(mut self) -> Parent {
self.internal_rollback();
self.should_release = false;
self.parent.take().unwrap()
}

/// Returns the memory context this transaction is in
pub fn memory_context(&self) -> PgMemoryContexts {
PgMemoryContexts::For(self.memory_context)
}

fn internal_rollback(&self) {
unsafe {
pg_sys::RollbackAndReleaseCurrentSubTransaction();
pg_sys::CurrentResourceOwner = self.resource_owner;
}
PgMemoryContexts::For(self.memory_context).set_as_current();
}

fn internal_commit(&self) {
unsafe {
pg_sys::ReleaseCurrentSubTransaction();
pg_sys::CurrentResourceOwner = self.resource_owner;
}
PgMemoryContexts::For(self.memory_context).set_as_current();
}
}

impl<Parent: SubTransactionExt> SubTransaction<Parent, true> {
/// Make this sub-transaction roll back on drop
pub fn rollback_on_drop(self) -> SubTransaction<Parent, false> {
self.into()
}
}

impl<Parent: SubTransactionExt> SubTransaction<Parent, false> {
/// Make this sub-transaction commit on drop
pub fn commit_on_drop(self) -> SubTransaction<Parent, true> {
self.into()
}
}

impl<Parent: SubTransactionExt> Into<SubTransaction<Parent, false>>
for SubTransaction<Parent, true>
{
fn into(mut self) -> SubTransaction<Parent, false> {
let result = SubTransaction {
memory_context: self.memory_context,
resource_owner: self.resource_owner,
should_release: self.should_release,
parent: self.parent.take(),
};
// Make sure original sub-transaction won't commit
self.should_release = false;
result
}
}

impl<Parent: SubTransactionExt> Into<SubTransaction<Parent, true>>
for SubTransaction<Parent, false>
{
fn into(mut self) -> SubTransaction<Parent, true> {
let result = SubTransaction {
memory_context: self.memory_context,
resource_owner: self.resource_owner,
should_release: self.should_release,
parent: self.parent.take(),
};
// Make sure original sub-transaction won't roll back
self.should_release = false;
result
}
}

impl<Parent: SubTransactionExt, const COMMIT: bool> Drop for SubTransaction<Parent, COMMIT> {
fn drop(&mut self) {
if self.should_release {
if COMMIT {
self.internal_commit();
} else {
self.internal_rollback();
}
}
}
}

// This allows SubTransaction to be de-referenced to SpiClient
impl<'conn, const COMMIT: bool> Deref for SubTransaction<SpiClient<'conn>, COMMIT> {
type Target = SpiClient<'conn>;

fn deref(&self) -> &Self::Target {
self.parent.as_ref().unwrap()
}
}

// This allows a SubTransaction of a SubTransaction to be de-referenced to SpiClient
impl<Parent: SubTransactionExt, const COMMIT: bool> Deref
for SubTransaction<SubTransaction<Parent>, COMMIT>
{
type Target = Parent;

fn deref(&self) -> &Self::Target {
self.parent.as_ref().and_then(|p| p.parent.as_ref()).unwrap()
}
}

/// Trait that allows creating a sub-transaction off any type
pub trait SubTransactionExt {
/// Parent's type
///
/// In most common cases, it'll be equal to `Self`. However, in some cases
/// it may be desirable to use a different type to achieve certain goals.
type T: SubTransactionExt;

/// Consume `self` and execute a closure with a sub-transaction
///
/// If further use of the given sub-transaction is necessary, it must
/// be returned by the closure alongside with its intended result. Otherwise,
/// the sub-transaction be released when dropped.
fn sub_transaction<F: FnOnce(SubTransaction<Self::T>) -> R, R>(self, f: F) -> R
where
Self: Sized;
}

impl<'a> SubTransactionExt for SpiClient<'a> {
type T = Self;
fn sub_transaction<F: FnOnce(SubTransaction<Self::T>) -> R, R>(self, f: F) -> R
where
Self: Sized,
{
f(SubTransaction::new(self))
}
}

impl<Parent: SubTransactionExt> SubTransactionExt for SubTransaction<Parent> {
type T = Self;
fn sub_transaction<F: FnOnce(SubTransaction<Self::T>) -> R, R>(self, f: F) -> R
where
Self: Sized,
{
f(SubTransaction::new(self))
}
}

0 comments on commit f7455b1

Please sign in to comment.