Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support sequences backup with current value #3089

Merged
merged 10 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,11 @@ message TMoveIndex {
optional bool AllowOverwrite = 4;
}

message TSetVal {
shnikd marked this conversation as resolved.
Show resolved Hide resolved
optional sint64 NextValue = 1;
optional bool NextUsed = 2;
}

message TSequenceDescription {
optional string Name = 1; // mandatory
optional NKikimrProto.TPathID PathId = 2; // sequence path id, assigned by schemeshard
Expand All @@ -1305,6 +1310,7 @@ message TSequenceDescription {
optional uint64 Cache = 8; // number of items to cache, defaults to 1
optional sint64 Increment = 9; // increment at each call, defaults to 1
optional bool Cycle = 10; // true when cycle on overflow is allowed
optional TSetVal SetVal = 11; // SetVal(NextValue, NextUsed) is executed atomically when creating
}

message TSequenceSharding {
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/protos/tx_sequenceshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ message TEvMarkSchemeShardPipe {
uint64 Round = 3;
}

message TSetVal {
shnikd marked this conversation as resolved.
Show resolved Hide resolved
sint64 NextValue = 1;
bool NextUsed = 2;
}

message TEvCreateSequence {
NKikimrProto.TPathID PathId = 1;
uint64 TxId = 2;
Expand All @@ -40,6 +45,9 @@ message TEvCreateSequence {
bool Cycle = 9;
}
bool Frozen = 10; // defaults to false
oneof OptionalSetVal {
shnikd marked this conversation as resolved.
Show resolved Hide resolved
TSetVal SetVal = 11;
}
}

message TEvCreateSequenceResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,6 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
result.push_back(CreateCopyTable(NextPartId(nextId, result),
CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup()), sequences));

if (descr.GetOmitIndexes()) {
continue;
}

TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
for (const auto& child: srcPath.Base()->GetChildren()) {
const auto& name = child.first;
Expand All @@ -160,6 +156,10 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
continue;
}

if (descr.GetOmitIndexes()) {
continue;
}

if (!srcIndexPath.IsTableIndex()) {
continue;
}
Expand All @@ -185,9 +185,11 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence);
scheme.SetFailOnExist(true);

auto* copySequence = scheme.MutableCopySequence();
copySequence->SetCopyFrom(srcPath.PathString() + "/" + sequenceDescription.GetName());
*scheme.MutableSequence() = std::move(sequenceDescription);

result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme));
result.push_back(CreateCopySequence(NextPartId(nextId, result), scheme));
}
}

Expand Down
27 changes: 24 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class TConfigureParts : public TSubOperationState {
event->Record.SetFrozen(true);

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TCoptSequence TConfigureParts ProgressState"
"TCopySequence TConfigureParts ProgressState"
<< " sending TEvCreateSequence to tablet " << tabletId
<< " operationId# " << OperationId
<< " at tablet " << ssId);
Expand Down Expand Up @@ -274,6 +274,18 @@ class TProposedCopySequence : public TSubOperationState {
<< " operationId#" << OperationId;
}

void UpdateSequenceDescription(NKikimrSchemeOp::TSequenceDescription& descr) {
descr.SetStartValue(GetSequenceResult.GetStartValue());
descr.SetMinValue(GetSequenceResult.GetMinValue());
descr.SetMaxValue(GetSequenceResult.GetMaxValue());
descr.SetCache(GetSequenceResult.GetCache());
descr.SetIncrement(GetSequenceResult.GetIncrement());
descr.SetCycle(GetSequenceResult.GetCycle());
auto* setValMsg = descr.MutableSetVal();
shnikd marked this conversation as resolved.
Show resolved Hide resolved
setValMsg->SetNextValue(GetSequenceResult.GetNextValue());
setValMsg->SetNextUsed(GetSequenceResult.GetNextUsed());
}

public:
TProposedCopySequence(TOperationId id)
: OperationId(id)
Expand Down Expand Up @@ -333,7 +345,15 @@ class TProposedCopySequence : public TSubOperationState {
return false;
}

TPathId pathId = txState->TargetPathId;

NIceDb::TNiceDb db(context.GetDB());

auto sequenceInfo = context.SS->Sequences.at(pathId);
UpdateSequenceDescription(sequenceInfo->Description);

context.SS->PersistSequence(db, pathId, *sequenceInfo);

context.SS->ChangeTxState(db, OperationId, TTxState::Done);
context.OnComplete.ActivateTx(OperationId);
return true;
Expand Down Expand Up @@ -387,7 +407,7 @@ class TProposedCopySequence : public TSubOperationState {
return false;
}

auto getSequenceResult = ev->Get()->Record;
GetSequenceResult = ev->Get()->Record;

Y_ABORT_UNLESS(txState->Shards.size() == 1);
for (auto shard : txState->Shards) {
Expand All @@ -397,7 +417,8 @@ class TProposedCopySequence : public TSubOperationState {
Y_ABORT_UNLESS(currentTabletId != InvalidTabletId);

auto event = MakeHolder<NSequenceShard::TEvSequenceShard::TEvRestoreSequence>(
txState->TargetPathId, getSequenceResult);
txState->TargetPathId, GetSequenceResult);

event->Record.SetTxId(ui64(OperationId.GetTxId()));
event->Record.SetTxPartId(OperationId.GetSubTxId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ class TConfigureParts : public TSubOperationState {
if (alterData->Description.HasCycle()) {
event->Record.SetCycle(alterData->Description.GetCycle());
}
if (alterData->Description.HasSetVal()) {
event->Record.MutableSetVal()->SetNextValue(alterData->Description.GetSetVal().GetNextValue());
event->Record.MutableSetVal()->SetNextUsed(alterData->Description.GetSetVal().GetNextUsed());
}

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TCreateSequence TConfigureParts ProgressState"
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
task.SetNeedToBill(!exportInfo->UserSID || !ss->SystemBackupSIDs.contains(*exportInfo->UserSID));

const TPath sourcePath = TPath::Init(exportInfo->Items[itemIdx].SourcePathId, ss);
if (sourcePath.IsResolved()) {
task.MutableTable()->CopyFrom(GetTableDescription(ss, sourcePath.Base()->PathId));
const TPath exportPathItem = exportPath.Child(ToString(itemIdx));
if (sourcePath.IsResolved() && exportPathItem.IsResolved()) {
auto exportDescription = GetTableDescription(ss, exportPathItem.Base()->PathId);
exportDescription.MutableTable()->SetName(sourcePath.LeafName());
task.MutableTable()->CopyFrom(exportDescription);
shnikd marked this conversation as resolved.
Show resolved Hide resolved
}

task.SetSnapshotStep(exportInfo->SnapshotStep);
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
if (fromSequence.has_cycle()) {
seqDesc->SetCycle(fromSequence.cycle());
}
if (fromSequence.has_set_val()) {
auto* setVal = seqDesc->MutableSetVal();
setVal->SetNextUsed(fromSequence.set_val().next_used());
setVal->SetNextValue(fromSequence.set_val().next_value());
}

break;
}
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2362,16 +2362,17 @@ namespace NSchemeShardUT_Private {
runtime.Send(new IEventHandle(NSequenceProxy::MakeSequenceProxyServiceID(), sender, request.Release()));
}

i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender) {
i64 WaitNextValResult(
shnikd marked this conversation as resolved.
Show resolved Hide resolved
TTestActorRuntime& runtime, const TActorId& sender, Ydb::StatusIds::StatusCode expectedStatus) {
auto ev = runtime.GrabEdgeEventRethrow<NSequenceProxy::TEvSequenceProxy::TEvNextValResult>(sender);
auto* msg = ev->Get();
UNIT_ASSERT_VALUES_EQUAL(msg->Status, Ydb::StatusIds::SUCCESS);
UNIT_ASSERT_VALUES_EQUAL(msg->Status, expectedStatus);
return msg->Value;
}

i64 DoNextVal(TTestActorRuntime& runtime, const TString& path) {
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path, Ydb::StatusIds::StatusCode expectedStatus) {
auto sender = runtime.AllocateEdgeActor(0);
SendNextValRequest(runtime, sender, path);
return WaitNextValResult(runtime, sender);
return WaitNextValResult(runtime, sender, expectedStatus);
}
}
8 changes: 6 additions & 2 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,11 @@ namespace NSchemeShardUT_Private {
void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected = true);

void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path);
i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender);
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path);
i64 WaitNextValResult(
TTestActorRuntime& runtime, const TActorId& sender,
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);
i64 DoNextVal(
TTestActorRuntime& runtime, const TString& path,
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);

} //NSchemeShardUT_Private
146 changes: 146 additions & 0 deletions ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,152 @@ value {
UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
}

Y_UNIT_TEST(ShouldRestoreSequence) {
TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

TTestBasicRuntime runtime;
TTestEnv env(runtime);

ui64 txId = 100;

runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::SEQUENCEPROXY, NActors::NLog::PRI_TRACE);

TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
TableDescription {
Name: "Original"
Columns { Name: "key" Type: "Uint64" DefaultFromSequence: "myseq" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
}
SequenceDescription {
Name: "myseq"
}
)");
env.TestWaitNotification(runtime, txId);

i64 value = DoNextVal(runtime, "/MyRoot/Original/myseq");
UNIT_ASSERT_VALUES_EQUAL(value, 1);

TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Original"
destination_prefix: ""
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_prefix: ""
destination_path: "/MyRoot/Restored"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetImport(runtime, txId, "/MyRoot");

const auto desc = DescribePath(runtime, "/MyRoot/Restored", true, true);
UNIT_ASSERT_VALUES_EQUAL(desc.GetStatus(), NKikimrScheme::StatusSuccess);

const auto& table = desc.GetPathDescription().GetTable();

value = DoNextVal(runtime, "/MyRoot/Restored/myseq");
UNIT_ASSERT_VALUES_EQUAL(value, 2);

UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
}

Y_UNIT_TEST(ShouldRestoreSequenceWithOverflow) {
TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

TTestBasicRuntime runtime;
TTestEnv env(runtime);

ui64 txId = 100;

runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::SEQUENCEPROXY, NActors::NLog::PRI_TRACE);

TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
TableDescription {
Name: "Original"
Columns { Name: "key" Type: "Uint64" DefaultFromSequence: "myseq" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
}
SequenceDescription {
Name: "myseq"
MinValue: 1
MaxValue: 2
}
)");
env.TestWaitNotification(runtime, txId);

i64 value = DoNextVal(runtime, "/MyRoot/Original/myseq");
UNIT_ASSERT_VALUES_EQUAL(value, 1);

value = DoNextVal(runtime, "/MyRoot/Original/myseq");
UNIT_ASSERT_VALUES_EQUAL(value, 2);

TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Original"
destination_prefix: ""
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetExport(runtime, txId, "/MyRoot");

TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ImportFromS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_prefix: ""
destination_path: "/MyRoot/Restored"
}
}
)", port));
env.TestWaitNotification(runtime, txId);
TestGetImport(runtime, txId, "/MyRoot");

const auto desc = DescribePath(runtime, "/MyRoot/Restored", true, true);
UNIT_ASSERT_VALUES_EQUAL(desc.GetStatus(), NKikimrScheme::StatusSuccess);

const auto& table = desc.GetPathDescription().GetTable();

value = DoNextVal(runtime, "/MyRoot/Restored/myseq", Ydb::StatusIds::SCHEME_ERROR);

UNIT_ASSERT_C(CheckDefaultFromSequence(table), "Invalid default value");
}

Y_UNIT_TEST(ExportImportPg) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableTablePgTypes(true));
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/ut_restore/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ PEERDIR(
ydb/core/wrappers/ut_helpers
ydb/core/ydb_convert
ydb/library/yql/sql/pg
ydb/library/yql/parser/pg_wrapper
ydb/library/yql/parser/pg_wrapper
)

SRCS(
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/tx/sequenceshard/tx_create_sequence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,15 @@ namespace NSequenceShard {
sequence.Cycle = msg->Record.GetCycle();
}

sequence.NextValue = sequence.StartValue;
sequence.NextUsed = false;

if (msg->Record.OptionalSetVal_case() == NKikimrTxSequenceShard::TEvCreateSequence::kSetVal) {
sequence.NextValue = msg->Record.GetSetVal().GetNextValue();
sequence.NextUsed = msg->Record.GetSetVal().GetNextUsed();
} else {
sequence.NextUsed = false;
sequence.NextValue = sequence.StartValue;
}

if (msg->Record.OptionalCache_case() == NKikimrTxSequenceShard::TEvCreateSequence::kCache) {
sequence.Cache = msg->Record.GetCache();
if (sequence.Cache < 1) {
Expand Down
Loading
Loading