Skip to content

Commit

Permalink
Copy table with sequences and backup without NextVal (ydb-platform#2285)
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd committed May 30, 2024
1 parent 2c3d924 commit 2b4191b
Show file tree
Hide file tree
Showing 13 changed files with 337 additions and 21 deletions.
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/export_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ TMaybe<Ydb::Table::CreateTableRequest> GenYdbScheme(
FillPartitioningSettings(scheme, tableDesc);
FillKeyBloomFilter(scheme, tableDesc);
FillReadReplicasSettings(scheme, tableDesc);
FillSequenceDescription(scheme, tableDesc);

return scheme;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ class TBackupRestoreOperationBase: public TSubOperation {
.NotAsyncReplicaTable()
.NotUnderOperation()
.IsCommonSensePath() //forbid alter impl index tables
.NotChildren(); //forbid backup table with indexes
.CanBackupTable(); //forbid backup table with indexes

if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,33 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
TPath dstPath = TPath::Resolve(dstStr, context.SS);
TPath dstParentPath = dstPath.Parent();

THashSet<TString> sequences;
for (const auto& child: srcPath.Base()->GetChildren()) {
auto name = child.first;
auto pathId = child.second;

TPath childPath = srcPath.Child(name);
if (!childPath.IsSequence() || childPath.IsDeleted()) {
continue;
}

Y_ABORT_UNLESS(childPath.Base()->PathId == pathId);

TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(pathId);
const auto& sequenceDesc = sequenceInfo->Description;
const auto& sequenceName = sequenceDesc.GetName();

sequences.emplace(sequenceName);
}

result.push_back(CreateCopyTable(NextPartId(nextId, result),
CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup())));
CopyTableTask(srcPath, dstPath, descr.GetOmitFollowers(), descr.GetIsBackup()), sequences));

if (descr.GetOmitIndexes()) {
continue;
}

TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
for (const auto& child: srcPath.Base()->GetChildren()) {
const auto& name = child.first;
const auto& pathId = child.second;
Expand All @@ -133,6 +153,13 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
continue;
}

if (srcIndexPath.IsSequence()) {
TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(pathId);
const auto& sequenceDesc = sequenceInfo->Description;
sequenceDescriptions.push_back(sequenceDesc);
continue;
}

if (!srcIndexPath.IsTableIndex()) {
continue;
}
Expand All @@ -151,6 +178,17 @@ TVector<ISubOperation::TPtr> CreateConsistentCopyTables(TOperationId nextId, con
result.push_back(CreateCopyTable(NextPartId(nextId, result),
CopyTableTask(srcImplTable, dstImplTable, descr.GetOmitFollowers(), descr.GetIsBackup())));
}

for (auto&& sequenceDescription : sequenceDescriptions) {
auto scheme = TransactionTemplate(
dstPath.PathString(),
NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence);
scheme.SetFailOnExist(true);

*scheme.MutableSequence() = std::move(sequenceDescription);

result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme));
}
}

return result;
Expand Down
62 changes: 57 additions & 5 deletions ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ class TPropose: public TSubOperationState {
};

class TCopyTable: public TSubOperation {

THashSet<TString> LocalSequences;

static TTxState::ETxState NextState() {
return TTxState::CreateParts;
}
Expand Down Expand Up @@ -267,6 +270,12 @@ class TCopyTable: public TSubOperation {
public:
using TSubOperation::TSubOperation;

explicit TCopyTable(const TOperationId& id, const TTxTransaction& tx, const THashSet<TString>& localSequences)
: TSubOperation(id, tx)
, LocalSequences(localSequences)
{
}

bool IsShadowDataAllowed() const {
return AppData()->AllowShadowDataInSchemeShardForTests;
}
Expand Down Expand Up @@ -459,7 +468,8 @@ class TCopyTable: public TSubOperation {

const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
const TSchemeLimits& limits = domainInfo->GetSchemeLimits();
TTableInfo::TAlterDataPtr alterData = TTableInfo::CreateAlterData(nullptr, schema, *typeRegistry, limits, *domainInfo, context.SS->EnableTablePgTypes, errStr);
TTableInfo::TAlterDataPtr alterData = TTableInfo::CreateAlterData(nullptr, schema, *typeRegistry,
limits, *domainInfo, context.SS->EnableTablePgTypes, errStr, LocalSequences);
if (!alterData.Get()) {
result->SetError(NKikimrScheme::StatusSchemeError, errStr);
return result;
Expand Down Expand Up @@ -626,8 +636,9 @@ class TCopyTable: public TSubOperation {

namespace NKikimr::NSchemeShard {

ISubOperation::TPtr CreateCopyTable(TOperationId id, const TTxTransaction& tx) {
return MakeSubOperation<TCopyTable>(id, tx);
ISubOperation::TPtr CreateCopyTable(TOperationId id, const TTxTransaction& tx, const THashSet<TString>& localSequences)
{
return MakeSubOperation<TCopyTable>(id, tx, localSequences);
}

ISubOperation::TPtr CreateCopyTable(TOperationId id, TTxState::ETxState state) {
Expand Down Expand Up @@ -659,6 +670,25 @@ TVector<ISubOperation::TPtr> CreateCopyTable(TOperationId nextId, const TTxTrans
}
}

THashSet<TString> sequences;
for (auto& child: srcPath.Base()->GetChildren()) {
auto name = child.first;
auto pathId = child.second;

TPath childPath = srcPath.Child(name);
if (!childPath.IsSequence() || childPath.IsDeleted()) {
continue;
}

Y_ABORT_UNLESS(childPath.Base()->PathId == pathId);

TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(pathId);
const auto& sequenceDesc = sequenceInfo->Description;
const auto& sequenceName = sequenceDesc.GetName();

sequences.emplace(sequenceName);
}

TPath workDir = TPath::Resolve(tx.GetWorkingDir(), context.SS);
TPath dstPath = workDir.Child(copying.GetName());

Expand All @@ -674,15 +704,27 @@ TVector<ISubOperation::TPtr> CreateCopyTable(TOperationId nextId, const TTxTrans
operation->SetIsBackup(copying.GetIsBackup());
operation->MutablePartitionConfig()->CopyFrom(copying.GetPartitionConfig());

result.push_back(CreateCopyTable(NextPartId(nextId, result), schema));
result.push_back(CreateCopyTable(NextPartId(nextId, result), schema, sequences));
}

TVector<NKikimrSchemeOp::TSequenceDescription> sequenceDescriptions;
for (auto& child: srcPath.Base()->GetChildren()) {
auto name = child.first;
auto pathId = child.second;

TPath childPath = srcPath.Child(name);
if (!childPath.IsTableIndex() || childPath.IsDeleted()) {
if (childPath.IsDeleted()) {
continue;
}

if (childPath.IsSequence()) {
TSequenceInfo::TPtr sequenceInfo = context.SS->Sequences.at(pathId);
const auto& sequenceDesc = sequenceInfo->Description;
sequenceDescriptions.push_back(sequenceDesc);
continue;
}

if (!childPath.IsTableIndex()) {
continue;
}

Expand Down Expand Up @@ -727,6 +769,16 @@ TVector<ISubOperation::TPtr> CreateCopyTable(TOperationId nextId, const TTxTrans
}
}

for (auto&& sequenceDescription : sequenceDescriptions) {
auto scheme = TransactionTemplate(
tx.GetWorkingDir() + "/" + copying.GetName(),
NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence);
scheme.SetFailOnExist(tx.GetFailOnExist());

*scheme.MutableSequence() = std::move(sequenceDescription);

result.push_back(CreateNewSequence(NextPartId(nextId, result), scheme));
}
return result;
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard__operation_part.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ ISubOperation::TPtr CreateForceDropUnsafe(TOperationId id, TTxState::ETxState st
ISubOperation::TPtr CreateNewTable(TOperationId id, const TTxTransaction& tx, const THashSet<TString>& localSequences = { });
ISubOperation::TPtr CreateNewTable(TOperationId id, TTxState::ETxState state);

ISubOperation::TPtr CreateCopyTable(TOperationId id, const TTxTransaction& tx);
ISubOperation::TPtr CreateCopyTable(TOperationId id, const TTxTransaction& tx,
const THashSet<TString>& localSequences = { });
ISubOperation::TPtr CreateCopyTable(TOperationId id, TTxState::ETxState state);
TVector<ISubOperation::TPtr> CreateCopyTable(TOperationId nextId, const TTxTransaction& tx, TOperationContext& context);

Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_import__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
}

if (item.State == EState::CreateTable) {
item.DstPathId = Self->MakeLocalId(TLocalPathId(record.GetPathId()));
auto createPath = TPath::Resolve(item.DstPathName, Self);
Y_ABORT_UNLESS(createPath);

item.DstPathId = createPath.Base()->PathId;
Self->PersistImportItemDstPathId(db, importInfo, itemIdx);
}

Expand Down
42 changes: 39 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
}

auto& modifyScheme = *record.AddTransaction();
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexedTable);
modifyScheme.SetInternal(true);

const TPath domainPath = TPath::Init(importInfo->DomainPathId, ss);
Expand All @@ -37,15 +37,51 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(

modifyScheme.SetWorkingDir(wdAndPath.first);

auto& tableDesc = *modifyScheme.MutableCreateTable();
auto* indexedTable = modifyScheme.MutableCreateIndexedTable();
auto& tableDesc = *(indexedTable->MutableTableDescription());
tableDesc.SetName(wdAndPath.second);

Y_ABORT_UNLESS(ss->TableProfilesLoaded);
Ydb::StatusIds::StatusCode status;
if (!FillTableDescription(modifyScheme, item.Scheme, ss->TableProfiles, status, error)) {
if (!FillTableDescription(modifyScheme, item.Scheme, ss->TableProfiles, status, error, true)) {
return nullptr;
}

for(const auto& column: item.Scheme.columns()) {
switch (column.default_value_case()) {
case Ydb::Table::ColumnMeta::kFromSequence: {
const auto& fromSequence = column.from_sequence();

auto seqDesc = indexedTable->MutableSequenceDescription()->Add();
seqDesc->SetName(fromSequence.name());
if (fromSequence.has_min_value()) {
seqDesc->SetMinValue(fromSequence.min_value());
}
if (fromSequence.has_max_value()) {
seqDesc->SetMaxValue(fromSequence.max_value());
}
if (fromSequence.has_start_value()) {
seqDesc->SetStartValue(fromSequence.start_value());
}
if (fromSequence.has_cache()) {
seqDesc->SetCache(fromSequence.cache());
}
if (fromSequence.has_increment()) {
seqDesc->SetIncrement(fromSequence.increment());
}
if (fromSequence.has_cycle()) {
seqDesc->SetCycle(fromSequence.cycle());
}

break;
}
case Ydb::Table::ColumnMeta::kFromLiteral: {
break;
}
default: break;
}
}

return propose;
}

Expand Down
17 changes: 17 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,23 @@ const TPath::TChecker& TPath::TChecker::NotChildren(EStatus status) const {
<< ", children: " << childrenCount);
}

const TPath::TChecker& TPath::TChecker::CanBackupTable(EStatus status) const {
if (Failed) {
return *this;
}

for (const auto& child: Path.Base()->GetChildren()) {
auto name = child.first;

TPath childPath = Path.Child(name);
if (childPath->IsTableIndex()) {
return Fail(status, TStringBuilder() << "path has indexes, request doesn't accept it");
}
}

return *this;
}

const TPath::TChecker& TPath::TChecker::NotDeleted(EStatus status) const {
if (Failed) {
return *this;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard_path.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class TPath {
const TChecker& ShardsLimit(ui64 delta = 1, EStatus status = EStatus::StatusResourceExhausted) const;
const TChecker& PathShardsLimit(ui64 delta = 1, EStatus status = EStatus::StatusResourceExhausted) const;
const TChecker& NotChildren(EStatus status = EStatus::StatusInvalidParameter) const;
const TChecker& CanBackupTable(EStatus status = EStatus::StatusInvalidParameter) const;
const TChecker& IsValidACL(const TString& acl, EStatus status = EStatus::StatusInvalidParameter) const;
const TChecker& PQPartitionsLimit(ui64 delta = 1, EStatus status = EStatus::StatusResourceExhausted) const;
const TChecker& PQReservedStorageLimit(ui64 delta = 1, EStatus status = EStatus::StatusResourceExhausted) const;
Expand Down
Loading

0 comments on commit 2b4191b

Please sign in to comment.