Skip to content

Commit

Permalink
Remove deleted last not deleted version from database when new versio…
Browse files Browse the repository at this point in the history
…n is added
  • Loading branch information
aavdonkin committed Sep 23, 2024
1 parent f1a7ac8 commit f2e6baf
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 28 deletions.
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class TCompactColumnEngineChanges;
class TInsertColumnEngineChanges;
class TStoragesManager;
class TDbWrapper;
class TColumnEngineForLogs;

namespace NReader {
class TTxScan;
Expand Down Expand Up @@ -278,6 +279,7 @@ class TColumnShard
friend class NOlap::NReader::NPlain::TIndexScannerConstructor;

friend class NOlap::TDbWrapper;
friend class NOlap::TColumnEngineForLogs;

class TStoragesManager;
friend class TTxController;
Expand Down
28 changes: 14 additions & 14 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,38 +26,29 @@
#include <concepts>

namespace NKikimr::NOlap {
/*
TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, storagesManager))
, StoragesManager(storagesManager)
, TabletId(tabletId)
, LastPortion(0)
, LastGranule(0)
{
ActualizationController = std::make_shared<NActualizer::TController>();
}
*/
TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager,
const TSnapshot& snapshot, const NKikimrSchemeOp::TColumnTableSchema& schema, NColumnShard::TColumnShard* cs)
const TSnapshot& snapshot, const NKikimrSchemeOp::TColumnTableSchema& schema, NColumnShard::TColumnShard* cs, NIceDb::TNiceDb* database)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, storagesManager))
, StoragesManager(storagesManager)
, TabletId(tabletId)
, LastPortion(0)
, LastGranule(0)
, CS(cs)
, Database(database)
{
ActualizationController = std::make_shared<NActualizer::TController>();
RegisterSchemaVersion(snapshot, schema);
}

TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager,
const TSnapshot& snapshot, TIndexInfo&& schema, NColumnShard::TColumnShard* cs)
const TSnapshot& snapshot, TIndexInfo&& schema, NColumnShard::TColumnShard* cs, NIceDb::TNiceDb* database)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, storagesManager))
, StoragesManager(storagesManager)
, TabletId(tabletId)
, LastPortion(0)
, LastGranule(0)
, CS(cs) {
, CS(cs)
, Database(database) {
ActualizationController = std::make_shared<NActualizer::TController>();
RegisterSchemaVersion(snapshot, std::move(schema));
}
Expand Down Expand Up @@ -150,7 +141,16 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd
switchOptimizer = !indexInfo.GetCompactionPlannerConstructor()->IsEqualTo(lastIndexInfo.GetCompactionPlannerConstructor());
}
const bool isCriticalScheme = indexInfo.GetSchemeNeedActualization();
auto lastNotDeleted = VersionedIndex.LastNotDeletedVersion;
auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(indexInfo));
if (lastNotDeleted.has_value() && !VersionedIndex.LastNotDeletedVersion.has_value()) {
auto& versionToKey = CS->TablesManager.VersionToKey;
auto iter = versionToKey.find(*lastNotDeleted);
AFL_VERIFY(iter != versionToKey.end());
for (auto& key: iter->second) {
Database->Table<NKikimr::NColumnShard::Schema::SchemaPresetVersionInfo>().Key(key.Id, key.PlanStep, key.TxId).Delete();
}
}
if (isCriticalScheme) {
if (!ActualizationStarted) {
ActualizationStarted = true;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,8 @@ class TColumnEngineForLogs : public IColumnEngine {
ADD,
};

TColumnEngineForLogs(ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const NKikimrSchemeOp::TColumnTableSchema& schema, NColumnShard::TColumnShard* cs);
TColumnEngineForLogs(ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema, NColumnShard::TColumnShard* cs);
// TColumnEngineForLogs(ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager);
TColumnEngineForLogs(ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const NKikimrSchemeOp::TColumnTableSchema& schema, NColumnShard::TColumnShard* cs, NIceDb::TNiceDb* database);
TColumnEngineForLogs(ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema, NColumnShard::TColumnShard* cs, NIceDb::TNiceDb* database);

virtual void OnTieringModified(const std::shared_ptr<NColumnShard::TTiersManager>& manager, const NColumnShard::TTtl& ttl, const std::optional<ui64> pathId) override;

Expand Down Expand Up @@ -190,6 +189,7 @@ class TColumnEngineForLogs : public IColumnEngine {
ui64 LastPortion;
ui64 LastGranule;
NColumnShard::TColumnShard* CS = nullptr;
NIceDb::TNiceDb* Database;
TSnapshot LastSnapshot = TSnapshot::Zero();
bool Loaded = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, TIndexInf
const bool needActualization = indexInfo.GetSchemeNeedActualization();
auto newVersion = indexInfo.GetVersion();
if (LastNotDeletedVersion.has_value() && (*LastNotDeletedVersion < newVersion)) {
LOG_S_CRIT("Removing schema version " << *LastNotDeletedVersion << " from memory, but not from db")
RemoveVersionNoCheck(*LastNotDeletedVersion);
LastNotDeletedVersion.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ class TVersionedIndex {
std::map<ui64, ISnapshotSchema::TPtr> SnapshotByVersion;
ui64 LastSchemaVersion = 0;
std::optional<ui64> SchemeVersionForActualization;
std::optional<ui64> LastNotDeletedVersion;
ISnapshotSchema::TPtr SchemeForActualization;

public:
std::optional<ui64> LastNotDeletedVersion;

public:
ISnapshotSchema::TPtr GetLastCriticalSchema() const {
return SchemeForActualization;
Expand Down
16 changes: 7 additions & 9 deletions ydb/core/tx/columnshard/tables_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
TSchemaPreset::TSchemaPresetVersionInfo info;
Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
auto& key = VersionToKey[info.GetSchema().GetVersion()];
key.PlanStep = version.GetPlanStep();
key.TxId = version.GetTxId();
key.Id = id;
key.emplace_back(id, version.GetPlanStep(), version.GetTxId());
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "load_preset")("preset_id", id)("snapshot", version)("version", info.HasSchema() ? info.GetSchema().GetVersion() : -1);
preset.AddVersion(version, info);
if (!rowset.Next()) {
Expand All @@ -182,7 +180,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
"version", schemaInfo.GetSchema().GetVersion());
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(
TabletId, StoragesManager, preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInfo.GetSchema(), CS);
TabletId, StoragesManager, preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInfo.GetSchema(), CS, &db);
} else {
PrimaryIndex->RegisterSchemaVersion(preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInfo.GetSchema());
}
Expand Down Expand Up @@ -285,7 +283,9 @@ void TTablesManager::RemoveUnusedSchemaVersion(NTable::TDatabase* database, ui64
removed = PrimaryIndex->RemoveSchemaVersion(version);
}
if (removed) {
db.Table<Schema::SchemaPresetVersionInfo>().Key(iter->second.Id, iter->second.PlanStep, iter->second.TxId).Delete();
for (auto& key: iter->second) {
db.Table<Schema::SchemaPresetVersionInfo>().Key(key.Id, key.PlanStep, key.TxId).Delete();
}
}
}

Expand All @@ -301,11 +301,9 @@ void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapsho
Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo);
if (versionInfo.HasSchema()) {
auto& key = VersionToKey[versionInfo.GetSchema().GetVersion()];
key.Id = presetId;
key.PlanStep = version.GetPlanStep();
key.TxId = version.GetTxId();
key.emplace_back(presetId, version.GetPlanStep(), version.GetTxId());
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, StoragesManager, version, schema, CS);
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, StoragesManager, version, schema, CS, &db);
for (auto&& i : Tables) {
PrimaryIndex->RegisterTable(i.first);
}
Expand Down
14 changes: 13 additions & 1 deletion ydb/core/tx/columnshard/tables_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ class TTablesManager {
ui64 PlanStep;
ui64 TxId;
ui32 Id;

public:
TSchemaKey() = default;

TSchemaKey(ui32 id, ui64 planStep, ui64 txId)
: PlanStep(planStep)
, TxId(txId)
, Id(id)
{
}
};

private:
Expand All @@ -159,7 +169,9 @@ class TTablesManager {
std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
ui64 TabletId = 0;
NColumnShard::TColumnShard* CS = nullptr;
THashMap<ui64, TSchemaKey> VersionToKey;

public:
THashMap<ui64, TVector<TSchemaKey>> VersionToKey;

public:
TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const ui64 tabletId, NColumnShard::TColumnShard* cs);
Expand Down

0 comments on commit f2e6baf

Please sign in to comment.