Skip to content

Commit

Permalink
InsertTable schema version counts are maintained
Browse files Browse the repository at this point in the history
  • Loading branch information
aavdonkin committed Sep 13, 2024
1 parent 527f74c commit 1c1c001
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
}

bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
txc.Owner = Self;
TMemoryProfileGuard mpg("TTxWrite::Execute");
NActors::TLogContextGuard logGuard =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
Expand Down
35 changes: 19 additions & 16 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,26 +141,28 @@ class TInsertKey {
public:
ui64 PlanStep;
ui64 TxId;
ui64 PathId;
TString DedupId;
ui8 RecType;

public:
TInsertKey() = default;

TInsertKey(ui64 planStep, ui64 txId, const TString& dedupId, ui8 recType)
TInsertKey(ui64 planStep, ui64 txId, ui64 pathId, const TString& dedupId, ui8 recType)
: PlanStep(planStep)
, TxId(txId)
, PathId(pathId)
, DedupId(dedupId)
, RecType(recType)
{
}

bool operator==(const TInsertKey& other) const {
return (PlanStep == other.PlanStep) && (TxId == other.TxId) && (DedupId == other.DedupId) && (RecType == other.RecType);
return (PlanStep == other.PlanStep) && (TxId == other.TxId) && (PathId == other.PathId) && (DedupId == other.DedupId) && (RecType == other.RecType);
}

ui64 Hash() const {
return CombineHashes(PlanStep, CombineHashes(TxId, CombineHashes(hash<TString>()(DedupId), (ui64)RecType)));
return CombineHashes(PlanStep, CombineHashes(TxId, CombineHashes(PathId, CombineHashes(hash<TString>()(DedupId), (ui64)RecType))));
}
};

Expand All @@ -174,14 +176,15 @@ class TVersionCounts {

private:
THashMap<TInsertKey, ui64, Hasher> InsertVersions;
THashMap<ui64, ui64> PortionVersions;
THashMap<std::pair<ui64, ui64>, ui64> PortionVersions;
THashMap<ui64, ui32> VersionCounts;

public:
template<class Key, class Versions>
void VersionAddRef(Versions& versions, const Key& portion, ui64 version) {
ui64& curVer = versions[portion];
if (curVer != 0) {// Portion is already in the local database, no need to increase ref count
LOG_S_CRIT("Schema version is already written");
AFL_VERIFY(version == curVer);
return;
}
Expand All @@ -203,12 +206,12 @@ class TVersionCounts {
return --refCount;
}

void VersionAddRef(ui64 portion, ui64 version) {
VersionAddRef(PortionVersions, portion, version);
void VersionAddRef(ui64 portion, ui64 pathId, ui64 version) {
VersionAddRef(PortionVersions, std::pair<ui64, ui64>(portion, pathId), version);
}

ui32 VersionRemoveRef(ui64 portion, ui64 version) {
return VersionRemoveRef(PortionVersions, portion, version);
ui32 VersionRemoveRef(ui64 portion, ui64 pathId, ui64 version) {
return VersionRemoveRef(PortionVersions, std::pair<ui64, ui64>(portion, pathId), version);
}

void VersionAddRef(const TInsertKey& key, ui64 version) {
Expand Down Expand Up @@ -702,20 +705,20 @@ class TColumnShard

TColumnShard(TTabletStorageInfo* info, const TActorId& tablet);

void VersionAddRef(ui64 portion, ui64 version) {
VersionCounts.VersionAddRef(portion, version);
void VersionAddRef(ui64 portion, ui64 pathId, ui64 version) {
VersionCounts.VersionAddRef(portion, pathId, version);
}

ui32 VersionRemoveRef(ui64 portion, ui64 version) {
return VersionCounts.VersionRemoveRef(portion, version);
ui32 VersionRemoveRef(ui64 portion, ui64 pathId, ui64 version) {
return VersionCounts.VersionRemoveRef(portion, pathId, version);
}

void VersionAddRef(ui64 planStep, ui64 txId, const TString& dedupId, ui8 recType, ui64 version) {
VersionCounts.VersionAddRef(TInsertKey(planStep, txId, dedupId, recType), version);
void VersionAddRef(ui64 planStep, ui64 txId, ui64 pathId, const TString& dedupId, ui8 recType, ui64 version) {
VersionCounts.VersionAddRef(TInsertKey(planStep, txId, pathId, dedupId, recType), version);
}

ui32 VersionRemoveRef(ui64 planStep, ui64 txId, const TString& dedupId, ui8 recType, ui64 version) {
return VersionCounts.VersionRemoveRef(TInsertKey(planStep, txId, dedupId, recType), version);
ui32 VersionRemoveRef(ui64 planStep, ui64 txId, ui64 pathId, const TString& dedupId, ui8 recType, ui64 version) {
return VersionCounts.VersionRemoveRef(TInsertKey(planStep, txId, pathId, dedupId, recType), version);
}


Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
AFL_VERIFY(portion.MutableMeta().LoadMetadata(metaProto, indexInfo));
AFL_VERIFY(constructors.AddConstructorVerified(std::move(portion)));
if (portion.HasSchemaVersion()) {
db.CS->VersionAddRef(portion.GetSchemaVersion(), 1);
db.CS->VersionAddRef(portion.GetPortionIdVerified(), portion.GetPathId(), portion.GetSchemaVersion());
}
})) {
return false;
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/tx/columnshard/engines/db_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,37 @@ namespace NKikimr::NOlap {

void TDbWrapper::Insert(const TInsertedData& data) {
NIceDb::TNiceDb db(Database);
CS->VersionAddRef(0, (ui64)data.GetInsertWriteId(), data.GetPathId(), "", (ui8)NKikimr::NColumnShard::Schema::EInsertTableIds::Inserted, data.GetSchemaVersion());
NColumnShard::Schema::InsertTable_Insert(db, data);
}

void TDbWrapper::Commit(const TCommittedData& data) {
NIceDb::TNiceDb db(Database);
CS->VersionAddRef(data.GetSnapshot().GetPlanStep(), data.GetSnapshot().GetTxId(), data.GetPathId(), data.GetDedupId(), (ui8)NKikimr::NColumnShard::Schema::EInsertTableIds::Committed, data.GetSchemaVersion());
NColumnShard::Schema::InsertTable_Commit(db, data);
}

void TDbWrapper::Abort(const TInsertedData& data) {
NIceDb::TNiceDb db(Database);
CS->VersionAddRef(0, (ui64)data.GetInsertWriteId(), data.GetPathId(), "", (ui8)NKikimr::NColumnShard::Schema::EInsertTableIds::Aborted, data.GetSchemaVersion());
NColumnShard::Schema::InsertTable_Abort(db, data);
}

void TDbWrapper::EraseInserted(const TInsertedData& data) {
NIceDb::TNiceDb db(Database);
CS->VersionRemoveRef(0, (ui64)data.GetInsertWriteId(), data.GetPathId(), "", (ui8)NKikimr::NColumnShard::Schema::EInsertTableIds::Inserted, data.GetSchemaVersion());
NColumnShard::Schema::InsertTable_EraseInserted(db, data);
}

void TDbWrapper::EraseCommitted(const TCommittedData& data) {
NIceDb::TNiceDb db(Database);
CS->VersionRemoveRef(data.GetSnapshot().GetPlanStep(), data.GetSnapshot().GetTxId(), data.GetPathId(), data.GetDedupId(), (ui8)NKikimr::NColumnShard::Schema::EInsertTableIds::Committed, data.GetSchemaVersion());
NColumnShard::Schema::InsertTable_EraseCommitted(db, data);
}

void TDbWrapper::EraseAborted(const TInsertedData& data) {
NIceDb::TNiceDb db(Database);
CS->VersionRemoveRef(0, (ui64)data.GetInsertWriteId(), data.GetPathId(), "", (ui8)NKikimr::NColumnShard::Schema::EInsertTableIds::Aborted, data.GetSchemaVersion());
NColumnShard::Schema::InsertTable_EraseAborted(db, data);
}

Expand Down Expand Up @@ -71,7 +77,7 @@ void TDbWrapper::WritePortion(const NOlap::TPortionInfo& portion) {
auto metaProto = portion.GetMeta().SerializeToProto();
using IndexPortions = NColumnShard::Schema::IndexPortions;
auto removeSnapshot = portion.GetRemoveSnapshotOptional();
CS->VersionAddRef(portion.GetPortion(), portion.GetSchemaVersionVerified());
CS->VersionAddRef(portion.GetPortion(), portion.GetPathId(), portion.GetSchemaVersionVerified());
db.Table<IndexPortions>().Key(portion.GetPathId(), portion.GetPortion()).Update(
NIceDb::TUpdate<IndexPortions::SchemaVersion>(portion.GetSchemaVersionVerified()),
NIceDb::TUpdate<IndexPortions::ShardingVersion>(portion.GetShardingVersionDef(0)),
Expand All @@ -84,7 +90,7 @@ void TDbWrapper::ErasePortion(const NOlap::TPortionInfo& portion) {
LOG_S_CRIT("Erasing portion, schema version " << portion.GetSchemaVersionVerified() << " path id " << portion.GetPathId() << " portion id " << portion.GetPortion() << " database " << (ui64)&Database);
NIceDb::TNiceDb db(Database);
using IndexPortions = NColumnShard::Schema::IndexPortions;
ui32 refCount = CS->VersionRemoveRef(portion.GetPortion(), portion.GetSchemaVersionVerified());
ui32 refCount = CS->VersionRemoveRef(portion.GetPortion(), portion.GetPathId(), portion.GetSchemaVersionVerified());
if (refCount == 0) {
LOG_S_CRIT("Ref count is set to 0 for version " << portion.GetSchemaVersionVerified() << " need to delete");
}
Expand Down

0 comments on commit 1c1c001

Please sign in to comment.