Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support sequences backup with current value #3089

Merged
merged 10 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1305,6 +1305,7 @@ message TSequenceDescription {
optional uint64 Cache = 8; // number of items to cache, defaults to 1
optional sint64 Increment = 9; // increment at each call, defaults to 1
optional bool Cycle = 10; // true when cycle on overflow is allowed
optional bool Overflowed = 11; // true when sequence is overflowed
}

message TSequenceSharding {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/tx_sequenceshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ message TEvCreateSequence {
bool Cycle = 9;
}
bool Frozen = 10; // defaults to false
bool Overflowed = 11; // defaults to false
}

message TEvCreateSequenceResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,6 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
result.push_back(CreateCopyTable(NextPartId(nextId, result),
CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup()), sequences));

if (descr.GetOmitIndexes()) {
continue;
}

TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
for (const auto& child: srcPath.Base()->GetChildren()) {
const auto& name = child.first;
Expand All @@ -160,6 +156,10 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
continue;
}

if (descr.GetOmitIndexes()) {
continue;
}

if (!srcIndexPath.IsTableIndex()) {
continue;
}
Expand All @@ -185,9 +185,11 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence);
scheme.SetFailOnExist(true);

auto* copySequence = scheme.MutableCopySequence();
copySequence->SetCopyFrom(srcPath.PathString() + "/" + sequenceDescription.GetName());
*scheme.MutableSequence() = std::move(sequenceDescription);

result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme));
result.push_back(CreateCopySequence(NextPartId(nextId, result), scheme));
}
}

Expand Down
59 changes: 56 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class TConfigureParts : public TSubOperationState {
event->Record.SetFrozen(true);

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TCoptSequence TConfigureParts ProgressState"
"TCopySequence TConfigureParts ProgressState"
<< " sending TEvCreateSequence to tablet " << tabletId
<< " operationId# " << OperationId
<< " at tablet " << ssId);
Expand Down Expand Up @@ -274,6 +274,50 @@ class TProposedCopySequence : public TSubOperationState {
<< " operationId#" << OperationId;
}

void UpdateSequenceDescription(NKikimrSchemeOp::TSequenceDescription& descr) {
descr.SetStartValue(GetSequenceResult.GetNextValue());
shnikd marked this conversation as resolved.
Show resolved Hide resolved
descr.SetMinValue(GetSequenceResult.GetMinValue());
descr.SetMaxValue(GetSequenceResult.GetMaxValue());
descr.SetCache(GetSequenceResult.GetCache());
descr.SetIncrement(GetSequenceResult.GetIncrement());
descr.SetCycle(GetSequenceResult.GetCycle());

i64 nextValue = GetSequenceResult.GetNextValue();
i64 minValue = GetSequenceResult.GetMinValue();
i64 maxValue = GetSequenceResult.GetMaxValue();
bool cycle = GetSequenceResult.GetCycle();
bool overflowed = false;

if (GetSequenceResult.GetNextUsed()) {
i64 increment = GetSequenceResult.GetIncrement();
if (increment > 0) {
ui64 delta = increment;

if (nextValue < maxValue && ui64(maxValue) - ui64(nextValue) >= delta) {
nextValue += delta;
} else {
if (cycle) {
nextValue = minValue;
}
overflowed = true;
}
} else {
ui64 delta = -increment;

if (nextValue > minValue && ui64(nextValue) - ui64(minValue) >= delta) {
shnikd marked this conversation as resolved.
Show resolved Hide resolved
nextValue -= delta;
} else {
if (cycle) {
nextValue = maxValue;
}
overflowed = true;
}
}
}
descr.SetStartValue(nextValue);
descr.SetOverflowed(overflowed);
shnikd marked this conversation as resolved.
Show resolved Hide resolved
}

public:
TProposedCopySequence(TOperationId id)
: OperationId(id)
Expand Down Expand Up @@ -333,7 +377,15 @@ class TProposedCopySequence : public TSubOperationState {
return false;
}

TPathId pathId = txState->TargetPathId;

NIceDb::TNiceDb db(context.GetDB());

auto sequenceInfo = context.SS->Sequences.at(pathId);
UpdateSequenceDescription(sequenceInfo->Description);

context.SS->PersistSequence(db, pathId, *sequenceInfo);

context.SS->ChangeTxState(db, OperationId, TTxState::Done);
context.OnComplete.ActivateTx(OperationId);
return true;
Expand Down Expand Up @@ -387,7 +439,7 @@ class TProposedCopySequence : public TSubOperationState {
return false;
}

auto getSequenceResult = ev->Get()->Record;
GetSequenceResult = ev->Get()->Record;

Y_ABORT_UNLESS(txState->Shards.size() == 1);
for (auto shard : txState->Shards) {
Expand All @@ -397,7 +449,8 @@ class TProposedCopySequence : public TSubOperationState {
Y_ABORT_UNLESS(currentTabletId != InvalidTabletId);

auto event = MakeHolder<NSequenceShard::TEvSequenceShard::TEvRestoreSequence>(
txState->TargetPathId, getSequenceResult);
txState->TargetPathId, GetSequenceResult);

event->Record.SetTxId(ui64(OperationId.GetTxId()));
event->Record.SetTxPartId(OperationId.GetSubTxId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class TConfigureParts : public TSubOperationState {
if (alterData->Description.HasCycle()) {
event->Record.SetCycle(alterData->Description.GetCycle());
}
event->Record.SetOverflowed(alterData->Description.GetOverflowed());

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TCreateSequence TConfigureParts ProgressState"
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
task.SetNeedToBill(!exportInfo->UserSID || !ss->SystemBackupSIDs.contains(*exportInfo->UserSID));

const TPath sourcePath = TPath::Init(exportInfo->Items[itemIdx].SourcePathId, ss);
if (sourcePath.IsResolved()) {
task.MutableTable()->CopyFrom(GetTableDescription(ss, sourcePath.Base()->PathId));
const TPath exportPathItem = exportPath.Child(ToString(itemIdx));
if (sourcePath.IsResolved() && exportPathItem.IsResolved()) {
auto exportDescription = GetTableDescription(ss, exportPathItem.Base()->PathId);
exportDescription.MutableTable()->SetName(sourcePath.LeafName());
task.MutableTable()->CopyFrom(exportDescription);
shnikd marked this conversation as resolved.
Show resolved Hide resolved
}

task.SetSnapshotStep(exportInfo->SnapshotStep);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
if (fromSequence.has_cycle()) {
seqDesc->SetCycle(fromSequence.cycle());
}
seqDesc->SetOverflowed(fromSequence.overflowed());

break;
}
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2362,16 +2362,17 @@ namespace NSchemeShardUT_Private {
runtime.Send(new IEventHandle(NSequenceProxy::MakeSequenceProxyServiceID(), sender, request.Release()));
}

i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender) {
i64 WaitNextValResult(
shnikd marked this conversation as resolved.
Show resolved Hide resolved
TTestActorRuntime& runtime, const TActorId& sender, Ydb::StatusIds::StatusCode expectedStatus) {
auto ev = runtime.GrabEdgeEventRethrow<NSequenceProxy::TEvSequenceProxy::TEvNextValResult>(sender);
auto* msg = ev->Get();
UNIT_ASSERT_VALUES_EQUAL(msg->Status, Ydb::StatusIds::SUCCESS);
UNIT_ASSERT_VALUES_EQUAL(msg->Status, expectedStatus);
return msg->Value;
}

i64 DoNextVal(TTestActorRuntime& runtime, const TString& path) {
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path, Ydb::StatusIds::StatusCode expectedStatus) {
auto sender = runtime.AllocateEdgeActor(0);
SendNextValRequest(runtime, sender, path);
return WaitNextValResult(runtime, sender);
return WaitNextValResult(runtime, sender, expectedStatus);
}
}
8 changes: 6 additions & 2 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,11 @@ namespace NSchemeShardUT_Private {
void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected = true);

void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path);
i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender);
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path);
i64 WaitNextValResult(
TTestActorRuntime& runtime, const TActorId& sender,
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);
i64 DoNextVal(
TTestActorRuntime& runtime, const TString& path,
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);

} //NSchemeShardUT_Private
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ PEERDIR(
ydb/core/tx
ydb/core/tx/datashard
ydb/core/tx/schemeshard
ydb/core/tx/sequenceproxy
shnikd marked this conversation as resolved.
Show resolved Hide resolved
ydb/core/tx/tx_allocator
ydb/core/tx/tx_proxy
ydb/public/lib/scheme_types
Expand Down
150 changes: 150 additions & 0 deletions ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,156 @@ value {
UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
}

Y_UNIT_TEST(ShouldRestoreSequence) {
TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

TTestBasicRuntime runtime;
TTestEnv env(runtime);

ui64 txId = 100;

runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::SEQUENCEPROXY, NActors::NLog::PRI_TRACE);

TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
TableDescription {
Name: "Original"
Columns { Name: "key" Type: "Uint64" DefaultFromSequence: "myseq" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
}
SequenceDescription {
Name: "myseq"
}
)");
env.TestWaitNotification(runtime, txId);

i64 value = DoNextVal(runtime, "/MyRoot/Original/myseq");
UNIT_ASSERT_VALUES_EQUAL(value, 1);

TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Original"
destination_prefix: ""
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_prefix: ""
destination_path: "/MyRoot/Restored"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetImport(runtime, txId, "/MyRoot");

const auto desc = DescribePath(runtime, "/MyRoot/Restored", true, true);
UNIT_ASSERT_VALUES_EQUAL(desc.GetStatus(), NKikimrScheme::StatusSuccess);

const auto& table = desc.GetPathDescription().GetTable();

value = DoNextVal(runtime, "/MyRoot/Restored/myseq", Ydb::StatusIds::SCHEME_ERROR);
UNIT_ASSERT_VALUES_EQUAL(value, 2);

UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
}

Y_UNIT_TEST(ShouldRestoreSequenceWithOverflow) {
TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

TTestBasicRuntime runtime;
TTestEnv env(runtime);

ui64 txId = 100;

runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::SEQUENCEPROXY, NActors::NLog::PRI_TRACE);

TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
TableDescription {
Name: "Original"
Columns { Name: "key" Type: "Uint64" DefaultFromSequence: "myseq" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
}
SequenceDescription {
Name: "myseq"
MinValue: 1
MaxValue: 2
}
)");
env.TestWaitNotification(runtime, txId);

i64 value = DoNextVal(runtime, "/MyRoot/Original/myseq");
UNIT_ASSERT_VALUES_EQUAL(value, 1);

value = DoNextVal(runtime, "/MyRoot/Original/myseq");
UNIT_ASSERT_VALUES_EQUAL(value, 2);

TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Original"
destination_prefix: ""
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_prefix: ""
destination_path: "/MyRoot/Restored"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetImport(runtime, txId, "/MyRoot");

const auto desc = DescribePath(runtime, "/MyRoot/Restored", true, true);
UNIT_ASSERT_VALUES_EQUAL(desc.GetStatus(), NKikimrScheme::StatusSuccess);

const auto& table = desc.GetPathDescription().GetTable();

value = DoNextVal(runtime, "/MyRoot/Restored/myseq");
UNIT_ASSERT_VALUES_EQUAL(value, 2);

value = DoNextVal(runtime, "/MyRoot/Restored/myseq");
UNIT_ASSERT_VALUES_EQUAL(value, 2);

UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
}

Y_UNIT_TEST(ExportImportPg) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableTablePgTypes(true));
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/ut_restore/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ PEERDIR(
ydb/core/wrappers/ut_helpers
ydb/core/ydb_convert
ydb/library/yql/sql/pg
ydb/library/yql/parser/pg_wrapper
ydb/library/yql/parser/pg_wrapper
)

SRCS(
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/sequenceshard/public/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ namespace NSequenceShard {
return std::move(*this);
}

TBuilder&& SetOverflowed(bool overflowed) && {
Msg->Record.SetOverflowed(overflowed);
return std::move(*this);
}

THolder<TEvCreateSequence> Done() && {
return std::move(Msg);
}
Expand Down
Loading
Loading