Skip to content

Commit

Permalink
Setup sys locks in TTxApplyReplicationChanges
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Sep 24, 2024
1 parent ec8172a commit 304afae
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
6 changes: 6 additions & 0 deletions ydb/core/tx/datashard/datashard_repl_apply.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "datashard_impl.h"
#include "datashard_locks_db.h"
#include "setup_sys_locks.h"

#include <util/string/escape.h>

Expand All @@ -24,6 +26,9 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase<TDataShar
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
Y_UNUSED(ctx);

TDataShardLocksDb locksDb(*Self, txc);
TSetupSysLocks guardLocks(*Self, &locksDb);

if (Self->State != TShardState::Ready) {
Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>(
NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED,
Expand Down Expand Up @@ -92,6 +97,7 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase<TDataShar
NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_OK);
}

Self->SysLocksTable().ApplyLocks();
return true;
}

Expand Down
38 changes: 38 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_replication.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
#include "datashard_active_transaction.h"
#include "datashard_ut_common_kqp.h"

#include <ydb/core/tx/tx_proxy/proxy.h>

namespace NKikimr {

using namespace NKikimr::NDataShard;
using namespace NKikimr::NDataShard::NKqpHelpers;
using namespace NSchemeShard;
using namespace Tests;

Expand Down Expand Up @@ -307,6 +309,42 @@ Y_UNIT_TEST_SUITE(DataShardReplication) {
}, NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED);
}

Y_UNIT_TEST(ApplyChangesWithConcurrentTx) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

InitRoot(server, sender);
CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions()
.Replicated(true)
.ReplicationConsistency(EReplicationConsistency::Weak)
);

auto shards = GetTableShards(server, sender, "/Root/table-1");
auto tableId = ResolveTableId(server, sender, "/Root/table-1");

ApplyChanges(server, shards.at(0), tableId, "my-source", {
TChange{ .Offset = 0, .WriteTxId = 0, .Key = 1, .Value = 11 },
});

TString sessionId;
TString txId;
KqpSimpleBegin(runtime, sessionId, txId, "SELECT * FROM `/Root/table-1`;");

ApplyChanges(server, shards.at(0), tableId, "my-source", {
TChange{ .Offset = 1, .WriteTxId = 0, .Key = 1, .Value = 21 },
});

KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1;");
}

}

} // namespace NKikimr

0 comments on commit 304afae

Please sign in to comment.