Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support sequences backup with current value #3089

Merged
merged 10 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,10 @@ message TMoveIndex {
}

message TSequenceDescription {
message TSetVal {
optional sint64 NextValue = 1;
optional bool NextUsed = 2;
}
optional string Name = 1; // mandatory
optional NKikimrProto.TPathID PathId = 2; // sequence path id, assigned by schemeshard
optional uint64 Version = 3; // incremented every time sequence is altered
Expand All @@ -1305,6 +1309,7 @@ message TSequenceDescription {
optional uint64 Cache = 8; // number of items to cache, defaults to 1
optional sint64 Increment = 9; // increment at each call, defaults to 1
optional bool Cycle = 10; // true when cycle on overflow is allowed
optional TSetVal SetVal = 11; // SetVal(NextValue, NextUsed) is executed atomically when creating
}

message TSequenceSharding {
Expand Down Expand Up @@ -1613,6 +1618,7 @@ message TDescribeOptions {
optional bool ShowPrivateTable = 7 [default = false];
optional bool ReturnChannelsBinding = 8 [default = false];
optional bool ReturnRangeKey = 9 [default = true];
optional bool ReturnSetVal = 10 [default = false];
}

// Request to read scheme for a specific path
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/protos/tx_sequenceshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ message TEvMarkSchemeShardPipe {
}

message TEvCreateSequence {
message TSetVal {
sint64 NextValue = 1;
bool NextUsed = 2;
}

NKikimrProto.TPathID PathId = 1;
uint64 TxId = 2;
uint64 TxPartId = 3;
Expand All @@ -40,6 +45,7 @@ message TEvCreateSequence {
bool Cycle = 9;
}
bool Frozen = 10; // defaults to false
TSetVal SetVal = 11;
}

message TEvCreateSequenceResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,6 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
result.push_back(CreateCopyTable(NextPartId(nextId, result),
CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup()), sequences));

if (descr.GetOmitIndexes()) {
continue;
}

TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
for (const auto& child: srcPath.Base()->GetChildren()) {
const auto& name = child.first;
Expand All @@ -160,6 +156,10 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
continue;
}

if (descr.GetOmitIndexes()) {
continue;
}

if (!srcIndexPath.IsTableIndex()) {
continue;
}
Expand All @@ -185,9 +185,11 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence);
scheme.SetFailOnExist(true);

auto* copySequence = scheme.MutableCopySequence();
copySequence->SetCopyFrom(srcPath.PathString() + "/" + sequenceDescription.GetName());
*scheme.MutableSequence() = std::move(sequenceDescription);

result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme));
result.push_back(CreateCopySequence(NextPartId(nextId, result), scheme));
}
}

Expand Down
27 changes: 24 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard__operation_copy_sequence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class TConfigureParts : public TSubOperationState {
event->Record.SetFrozen(true);

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TCoptSequence TConfigureParts ProgressState"
"TCopySequence TConfigureParts ProgressState"
<< " sending TEvCreateSequence to tablet " << tabletId
<< " operationId# " << OperationId
<< " at tablet " << ssId);
Expand Down Expand Up @@ -274,6 +274,18 @@ class TProposedCopySequence : public TSubOperationState {
<< " operationId#" << OperationId;
}

void UpdateSequenceDescription(NKikimrSchemeOp::TSequenceDescription& descr) {
descr.SetStartValue(GetSequenceResult.GetStartValue());
descr.SetMinValue(GetSequenceResult.GetMinValue());
descr.SetMaxValue(GetSequenceResult.GetMaxValue());
descr.SetCache(GetSequenceResult.GetCache());
descr.SetIncrement(GetSequenceResult.GetIncrement());
descr.SetCycle(GetSequenceResult.GetCycle());
auto* setValMsg = descr.MutableSetVal();
shnikd marked this conversation as resolved.
Show resolved Hide resolved
setValMsg->SetNextValue(GetSequenceResult.GetNextValue());
setValMsg->SetNextUsed(GetSequenceResult.GetNextUsed());
}

public:
TProposedCopySequence(TOperationId id)
: OperationId(id)
Expand Down Expand Up @@ -333,7 +345,15 @@ class TProposedCopySequence : public TSubOperationState {
return false;
}

TPathId pathId = txState->TargetPathId;

NIceDb::TNiceDb db(context.GetDB());

auto sequenceInfo = context.SS->Sequences.at(pathId);
UpdateSequenceDescription(sequenceInfo->Description);

context.SS->PersistSequence(db, pathId, *sequenceInfo);

context.SS->ChangeTxState(db, OperationId, TTxState::Done);
context.OnComplete.ActivateTx(OperationId);
return true;
Expand Down Expand Up @@ -387,7 +407,7 @@ class TProposedCopySequence : public TSubOperationState {
return false;
}

auto getSequenceResult = ev->Get()->Record;
GetSequenceResult = ev->Get()->Record;

Y_ABORT_UNLESS(txState->Shards.size() == 1);
for (auto shard : txState->Shards) {
Expand All @@ -397,7 +417,8 @@ class TProposedCopySequence : public TSubOperationState {
Y_ABORT_UNLESS(currentTabletId != InvalidTabletId);

auto event = MakeHolder<NSequenceShard::TEvSequenceShard::TEvRestoreSequence>(
txState->TargetPathId, getSequenceResult);
txState->TargetPathId, GetSequenceResult);

event->Record.SetTxId(ui64(OperationId.GetTxId()));
event->Record.SetTxPartId(OperationId.GetSubTxId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ class TConfigureParts : public TSubOperationState {
if (alterData->Description.HasCycle()) {
event->Record.SetCycle(alterData->Description.GetCycle());
}
if (alterData->Description.HasSetVal()) {
event->Record.MutableSetVal()->SetNextValue(alterData->Description.GetSetVal().GetNextValue());
event->Record.MutableSetVal()->SetNextUsed(alterData->Description.GetSetVal().GetNextUsed());
}

LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TCreateSequence TConfigureParts ProgressState"
Expand Down
34 changes: 32 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,30 @@ static NKikimrSchemeOp::TPathDescription GetTableDescription(TSchemeShard* ss, c
return record.GetPathDescription();
}

void FillSetValForSequences(TSchemeShard* ss, NKikimrSchemeOp::TTableDescription& description,
const TPathId& exportItemPathId) {
NKikimrSchemeOp::TDescribeOptions opts;
opts.SetReturnSetVal(true);

auto pathDescription = DescribePath(ss, TlsActivationContext->AsActorContext(), exportItemPathId, opts);
auto tableDescription = pathDescription->GetRecord().GetPathDescription().GetTable();

THashMap<TString, NKikimrSchemeOp::TSequenceDescription::TSetVal> setValForSequences;

for (const auto& sequenceDescription : tableDescription.GetSequences()) {
if (sequenceDescription.HasSetVal()) {
setValForSequences[sequenceDescription.GetName()] = sequenceDescription.GetSetVal();
}
}

for (auto& sequenceDescription : *description.MutableSequences()) {
auto it = setValForSequences.find(sequenceDescription.GetName());
if (it != setValForSequences.end()) {
*sequenceDescription.MutableSetVal() = it->second;
}
}
}

THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
TSchemeShard* ss,
TTxId txId,
Expand All @@ -106,8 +130,14 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
task.SetNeedToBill(!exportInfo->UserSID || !ss->SystemBackupSIDs.contains(*exportInfo->UserSID));

const TPath sourcePath = TPath::Init(exportInfo->Items[itemIdx].SourcePathId, ss);
if (sourcePath.IsResolved()) {
task.MutableTable()->CopyFrom(GetTableDescription(ss, sourcePath.Base()->PathId));
const TPath exportItemPath = exportPath.Child(ToString(itemIdx));
if (sourcePath.IsResolved() && exportItemPath.IsResolved()) {
auto sourceDescription = GetTableDescription(ss, sourcePath.Base()->PathId);
if (sourceDescription.HasTable()) {
FillSetValForSequences(
ss, *sourceDescription.MutableTable(), exportItemPath.Base()->PathId);
}
task.MutableTable()->CopyFrom(sourceDescription);
}

task.SetSnapshotStep(exportInfo->SnapshotStep);
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -992,8 +992,10 @@ class TSchemeShard
void DescribeTableIndex(const TPathId& pathId, const TString& name, TTableIndexInfo::TPtr indexInfo, NKikimrSchemeOp::TIndexDescription& entry);
void DescribeCdcStream(const TPathId& pathId, const TString& name, NKikimrSchemeOp::TCdcStreamDescription& desc);
void DescribeCdcStream(const TPathId& pathId, const TString& name, TCdcStreamInfo::TPtr info, NKikimrSchemeOp::TCdcStreamDescription& desc);
void DescribeSequence(const TPathId& pathId, const TString& name, NKikimrSchemeOp::TSequenceDescription& desc);
void DescribeSequence(const TPathId& pathId, const TString& name, TSequenceInfo::TPtr info, NKikimrSchemeOp::TSequenceDescription& desc);
void DescribeSequence(const TPathId& pathId, const TString& name,
NKikimrSchemeOp::TSequenceDescription& desc, bool fillSetVal = false);
void DescribeSequence(const TPathId& pathId, const TString& name, TSequenceInfo::TPtr info,
NKikimrSchemeOp::TSequenceDescription& desc, bool fillSetVal = false);
void DescribeReplication(const TPathId& pathId, const TString& name, NKikimrSchemeOp::TReplicationDescription& desc);
void DescribeReplication(const TPathId& pathId, const TString& name, TReplicationInfo::TPtr info, NKikimrSchemeOp::TReplicationDescription& desc);
void DescribeBlobDepot(const TPathId& pathId, const TString& name, NKikimrSchemeOp::TBlobDepotDescription& desc);
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
if (fromSequence.has_cycle()) {
seqDesc->SetCycle(fromSequence.cycle());
}
if (fromSequence.has_set_val()) {
auto* setVal = seqDesc->MutableSetVal();
setVal->SetNextUsed(fromSequence.set_val().next_used());
setVal->SetNextValue(fromSequence.set_val().next_value());
}

break;
}
Expand Down
13 changes: 9 additions & 4 deletions ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ void TPathDescriber::DescribeTable(const TActorContext& ctx, TPathId pathId, TPa
bool returnBackupInfo = Params.GetBackupInfo();
bool returnBoundaries = false;
bool returnRangeKey = true;
bool returnSetVal = Params.GetOptions().GetReturnSetVal();
if (Params.HasOptions()) {
returnConfig = Params.GetOptions().GetReturnPartitionConfig();
returnPartitioning = Params.GetOptions().GetReturnPartitioningInfo();
Expand Down Expand Up @@ -361,7 +362,7 @@ void TPathDescriber::DescribeTable(const TActorContext& ctx, TPathId pathId, TPa
Self->DescribeCdcStream(childPathId, childName, *entry->AddCdcStreams());
break;
case NKikimrSchemeOp::EPathTypeSequence:
Self->DescribeSequence(childPathId, childName, *entry->AddSequences());
Self->DescribeSequence(childPathId, childName, *entry->AddSequences(), returnSetVal);
break;
default:
Y_FAIL_S("Unexpected table's child"
Expand Down Expand Up @@ -1241,24 +1242,28 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name,
}

void TSchemeShard::DescribeSequence(const TPathId& pathId, const TString& name,
NKikimrSchemeOp::TSequenceDescription& desc)
NKikimrSchemeOp::TSequenceDescription& desc, bool fillSetVal)
{
auto it = Sequences.find(pathId);
Y_VERIFY_S(it != Sequences.end(), "Sequence not found"
<< " pathId# " << pathId
<< " name# " << name);
DescribeSequence(pathId, name, it->second, desc);
DescribeSequence(pathId, name, it->second, desc, fillSetVal);
}

void TSchemeShard::DescribeSequence(const TPathId& pathId, const TString& name, TSequenceInfo::TPtr info,
NKikimrSchemeOp::TSequenceDescription& desc)
NKikimrSchemeOp::TSequenceDescription& desc, bool fillSetVal)
{
Y_VERIFY_S(info, "Empty sequence info"
<< " pathId# " << pathId
<< " name# " << name);

desc = info->Description;

if (!fillSetVal) {
desc.ClearSetVal();
}

desc.SetName(name);
PathIdFromPathId(pathId, desc.MutablePathId());
desc.SetVersion(info->AlterVersion);
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2362,16 +2362,17 @@ namespace NSchemeShardUT_Private {
runtime.Send(new IEventHandle(NSequenceProxy::MakeSequenceProxyServiceID(), sender, request.Release()));
}

i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender) {
i64 WaitNextValResult(
shnikd marked this conversation as resolved.
Show resolved Hide resolved
TTestActorRuntime& runtime, const TActorId& sender, Ydb::StatusIds::StatusCode expectedStatus) {
auto ev = runtime.GrabEdgeEventRethrow<NSequenceProxy::TEvSequenceProxy::TEvNextValResult>(sender);
auto* msg = ev->Get();
UNIT_ASSERT_VALUES_EQUAL(msg->Status, Ydb::StatusIds::SUCCESS);
UNIT_ASSERT_VALUES_EQUAL(msg->Status, expectedStatus);
return msg->Value;
}

i64 DoNextVal(TTestActorRuntime& runtime, const TString& path) {
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path, Ydb::StatusIds::StatusCode expectedStatus) {
auto sender = runtime.AllocateEdgeActor(0);
SendNextValRequest(runtime, sender, path);
return WaitNextValResult(runtime, sender);
return WaitNextValResult(runtime, sender, expectedStatus);
}
}
8 changes: 6 additions & 2 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,11 @@ namespace NSchemeShardUT_Private {
void WriteRow(TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected = true);

void SendNextValRequest(TTestActorRuntime& runtime, const TActorId& sender, const TString& path);
i64 WaitNextValResult(TTestActorRuntime& runtime, const TActorId& sender);
i64 DoNextVal(TTestActorRuntime& runtime, const TString& path);
i64 WaitNextValResult(
TTestActorRuntime& runtime, const TActorId& sender,
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);
i64 DoNextVal(
TTestActorRuntime& runtime, const TString& path,
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);

} //NSchemeShardUT_Private
Loading
Loading