Skip to content

Commit

Permalink
Distinguish overloaded Transaction::GetForUpdate
Browse files Browse the repository at this point in the history
We have added a GetForUpdate(… PinnableSlice* …) override which implements a missing method in the case where the column family is defaulted. A version with explicit column family already exists.

The overloads are not distinguishable in the case where nullptr is passed as the value, and this is manifest in transaction_test.cc. We cast appropriately in tests to fix this.

Does this constitute a breaking API change ?

Add a test for the PinnableSlice GetForUpdate() variant to confirm the basic usage of the new overload.
  • Loading branch information
alanpaxton committed Sep 18, 2023
1 parent 1a3e04b commit 38eda77
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 38 deletions.
7 changes: 7 additions & 0 deletions utilities/transactions/transaction_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ class TransactionBaseImpl : public Transaction {
exclusive, do_validate);
}

Status GetForUpdate(const ReadOptions& options, const Slice& key,
PinnableSlice* pinnable_val, bool exclusive,
const bool do_validate) override {
return GetForUpdate(options, db_->DefaultColumnFamily(), key, pinnable_val,
exclusive, do_validate);
}

using Transaction::MultiGet;
std::vector<Status> MultiGet(
const ReadOptions& _read_options,
Expand Down
130 changes: 92 additions & 38 deletions utilities/transactions/transaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,42 @@ TEST_P(TransactionTest, SuccessTest) {
delete txn;
}

// Test the basic API of the pinnable slice overload of GetForUpdate()
TEST_P(TransactionTest, SuccessTestPinnable) {
ASSERT_OK(db->ResetStats());

WriteOptions write_options;
ReadOptions read_options;
PinnableSlice pinnable_val;

ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));

Transaction* txn = db->BeginTransaction(write_options, TransactionOptions());
ASSERT_TRUE(txn);

ASSERT_EQ(0, txn->GetNumPuts());
ASSERT_LE(0, txn->GetID());

ASSERT_OK(txn->GetForUpdate(read_options, "foo", &pinnable_val));
ASSERT_EQ(*pinnable_val.GetSelf(), std::string("bar"));

ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));

ASSERT_EQ(1, txn->GetNumPuts());

ASSERT_OK(txn->GetForUpdate(read_options, "foo", &pinnable_val));
ASSERT_EQ(*pinnable_val.GetSelf(), std::string("bar2"));

ASSERT_OK(txn->Commit());

ASSERT_OK(
db->Get(read_options, db->DefaultColumnFamily(), "foo", &pinnable_val));
ASSERT_EQ(*pinnable_val.GetSelf(), std::string("bar2"));

delete txn;
}

TEST_P(TransactionTest, SwitchMemtableDuringPrepareAndCommit_WC) {
const TxnDBWritePolicy write_policy = std::get<2>(GetParam());

Expand Down Expand Up @@ -437,13 +473,16 @@ TEST_P(TransactionTest, SharedLocks) {
ASSERT_TRUE(txn3);

// Test shared access between txns
s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
s = txn1->GetForUpdate(read_options, "foo", (std::string*)nullptr,
false /* exclusive */);
ASSERT_OK(s);

s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
s = txn2->GetForUpdate(read_options, "foo", (std::string*)nullptr,
false /* exclusive */);
ASSERT_OK(s);

s = txn3->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
s = txn3->GetForUpdate(read_options, "foo", (std::string*)nullptr,
false /* exclusive */);
ASSERT_OK(s);

auto lock_data = db->GetLockStatusData();
Expand All @@ -466,60 +505,68 @@ TEST_P(TransactionTest, SharedLocks) {
ASSERT_OK(txn3->Rollback());

// Test txn1 and txn2 sharing a lock and txn3 trying to obtain it.
s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
s = txn1->GetForUpdate(read_options, "foo", (std::string*)nullptr,
false /* exclusive */);
ASSERT_OK(s);

s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
s = txn2->GetForUpdate(read_options, "foo", (std::string*)nullptr,
false /* exclusive */);
ASSERT_OK(s);

s = txn3->GetForUpdate(read_options, "foo", nullptr);
s = txn3->GetForUpdate(read_options, "foo", (std::string*)nullptr);
ASSERT_TRUE(s.IsTimedOut());
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

txn1->UndoGetForUpdate("foo");
s = txn3->GetForUpdate(read_options, "foo", nullptr);
s = txn3->GetForUpdate(read_options, "foo", (std::string*)nullptr);
ASSERT_TRUE(s.IsTimedOut());
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

txn2->UndoGetForUpdate("foo");
s = txn3->GetForUpdate(read_options, "foo", nullptr);
s = txn3->GetForUpdate(read_options, "foo", (std::string*)nullptr);
ASSERT_OK(s);

ASSERT_OK(txn1->Rollback());
ASSERT_OK(txn2->Rollback());
ASSERT_OK(txn3->Rollback());

// Test txn1 and txn2 sharing a lock and txn2 trying to upgrade lock.
s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
s = txn1->GetForUpdate(read_options, "foo", (std::string*)nullptr,
false /* exclusive */);
ASSERT_OK(s);

s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
s = txn2->GetForUpdate(read_options, "foo", (std::string*)nullptr,
false /* exclusive */);
ASSERT_OK(s);

s = txn2->GetForUpdate(read_options, "foo", nullptr);
s = txn2->GetForUpdate(read_options, "foo", (std::string*)nullptr);
ASSERT_TRUE(s.IsTimedOut());
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

txn1->UndoGetForUpdate("foo");
s = txn2->GetForUpdate(read_options, "foo", nullptr);
s = txn2->GetForUpdate(read_options, "foo", (std::string*)nullptr);
ASSERT_OK(s);

ASSERT_OK(txn1->Rollback());
ASSERT_OK(txn2->Rollback());

// Test txn1 trying to downgrade its lock.
s = txn1->GetForUpdate(read_options, "foo", nullptr, true /* exclusive */);
s = txn1->GetForUpdate(read_options, "foo", (std::string*)nullptr,
true /* exclusive */);
ASSERT_OK(s);

s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
s = txn2->GetForUpdate(read_options, "foo", (std::string*)nullptr,
false /* exclusive */);
ASSERT_TRUE(s.IsTimedOut());
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

// Should still fail after "downgrading".
s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
s = txn1->GetForUpdate(read_options, "foo", (std::string*)nullptr,
false /* exclusive */);
ASSERT_OK(s);

s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
s = txn2->GetForUpdate(read_options, "foo", (std::string*)nullptr,
false /* exclusive */);
ASSERT_TRUE(s.IsTimedOut());
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

Expand All @@ -528,15 +575,17 @@ TEST_P(TransactionTest, SharedLocks) {

// Test txn1 holding an exclusive lock and txn2 trying to obtain shared
// access.
s = txn1->GetForUpdate(read_options, "foo", nullptr);
s = txn1->GetForUpdate(read_options, "foo", (std::string*)nullptr);
ASSERT_OK(s);

s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
s = txn2->GetForUpdate(read_options, "foo", (std::string*)nullptr,
false /* exclusive */);
ASSERT_TRUE(s.IsTimedOut());
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");

txn1->UndoGetForUpdate("foo");
s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
s = txn2->GetForUpdate(read_options, "foo", (std::string*)nullptr,
false /* exclusive */);
ASSERT_OK(s);

delete txn1;
Expand Down Expand Up @@ -570,8 +619,9 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
for (uint32_t i = 0; i < 31; i++) {
txns[i] = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txns[i]);
auto s = txns[i]->GetForUpdate(read_options, std::to_string((i + 1) / 2),
nullptr, false /* exclusive */);
auto s =
txns[i]->GetForUpdate(read_options, std::to_string((i + 1) / 2),
(std::string*)nullptr, false /* exclusive */);
ASSERT_OK(s);
}

Expand All @@ -585,8 +635,9 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
std::vector<port::Thread> threads;
for (uint32_t i = 0; i < 15; i++) {
std::function<void()> blocking_thread = [&, i] {
auto s = txns[i]->GetForUpdate(read_options, std::to_string(i + 1),
nullptr, true /* exclusive */);
auto s =
txns[i]->GetForUpdate(read_options, std::to_string(i + 1),
(std::string*)nullptr, true /* exclusive */);
ASSERT_OK(s);
ASSERT_OK(txns[i]->Rollback());
delete txns[i];
Expand All @@ -604,8 +655,8 @@ TEST_P(TransactionTest, DeadlockCycleShared) {

// Complete the cycle T[16 - 31] -> T1
for (uint32_t i = 15; i < 31; i++) {
auto s =
txns[i]->GetForUpdate(read_options, "0", nullptr, true /* exclusive */);
auto s = txns[i]->GetForUpdate(read_options, "0", (std::string*)nullptr,
true /* exclusive */);
ASSERT_TRUE(s.IsDeadlock());

// Calculate next buffer len, plateau at 5 when 5 records are inserted.
Expand Down Expand Up @@ -704,8 +755,8 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
for (uint32_t i = 0; i < 2; i++) {
txns_shared[i] = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txns_shared[i]);
auto s =
txns_shared[i]->GetForUpdate(read_options, std::to_string(i), nullptr);
auto s = txns_shared[i]->GetForUpdate(read_options, std::to_string(i),
(std::string*)nullptr);
ASSERT_OK(s);
}

Expand All @@ -719,7 +770,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
for (uint32_t i = 0; i < 1; i++) {
std::function<void()> blocking_thread = [&, i] {
auto s = txns_shared[i]->GetForUpdate(read_options, std::to_string(i + 1),
nullptr);
(std::string*)nullptr);
ASSERT_OK(s);
ASSERT_OK(txns_shared[i]->Rollback());
delete txns_shared[i];
Expand All @@ -736,7 +787,8 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

// Complete the cycle T2 -> T1 with a shared lock.
auto s = txns_shared[1]->GetForUpdate(read_options, "0", nullptr, false);
auto s = txns_shared[1]->GetForUpdate(read_options, "0",
(std::string*)nullptr, false);
ASSERT_TRUE(s.IsDeadlock());

auto dlock_buffer = db->GetDeadlockInfoBuffer();
Expand Down Expand Up @@ -778,7 +830,8 @@ TEST_P(TransactionStressTest, DeadlockCycle) {
for (uint32_t i = 0; i < len; i++) {
txns[i] = db->BeginTransaction(write_options, txn_options);
ASSERT_TRUE(txns[i]);
auto s = txns[i]->GetForUpdate(read_options, std::to_string(i), nullptr);
auto s = txns[i]->GetForUpdate(read_options, std::to_string(i),
(std::string*)nullptr);
ASSERT_OK(s);
}

Expand All @@ -793,8 +846,8 @@ TEST_P(TransactionStressTest, DeadlockCycle) {
std::vector<port::Thread> threads;
for (uint32_t i = 0; i + 1 < len; i++) {
std::function<void()> blocking_thread = [&, i] {
auto s =
txns[i]->GetForUpdate(read_options, std::to_string(i + 1), nullptr);
auto s = txns[i]->GetForUpdate(read_options, std::to_string(i + 1),
(std::string*)nullptr);
ASSERT_OK(s);
ASSERT_OK(txns[i]->Rollback());
delete txns[i];
Expand All @@ -811,7 +864,8 @@ TEST_P(TransactionStressTest, DeadlockCycle) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

// Complete the cycle Tlen -> T1
auto s = txns[len - 1]->GetForUpdate(read_options, "0", nullptr);
auto s =
txns[len - 1]->GetForUpdate(read_options, "0", (std::string*)nullptr);
ASSERT_TRUE(s.IsDeadlock());

const uint32_t dlock_buffer_size_ = (len - 1 > 5) ? 5 : (len - 1);
Expand Down Expand Up @@ -898,8 +952,8 @@ TEST_P(TransactionStressTest, DeadlockStress) {
// Lock keys in random order.
for (const auto& k : random_keys) {
// Lock mostly for shared access, but exclusive 1/4 of the time.
auto s =
txn->GetForUpdate(read_options, k, nullptr, txn->GetID() % 4 == 0);
auto s = txn->GetForUpdate(read_options, k, (std::string*)nullptr,
txn->GetID() % 4 == 0);
if (!s.ok()) {
ASSERT_TRUE(s.IsDeadlock());
ASSERT_OK(txn->Rollback());
Expand Down Expand Up @@ -3771,7 +3825,7 @@ TEST_P(TransactionTest, IteratorTest) {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(results[i], iter->value().ToString());

s = txn->GetForUpdate(read_options, iter->key(), nullptr);
s = txn->GetForUpdate(read_options, iter->key(), (std::string*)nullptr);
if (i == 2) {
// "C" was modified after txn's snapshot
ASSERT_TRUE(s.IsBusy());
Expand Down Expand Up @@ -4694,7 +4748,7 @@ TEST_P(TransactionTest, TimeoutTest) {
txn_options0.lock_timeout = 50; // txn timeout no longer infinite
Transaction* txn1 = db->BeginTransaction(write_options, txn_options0);

s = txn1->GetForUpdate(read_options, "aaa", nullptr);
s = txn1->GetForUpdate(read_options, "aaa", (std::string*)nullptr);
ASSERT_OK(s);

// Conflicts with previous GetForUpdate.
Expand Down Expand Up @@ -4731,7 +4785,7 @@ TEST_P(TransactionTest, TimeoutTest) {
txn_options.expiration = 100; // 100ms
txn1 = db->BeginTransaction(write_options, txn_options);

s = txn1->GetForUpdate(read_options, "aaa", nullptr);
s = txn1->GetForUpdate(read_options, "aaa", (std::string*)nullptr);
ASSERT_OK(s);

// Conflicts with previous GetForUpdate.
Expand Down

0 comments on commit 38eda77

Please sign in to comment.