Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sub-transactions API #912

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pgx-tests/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ mod shmem_tests;
mod spi_tests;
mod srf_tests;
mod struct_type_tests;
mod subxact_tests;
mod trigger_tests;
mod uuid_tests;
mod variadic_tests;
Expand Down
114 changes: 114 additions & 0 deletions pgx-tests/src/tests/subxact_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
Portions Copyright 2019-2021 ZomboDB, LLC.
Portions Copyright 2021-2022 Technology Concepts & Design, Inc. <[email protected]>

All rights reserved.

Use of this source code is governed by the MIT license that can be found in the LICENSE file.
*/

#[cfg(any(test, feature = "pg_test"))]
#[pgx::pg_schema]
mod tests {
#[allow(unused_imports)]
use crate as pgx_tests;

use pgx::prelude::*;
use pgx::spi::SpiClient;
use pgx::subxact::CommitOnDrop;

#[pg_test]
fn test_subxact_smoketest() {
Spi::connect(|mut c| {
c.update("CREATE TABLE a (v INTEGER)", None, None).unwrap();
let c = c.sub_transaction(|mut xact| {
xact.update("INSERT INTO a VALUES (0)", None, None).unwrap();
assert_eq!(
0,
xact.select("SELECT v FROM a", Some(1), None)
.unwrap()
.first()
.get::<i64>(1)
.unwrap()
.unwrap()
);
let xact = xact.sub_transaction(|mut xact| {
xact.update("INSERT INTO a VALUES (1)", None, None).unwrap();
assert_eq!(
2,
xact.select("SELECT COUNT(*) FROM a", Some(1), None)
.unwrap()
.first()
.get::<i64>(1)
.unwrap()
.unwrap()
);
xact.rollback()
});
xact.rollback()
});
assert_eq!(
0,
c.select("SELECT COUNT(*) FROM a", Some(1), None)
.unwrap()
.first()
.get::<i64>(1)
.unwrap()
.unwrap()
);
})
}

#[pg_test]
fn test_commit_on_drop() {
Spi::connect(|mut c| {
c.update("CREATE TABLE a (v INTEGER)", None, None).unwrap();
// The type below is explicit to ensure it's commit on drop by default
c.sub_transaction(|mut xact: SubTransaction<SpiClient, CommitOnDrop>| {
xact.update("INSERT INTO a VALUES (0)", None, None).unwrap();
// Dropped explicitly for illustration purposes
drop(xact);
});
// Create a new client to check the state
Spi::connect(|c| {
// The above insert should have been committed
assert_eq!(
1,
c.select("SELECT COUNT(*) FROM a", Some(1), None)
.unwrap()
.first()
.get::<i64>(1)
.unwrap()
.unwrap()
);
});
})
}

#[pg_test]
fn test_rollback_on_drop() {
Spi::connect(|mut c| {
c.update("CREATE TABLE a (v INTEGER)", None, None).unwrap();
// The type below is explicit to ensure it's commit on drop by default
c.sub_transaction(|mut xact: SubTransaction<SpiClient, CommitOnDrop>| {
xact.update("INSERT INTO a VALUES (0)", None, None).unwrap();
let xact = xact.rollback_on_drop();
// Dropped explicitly for illustration purposes
drop(xact);
});
// Create a new client to check the state
Spi::connect(|c| {
// The above insert should NOT have been committed
assert_eq!(
0,
c.select("SELECT COUNT(*) FROM a", Some(1), None)
.unwrap()
.first()
.get::<i64>(1)
.unwrap()
.unwrap()
);
});
})
}
}
1 change: 1 addition & 0 deletions pgx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub mod spi;
pub mod spinlock;
pub mod srf;
pub mod stringinfo;
pub mod subxact;
pub mod trigger_support;
pub mod tupdesc;
pub mod varlena;
Expand Down
3 changes: 3 additions & 0 deletions pgx/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ pub use crate::pg_sys::{
check_for_interrupts, debug1, debug2, debug3, debug4, debug5, ereport, error, function_name,
info, log, notice, warning, FATAL, PANIC,
};

// Sub-transactions
pub use crate::subxact::{SubTransaction, SubTransactionExt};
254 changes: 254 additions & 0 deletions pgx/src/subxact.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
use crate::{pg_sys, spi::SpiClient, PgMemoryContexts};
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};

/// Releases a sub-transaction on Drop
pub trait ReleaseOnDrop {}

/// Sub-transaction's contextual information
#[derive(Clone, Copy)]
pub struct Context {
memory_context: pg_sys::MemoryContext,
// Resource ownership before the transaction
//
// Based on information from src/backend/utils/resowner/README
// as well as practical use of it in src/pl/plpython/plpy_spi.c
resource_owner: pg_sys::ResourceOwner,
}

impl Context {
/// Captures the context
fn capture() -> Self {
// Remember the memory context before starting the sub-transaction
let memory_context = PgMemoryContexts::CurrentMemoryContext.value();
// Remember resource owner before starting the sub-transaction
let resource_owner = unsafe { pg_sys::CurrentResourceOwner };
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we get a note mentioning when this variable is set (i.e. "what is the last write we are expecting to be reading")?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please elaborate on this request? I am not 100% sure I understood what you're asking me to do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Jubilee means when might Postgres initially set (or later change) this and how are we certain it even has a valid value at this point in time?

Self { memory_context, resource_owner }
}
}

impl From<Context> for CommitOnDrop {
fn from(context: Context) -> Self {
CommitOnDrop(context)
}
}

impl From<Context> for RollbackOnDrop {
fn from(context: Context) -> Self {
RollbackOnDrop(context)
}
}

/// Commits a sub-transaction on Drop
pub struct CommitOnDrop(Context);

impl Drop for CommitOnDrop {
fn drop(&mut self) {
unsafe {
pg_sys::ReleaseCurrentSubTransaction();
pg_sys::CurrentResourceOwner = self.0.resource_owner;
}
PgMemoryContexts::For(self.0.memory_context).set_as_current();
}
}

impl ReleaseOnDrop for CommitOnDrop {}

/// Rolls back a sub-transaction on Drop
pub struct RollbackOnDrop(Context);

impl Drop for RollbackOnDrop {
fn drop(&mut self) {
unsafe {
pg_sys::RollbackAndReleaseCurrentSubTransaction();
pg_sys::CurrentResourceOwner = self.0.resource_owner;
}
PgMemoryContexts::For(self.0.memory_context).set_as_current();
}
}

impl ReleaseOnDrop for RollbackOnDrop {}

impl Into<RollbackOnDrop> for CommitOnDrop {
fn into(self) -> RollbackOnDrop {
let result = RollbackOnDrop(self.0);
// IMPORTANT: avoid running Drop (that would commit)
std::mem::forget(self);
result
}
}

impl Into<CommitOnDrop> for RollbackOnDrop {
fn into(self) -> CommitOnDrop {
let result = CommitOnDrop(self.0);
// IMPORTANT: avoid running Drop (that would roll back)
std::mem::forget(self);
result
}
}

struct NoOpOnDrop;

impl ReleaseOnDrop for NoOpOnDrop {}

/// Sub-transaction
///
/// Can be created by calling `SpiClient::sub_transaction`, `SubTransaction<Parent>::sub_transaction`
/// or any other implementation of `SubTransactionExt` and obtaining it as an argument to the provided closure.
///
/// Unless rolled back or committed explicitly, it'll commit if `Release` generic parameter is `CommitOnDrop`
/// (default) or roll back if it is `RollbackOnDrop`.
#[derive(Debug)]
pub struct SubTransaction<Parent: SubTransactionExt, Release: ReleaseOnDrop = CommitOnDrop> {
// Transaction release mechanism (commit, drop)
release: Release,
// Transaction parent
parent: Parent,
}

impl<Parent: SubTransactionExt, Release: ReleaseOnDrop> SubTransaction<Parent, Release>
where
Release: From<Context>,
{
/// Create a new sub-transaction.
fn new(parent: Parent) -> Self {
let context = Context::capture();
let memory_context = context.memory_context;
let release = context.into();
unsafe {
pg_sys::BeginInternalSubTransaction(std::ptr::null() /* [no] transaction name */);
}
// Switch to the outer memory context so that all allocations remain
// there instead of the sub-transaction's context
PgMemoryContexts::For(memory_context).set_as_current();
Self { release, parent }
}
}

impl<Parent: SubTransactionExt> SubTransaction<Parent, CommitOnDrop> {
/// Commit the transaction, returning its parent
pub fn commit(self) -> Parent {
// `Self::do_nothing_on_drop()` will commit as `Release` is `CommitOnDrop`
self.do_nothing_on_drop().parent
}
}

impl<Parent: SubTransactionExt> SubTransaction<Parent, RollbackOnDrop> {
/// Commit the transaction, returning its parent
pub fn commit(self) -> Parent {
// Make sub-transaction commit on drop and then use `commit`
self.commit_on_drop().commit()
}
}

impl<Parent: SubTransactionExt> SubTransaction<Parent, RollbackOnDrop> {
/// Rollback the transaction, returning its parent
pub fn rollback(self) -> Parent {
// `Self::do_nothing_on_drop()` will roll back as `Release` is `RollbackOnDrop`
self.do_nothing_on_drop().parent
}
}

impl<Parent: SubTransactionExt> SubTransaction<Parent, CommitOnDrop> {
/// Rollback the transaction, returning its parent
pub fn rollback(self) -> Parent {
// Make sub-transaction roll back on drop and then use `rollback`
self.rollback_on_drop().rollback()
}
}

impl<Parent: SubTransactionExt> SubTransaction<Parent, CommitOnDrop> {
/// Make this sub-transaction roll back on drop
pub fn rollback_on_drop(self) -> SubTransaction<Parent, RollbackOnDrop> {
SubTransaction { parent: self.parent, release: self.release.into() }
}
}

impl<Parent: SubTransactionExt> SubTransaction<Parent, RollbackOnDrop> {
/// Make this sub-transaction commit on drop
pub fn commit_on_drop(self) -> SubTransaction<Parent, CommitOnDrop> {
SubTransaction { parent: self.parent, release: self.release.into() }
}
}

impl<Parent: SubTransactionExt, Release: ReleaseOnDrop> SubTransaction<Parent, Release> {
/// Make this sub-transaction do nothing on drop
///
/// Releases the sub-transaction based on `Release` generic parameter. Further
/// dropping of the sub-transaction will not do anything.
fn do_nothing_on_drop(self) -> SubTransaction<Parent, NoOpOnDrop> {
SubTransaction { parent: self.parent, release: NoOpOnDrop }
}
}

// This allows SubTransaction to be de-referenced to SpiClient
impl<'conn, Release: ReleaseOnDrop> Deref for SubTransaction<SpiClient<'conn>, Release> {
type Target = SpiClient<'conn>;

fn deref(&self) -> &Self::Target {
&self.parent
}
}

impl<'conn, Release: ReleaseOnDrop> DerefMut for SubTransaction<SpiClient<'conn>, Release> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.parent
}
}

// This allows a SubTransaction of a SubTransaction to be de-referenced to SpiClient
impl<Parent: SubTransactionExt, Release: ReleaseOnDrop> Deref
for SubTransaction<SubTransaction<Parent>, Release>
{
type Target = Parent;

fn deref(&self) -> &Self::Target {
&self.parent.parent
}
}

impl<Parent: SubTransactionExt, Release: ReleaseOnDrop> DerefMut
for SubTransaction<SubTransaction<Parent>, Release>
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.parent.parent
}
}

/// Trait that allows creating a sub-transaction off any type
pub trait SubTransactionExt {
/// Parent's type
///
/// In most common cases, it'll be equal to `Self`. However, in some cases
/// it may be desirable to use a different type to achieve certain goals.
type Parent: SubTransactionExt;

/// Consume `self` and execute a closure with a sub-transaction
///
/// If further use of the given sub-transaction is necessary, it must
/// be returned by the closure alongside with its intended result. Otherwise,
/// the sub-transaction be released when dropped.
fn sub_transaction<F: FnOnce(SubTransaction<Self::Parent>) -> R, R>(self, f: F) -> R
where
Self: Sized;
}

impl<'a> SubTransactionExt for SpiClient<'a> {
type Parent = Self;
fn sub_transaction<F: FnOnce(SubTransaction<Self::Parent>) -> R, R>(self, f: F) -> R
where
Self: Sized,
{
f(SubTransaction::new(self))
}
}

impl<Parent: SubTransactionExt> SubTransactionExt for SubTransaction<Parent> {
type Parent = Self;
fn sub_transaction<F: FnOnce(SubTransaction<Self::Parent>) -> R, R>(self, f: F) -> R
where
Self: Sized,
{
f(SubTransaction::new(self))
}
}