From d7736154f0ea2663f24c6a70b7f2c2db12be607b Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Mon, 23 Sep 2024 11:11:34 +0000 Subject: [PATCH 01/17] Implemented schema versions normalizer --- .../normalizer/abstract/abstract.cpp | 5 +- .../normalizer/abstract/abstract.h | 1 + .../normalizer/portion/portion.cpp | 1 + .../columnshard/normalizer/portion/portion.h | 4 +- .../normalizer/version/version.cpp | 138 ++++++++++++++++++ .../columnshard/normalizer/version/version.h | 43 ++++++ .../tx/columnshard/normalizer/version/ya.make | 11 ++ ydb/core/tx/columnshard/normalizer/ya.make | 1 + 8 files changed, 202 insertions(+), 2 deletions(-) create mode 100644 ydb/core/tx/columnshard/normalizer/version/version.cpp create mode 100644 ydb/core/tx/columnshard/normalizer/version/version.h create mode 100644 ydb/core/tx/columnshard/normalizer/version/ya.make diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp index 75005013fbed..b42d481b2113 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp @@ -58,11 +58,14 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) { Counters.clear(); Normalizers.clear(); if (HasAppData()) { + LOG_S_CRIT("Repairs size " << AppDataVerified().ColumnShardConfig.GetRepairs().size()); for (auto&& i : AppDataVerified().ColumnShardConfig.GetRepairs()) { AFL_VERIFY(i.GetDescription())("error", "repair normalization have to has unique description"); - if (FinishedNormalizers.contains(TNormalizerFullId(i.GetClassName(), i.GetDescription()))) { + if (false) {//FinishedNormalizers.contains(TNormalizerFullId(i.GetClassName(), i.GetDescription()))) { + LOG_S_CRIT("Repair " << i.GetClassName() << " finished"); AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("warning", "repair already processed")("description", i.GetDescription()); } else { + LOG_S_CRIT("Repair " << i.GetClassName() << " will be started"); auto normalizer = RegisterNormalizer(std::shared_ptr(INormalizerComponent::TFactory::Construct(i.GetClassName(), ctx))); normalizer->SetIsRepair(true).SetUniqueDescription(i.GetDescription()); } diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index e75099ecd9ba..e1eb0e47885d 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -59,6 +59,7 @@ enum class ENormalizerSequentialId: ui32 { EmptyPortionsCleaner, CleanInsertionDedup, GCCountersNormalizer, + SchemaVersionCleaner, MAX }; diff --git a/ydb/core/tx/columnshard/normalizer/portion/portion.cpp b/ydb/core/tx/columnshard/normalizer/portion/portion.cpp index 739715f44125..9a7b9c617cc9 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/portion.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/portion.cpp @@ -44,6 +44,7 @@ INormalizerTask::TPtr TPortionsNormalizer::BuildTask(std::vector TPortionsNormalizer::DoInitImpl(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) { + LOG_S_CRIT("TPortionsNormalizer DoInitImpl called\n"); using namespace NColumnShard; NIceDb::TNiceDb db(txc.DB); diff --git a/ydb/core/tx/columnshard/normalizer/portion/portion.h b/ydb/core/tx/columnshard/normalizer/portion/portion.h index 5182c54bfbf5..d3d486c25669 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/portion.h +++ b/ydb/core/tx/columnshard/normalizer/portion/portion.h @@ -35,7 +35,9 @@ class TPortionsNormalizer : public TPortionsNormalizerBase { TPortionsNormalizer(const TNormalizationController::TInitContext& info) : TPortionsNormalizerBase(info) - {} + { + LOG_S_CRIT("TPortionsNormalizer created\n"); + } virtual INormalizerTask::TPtr BuildTask(std::vector>&& portions, std::shared_ptr> schemas) const override; virtual TConclusion DoInitImpl(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; diff --git a/ydb/core/tx/columnshard/normalizer/version/version.cpp b/ydb/core/tx/columnshard/normalizer/version/version.cpp new file mode 100644 index 000000000000..c85dc76dedb8 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/version/version.cpp @@ -0,0 +1,138 @@ +#include "version.h" + +namespace NKikimr::NOlap { + +class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { +private: + class TKey { + public: + ui64 Step; + ui64 TxId; + ui64 Version; + ui32 Id; + + public: + TKey() = default; + + TKey(ui32 id, ui64 step, ui64 txId, ui64 version) + : Step(step) + , TxId(txId) + , Version(version) + , Id(id) + { + } + }; + + TVector VersionsToRemove; + std::optional LastVersion; + +public: + TNormalizerResult(TVector&& versions, std::optional& lastVersion) + : VersionsToRemove(versions) + , LastVersion(lastVersion) + { + } + + bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override { + LOG_S_CRIT("TSchemaVersionNormalizer::TNormalizerResult::ApplyOnExecute called"); + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + for (auto& key: VersionsToRemove) { + if ((!LastVersion.has_value()) || (key.Version != *LastVersion)) { + db.Table().Key(key.Id, key.Step, key.TxId).Delete(); + } + } + return true; + } + + ui64 GetSize() const override { + return VersionsToRemove.size(); + } + + static std::optional> Init(NTabletFlatExecutor::TTransactionContext& txc) { + using namespace NColumnShard; + THashSet usedSchemaVersions; + NIceDb::TNiceDb db(txc.DB); + { + auto rowset = db.Table().Select(); + if (rowset.IsReady()) { + while (!rowset.EndOfSet()) { + usedSchemaVersions.insert(rowset.GetValue()); + if (!rowset.Next()) { + break; + } + } + } + } + { + auto rowset = db.Table().Select(); + if (rowset.IsReady()) { + while (!rowset.EndOfSet()) { + if (rowset.HaveValue()) { + usedSchemaVersions.insert(rowset.GetValue()); + if (!rowset.Next()) { + break; + } + } + } + } + } + + for (ui64 version: usedSchemaVersions) { + LOG_S_CRIT("Used schema version " << version); + } + + TVector unusedSchemaIds; + std::optional maxVersion = 0; + + { + bool emptyUsed = usedSchemaVersions.size() == 0; + auto rowset = db.Table().Select(); + if (rowset.IsReady()) { + while (!rowset.EndOfSet()) { + const ui32 id = rowset.GetValue(); + NKikimrTxColumnShard::TSchemaPresetVersionInfo info; + Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue())); + if (info.HasSchema()) { + ui64 version = info.GetSchema().GetVersion(); + LOG_S_CRIT("Got schema version " << version); + if (emptyUsed && ((!maxVersion.has_value()) || (version > maxVersion))) { + maxVersion = version; + } + if (!usedSchemaVersions.contains(version)) { + unusedSchemaIds.emplace_back(id, rowset.GetValue(), rowset.GetValue(), version); + } + } + + if (!rowset.Next()) { + break; + } + } + } + } + + LOG_S_CRIT("Unused schema count " << unusedSchemaIds.size()); + if (unusedSchemaIds.size() > 0) { + std::vector changes; + changes.emplace_back(std::make_shared(std::move(unusedSchemaIds), maxVersion)); + return changes; + } else { + return std::nullopt; + } + } +}; + +TConclusion> TSchemaVersionNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) { + LOG_S_CRIT("SchemaVersionNormalizer::DoInit called"); + auto changes = TNormalizerResult::Init(txc); + if (!changes) { + return TConclusionStatus::Fail("Not ready");; + } + std::vector tasks; + for (auto&& c : *changes) { + tasks.emplace_back(std::make_shared(c)); + } + return tasks; +} + +} diff --git a/ydb/core/tx/columnshard/normalizer/version/version.h b/ydb/core/tx/columnshard/normalizer/version/version.h new file mode 100644 index 000000000000..bb0bfc59c576 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/version/version.h @@ -0,0 +1,43 @@ +#pragma once + +#include +#include +#include + + +namespace NKikimr::NColumnShard { + class TTablesManager; +} + +namespace NKikimr::NOlap { + +class TSchemaVersionNormalizer : public TNormalizationController::INormalizerComponent { +public: + static TString GetClassNameStatic() { + return ::ToString(ENormalizerSequentialId::SchemaVersionCleaner); + } + +private: + static inline TFactory::TRegistrator Registrator = TFactory::TRegistrator( + GetClassNameStatic()); +public: + class TNormalizerResult; + class TTask; + +public: + virtual std::optional DoGetEnumSequentialId() const override { + return ENormalizerSequentialId::SchemaVersionCleaner; + } + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } + + TSchemaVersionNormalizer(const TNormalizationController::TInitContext&) { + LOG_S_CRIT("SchemaVersionNormalizer created\n"); + } + + virtual TConclusion> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; +}; + +} diff --git a/ydb/core/tx/columnshard/normalizer/version/ya.make b/ydb/core/tx/columnshard/normalizer/version/ya.make new file mode 100644 index 000000000000..d0412bcdd927 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/version/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + GLOBAL version.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/normalizer/abstract +) + +END() diff --git a/ydb/core/tx/columnshard/normalizer/ya.make b/ydb/core/tx/columnshard/normalizer/ya.make index ced78fd812af..450b27507d6c 100644 --- a/ydb/core/tx/columnshard/normalizer/ya.make +++ b/ydb/core/tx/columnshard/normalizer/ya.make @@ -7,6 +7,7 @@ PEERDIR( ydb/core/tx/columnshard/normalizer/tables ydb/core/tx/columnshard/normalizer/portion ydb/core/tx/columnshard/normalizer/insert_table + ydb/core/tx/columnshard/normalizer/version ) END() From b6d89d38562217a33f24d44437bd82aa80d41737 Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Mon, 23 Sep 2024 11:16:21 +0000 Subject: [PATCH 02/17] Removed logging for debug --- ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp | 5 +---- ydb/core/tx/columnshard/normalizer/portion/portion.cpp | 1 - ydb/core/tx/columnshard/normalizer/portion/portion.h | 4 +--- ydb/core/tx/columnshard/normalizer/version/version.cpp | 8 -------- ydb/core/tx/columnshard/normalizer/version/version.h | 1 - 5 files changed, 2 insertions(+), 17 deletions(-) diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp index b42d481b2113..75005013fbed 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp @@ -58,14 +58,11 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) { Counters.clear(); Normalizers.clear(); if (HasAppData()) { - LOG_S_CRIT("Repairs size " << AppDataVerified().ColumnShardConfig.GetRepairs().size()); for (auto&& i : AppDataVerified().ColumnShardConfig.GetRepairs()) { AFL_VERIFY(i.GetDescription())("error", "repair normalization have to has unique description"); - if (false) {//FinishedNormalizers.contains(TNormalizerFullId(i.GetClassName(), i.GetDescription()))) { - LOG_S_CRIT("Repair " << i.GetClassName() << " finished"); + if (FinishedNormalizers.contains(TNormalizerFullId(i.GetClassName(), i.GetDescription()))) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("warning", "repair already processed")("description", i.GetDescription()); } else { - LOG_S_CRIT("Repair " << i.GetClassName() << " will be started"); auto normalizer = RegisterNormalizer(std::shared_ptr(INormalizerComponent::TFactory::Construct(i.GetClassName(), ctx))); normalizer->SetIsRepair(true).SetUniqueDescription(i.GetDescription()); } diff --git a/ydb/core/tx/columnshard/normalizer/portion/portion.cpp b/ydb/core/tx/columnshard/normalizer/portion/portion.cpp index 9a7b9c617cc9..739715f44125 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/portion.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/portion.cpp @@ -44,7 +44,6 @@ INormalizerTask::TPtr TPortionsNormalizer::BuildTask(std::vector TPortionsNormalizer::DoInitImpl(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) { - LOG_S_CRIT("TPortionsNormalizer DoInitImpl called\n"); using namespace NColumnShard; NIceDb::TNiceDb db(txc.DB); diff --git a/ydb/core/tx/columnshard/normalizer/portion/portion.h b/ydb/core/tx/columnshard/normalizer/portion/portion.h index d3d486c25669..5182c54bfbf5 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/portion.h +++ b/ydb/core/tx/columnshard/normalizer/portion/portion.h @@ -35,9 +35,7 @@ class TPortionsNormalizer : public TPortionsNormalizerBase { TPortionsNormalizer(const TNormalizationController::TInitContext& info) : TPortionsNormalizerBase(info) - { - LOG_S_CRIT("TPortionsNormalizer created\n"); - } + {} virtual INormalizerTask::TPtr BuildTask(std::vector>&& portions, std::shared_ptr> schemas) const override; virtual TConclusion DoInitImpl(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; diff --git a/ydb/core/tx/columnshard/normalizer/version/version.cpp b/ydb/core/tx/columnshard/normalizer/version/version.cpp index c85dc76dedb8..568a69f0ada9 100644 --- a/ydb/core/tx/columnshard/normalizer/version/version.cpp +++ b/ydb/core/tx/columnshard/normalizer/version/version.cpp @@ -34,7 +34,6 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { } bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override { - LOG_S_CRIT("TSchemaVersionNormalizer::TNormalizerResult::ApplyOnExecute called"); using namespace NColumnShard; NIceDb::TNiceDb db(txc.DB); for (auto& key: VersionsToRemove) { @@ -78,10 +77,6 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { } } - for (ui64 version: usedSchemaVersions) { - LOG_S_CRIT("Used schema version " << version); - } - TVector unusedSchemaIds; std::optional maxVersion = 0; @@ -95,7 +90,6 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue())); if (info.HasSchema()) { ui64 version = info.GetSchema().GetVersion(); - LOG_S_CRIT("Got schema version " << version); if (emptyUsed && ((!maxVersion.has_value()) || (version > maxVersion))) { maxVersion = version; } @@ -111,7 +105,6 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { } } - LOG_S_CRIT("Unused schema count " << unusedSchemaIds.size()); if (unusedSchemaIds.size() > 0) { std::vector changes; changes.emplace_back(std::make_shared(std::move(unusedSchemaIds), maxVersion)); @@ -123,7 +116,6 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { }; TConclusion> TSchemaVersionNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) { - LOG_S_CRIT("SchemaVersionNormalizer::DoInit called"); auto changes = TNormalizerResult::Init(txc); if (!changes) { return TConclusionStatus::Fail("Not ready");; diff --git a/ydb/core/tx/columnshard/normalizer/version/version.h b/ydb/core/tx/columnshard/normalizer/version/version.h index bb0bfc59c576..3befed6b309b 100644 --- a/ydb/core/tx/columnshard/normalizer/version/version.h +++ b/ydb/core/tx/columnshard/normalizer/version/version.h @@ -34,7 +34,6 @@ class TSchemaVersionNormalizer : public TNormalizationController::INormalizerCom } TSchemaVersionNormalizer(const TNormalizationController::TInitContext&) { - LOG_S_CRIT("SchemaVersionNormalizer created\n"); } virtual TConclusion> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; From da40fec50e5532da47e26aaf470e8a70338fcddd Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Mon, 23 Sep 2024 11:20:31 +0000 Subject: [PATCH 03/17] Fixed indentation --- ydb/core/tx/columnshard/normalizer/version/version.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/normalizer/version/version.cpp b/ydb/core/tx/columnshard/normalizer/version/version.cpp index 568a69f0ada9..b621033128c7 100644 --- a/ydb/core/tx/columnshard/normalizer/version/version.cpp +++ b/ydb/core/tx/columnshard/normalizer/version/version.cpp @@ -59,7 +59,7 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { usedSchemaVersions.insert(rowset.GetValue()); if (!rowset.Next()) { break; - } + } } } } From c5f010518ebcad29444a83763d44bb30db4a0798 Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Mon, 23 Sep 2024 11:42:30 +0000 Subject: [PATCH 04/17] Fixed wrong syntax after conflict resolution --- ydb/core/tx/columnshard/normalizer/ya.make | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ydb/core/tx/columnshard/normalizer/ya.make b/ydb/core/tx/columnshard/normalizer/ya.make index 450b27507d6c..94c173dedf0c 100644 --- a/ydb/core/tx/columnshard/normalizer/ya.make +++ b/ydb/core/tx/columnshard/normalizer/ya.make @@ -8,6 +8,8 @@ PEERDIR( ydb/core/tx/columnshard/normalizer/portion ydb/core/tx/columnshard/normalizer/insert_table ydb/core/tx/columnshard/normalizer/version + ydb/core/tx/columnshard/normalizer/insert_table +>>>>>>> Fixed wrong syntax after conflict resolution ) END() From 45580f7835b16f50a963d9f816569b1edbe36177 Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Mon, 23 Sep 2024 14:07:12 +0000 Subject: [PATCH 05/17] Fixed local db error handling --- .../columnshard/normalizer/version/version.cpp | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/ydb/core/tx/columnshard/normalizer/version/version.cpp b/ydb/core/tx/columnshard/normalizer/version/version.cpp index b621033128c7..487a28391247 100644 --- a/ydb/core/tx/columnshard/normalizer/version/version.cpp +++ b/ydb/core/tx/columnshard/normalizer/version/version.cpp @@ -58,7 +58,7 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { while (!rowset.EndOfSet()) { usedSchemaVersions.insert(rowset.GetValue()); if (!rowset.Next()) { - break; + return std::nullopt; } } } @@ -70,7 +70,7 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { if (rowset.HaveValue()) { usedSchemaVersions.insert(rowset.GetValue()); if (!rowset.Next()) { - break; + return std::nullopt; } } } @@ -99,19 +99,15 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { } if (!rowset.Next()) { - break; + return std::nullopt; } } } } - if (unusedSchemaIds.size() > 0) { - std::vector changes; - changes.emplace_back(std::make_shared(std::move(unusedSchemaIds), maxVersion)); - return changes; - } else { - return std::nullopt; - } + std::vector changes; + changes.emplace_back(std::make_shared(std::move(unusedSchemaIds), maxVersion)); + return changes; } }; From 5a86c1df360b53ccc0610802e344389ddd368a53 Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Tue, 24 Sep 2024 07:46:11 +0000 Subject: [PATCH 06/17] Split big task --- .../normalizer/version/version.cpp | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/ydb/core/tx/columnshard/normalizer/version/version.cpp b/ydb/core/tx/columnshard/normalizer/version/version.cpp index 487a28391247..dc802eab06f4 100644 --- a/ydb/core/tx/columnshard/normalizer/version/version.cpp +++ b/ydb/core/tx/columnshard/normalizer/version/version.cpp @@ -23,11 +23,11 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { } }; - TVector VersionsToRemove; + std::vector VersionsToRemove; std::optional LastVersion; public: - TNormalizerResult(TVector&& versions, std::optional& lastVersion) + TNormalizerResult(std::vector&& versions, std::optional& lastVersion) : VersionsToRemove(versions) , LastVersion(lastVersion) { @@ -61,6 +61,8 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { return std::nullopt; } } + } else { + return std::nullopt; } } { @@ -74,11 +76,14 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { } } } + } else { + return std::nullopt; } } - TVector unusedSchemaIds; - std::optional maxVersion = 0; + std::vector unusedSchemaIds; + std::optional maxVersion; + std::vector changes; { bool emptyUsed = usedSchemaVersions.size() == 0; @@ -102,11 +107,17 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { return std::nullopt; } } + } else { + return std::nullopt; } } - std::vector changes; - changes.emplace_back(std::make_shared(std::move(unusedSchemaIds), maxVersion)); + for (size_t start = 0; start < unusedSchemaIds.size(); start += 10000) { + std::vector portion; + size_t end = std::min(unusedSchemaIds.size(), start + 10000); + portion.insert(portion.begin(), &unusedSchemaIds[start], &unusedSchemaIds[end]); + changes.emplace_back(std::make_shared(std::move(portion), maxVersion)); + } return changes; } }; From a7d99c636f0dbf4a6d9fe0cb9b233482b66df8a0 Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Tue, 24 Sep 2024 08:12:57 +0000 Subject: [PATCH 07/17] Renamed version to schema_version --- .../normalizer/{version => schema_version}/version.cpp | 0 .../normalizer/{version => schema_version}/version.h | 0 .../normalizer/{version => schema_version}/ya.make | 0 ydb/core/tx/columnshard/normalizer/ya.make | 4 +--- 4 files changed, 1 insertion(+), 3 deletions(-) rename ydb/core/tx/columnshard/normalizer/{version => schema_version}/version.cpp (100%) rename ydb/core/tx/columnshard/normalizer/{version => schema_version}/version.h (100%) rename ydb/core/tx/columnshard/normalizer/{version => schema_version}/ya.make (100%) diff --git a/ydb/core/tx/columnshard/normalizer/version/version.cpp b/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp similarity index 100% rename from ydb/core/tx/columnshard/normalizer/version/version.cpp rename to ydb/core/tx/columnshard/normalizer/schema_version/version.cpp diff --git a/ydb/core/tx/columnshard/normalizer/version/version.h b/ydb/core/tx/columnshard/normalizer/schema_version/version.h similarity index 100% rename from ydb/core/tx/columnshard/normalizer/version/version.h rename to ydb/core/tx/columnshard/normalizer/schema_version/version.h diff --git a/ydb/core/tx/columnshard/normalizer/version/ya.make b/ydb/core/tx/columnshard/normalizer/schema_version/ya.make similarity index 100% rename from ydb/core/tx/columnshard/normalizer/version/ya.make rename to ydb/core/tx/columnshard/normalizer/schema_version/ya.make diff --git a/ydb/core/tx/columnshard/normalizer/ya.make b/ydb/core/tx/columnshard/normalizer/ya.make index 94c173dedf0c..f48c3308ac38 100644 --- a/ydb/core/tx/columnshard/normalizer/ya.make +++ b/ydb/core/tx/columnshard/normalizer/ya.make @@ -7,9 +7,7 @@ PEERDIR( ydb/core/tx/columnshard/normalizer/tables ydb/core/tx/columnshard/normalizer/portion ydb/core/tx/columnshard/normalizer/insert_table - ydb/core/tx/columnshard/normalizer/version - ydb/core/tx/columnshard/normalizer/insert_table ->>>>>>> Fixed wrong syntax after conflict resolution + ydb/core/tx/columnshard/normalizer/schema_version ) END() From 66c746ce912e46e2a98841e590b2c78f7aed491b Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Tue, 24 Sep 2024 11:31:31 +0000 Subject: [PATCH 08/17] Always leave last schema version even if it is absent in portions --- ydb/core/tx/columnshard/normalizer/schema_version/version.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp b/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp index dc802eab06f4..d7d2ca86ad15 100644 --- a/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp +++ b/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp @@ -86,7 +86,6 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { std::vector changes; { - bool emptyUsed = usedSchemaVersions.size() == 0; auto rowset = db.Table().Select(); if (rowset.IsReady()) { while (!rowset.EndOfSet()) { @@ -95,7 +94,7 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue())); if (info.HasSchema()) { ui64 version = info.GetSchema().GetVersion(); - if (emptyUsed && ((!maxVersion.has_value()) || (version > maxVersion))) { + if (!maxVersion.has_value() || (version > *maxVersion)) { maxVersion = version; } if (!usedSchemaVersions.contains(version)) { From 56ba516c330835a4a28234a69e0bbe4cb56fafe0 Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Tue, 24 Sep 2024 12:01:14 +0000 Subject: [PATCH 09/17] Rewritten portions split --- .../normalizer/schema_version/version.cpp | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp b/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp index d7d2ca86ad15..56a85e2e09db 100644 --- a/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp +++ b/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp @@ -24,12 +24,10 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { }; std::vector VersionsToRemove; - std::optional LastVersion; public: - TNormalizerResult(std::vector&& versions, std::optional& lastVersion) + TNormalizerResult(std::vector&& versions) : VersionsToRemove(versions) - , LastVersion(lastVersion) { } @@ -37,9 +35,7 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { using namespace NColumnShard; NIceDb::TNiceDb db(txc.DB); for (auto& key: VersionsToRemove) { - if ((!LastVersion.has_value()) || (key.Version != *LastVersion)) { - db.Table().Key(key.Id, key.Step, key.TxId).Delete(); - } + db.Table().Key(key.Id, key.Step, key.TxId).Delete(); } return true; } @@ -111,11 +107,18 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { } } - for (size_t start = 0; start < unusedSchemaIds.size(); start += 10000) { - std::vector portion; - size_t end = std::min(unusedSchemaIds.size(), start + 10000); - portion.insert(portion.begin(), &unusedSchemaIds[start], &unusedSchemaIds[end]); - changes.emplace_back(std::make_shared(std::move(portion), maxVersion)); + std::vector portion; + portion.reserve(10000); + for (auto iter = unusedSchemaIds.begin(); iter != unusedSchemaIds.end(); iter++) { + if (!maxVersion.has_value() || (iter->Version != *maxVersion)) { + portion.push_back(*iter); + if (portion.size() >= 10000) { + changes.emplace_back(std::make_shared(std::move(portion))); + } + } + } + if (portion.size() > 0) { + changes.emplace_back(std::make_shared(std::move(portion))); } return changes; } From eb31a54f00c994290a3c4feb143d9eceee69b35c Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Tue, 24 Sep 2024 13:35:50 +0000 Subject: [PATCH 10/17] Run SchemaVersionCleaner only in repair mode --- ydb/core/tx/columnshard/normalizer/abstract/abstract.h | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index e1eb0e47885d..e75099ecd9ba 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -59,7 +59,6 @@ enum class ENormalizerSequentialId: ui32 { EmptyPortionsCleaner, CleanInsertionDedup, GCCountersNormalizer, - SchemaVersionCleaner, MAX }; From 510cddc56eb5a3c9663c6fc6f0f8f279293b1783 Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Tue, 24 Sep 2024 14:23:09 +0000 Subject: [PATCH 11/17] Run SchemaVersionCleaner only in repair mode --- ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp | 2 +- ydb/core/tx/columnshard/normalizer/abstract/abstract.h | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp index 75005013fbed..9260ad5fa9b0 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp @@ -80,7 +80,7 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) { if (LastSavedNormalizerId && (ui32)nType <= *LastSavedNormalizerId) { continue; } - if (nType == ENormalizerSequentialId::MAX) { + if (nType == ENormalizerSequentialId::MAX) || (nType == ENormalizerSequentialId::SchemaVersionCleaner) { continue; } auto normalizer = RegisterNormalizer(std::shared_ptr(INormalizerComponent::TFactory::Construct(::ToString(nType), ctx))); diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index e75099ecd9ba..e1eb0e47885d 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -59,6 +59,7 @@ enum class ENormalizerSequentialId: ui32 { EmptyPortionsCleaner, CleanInsertionDedup, GCCountersNormalizer, + SchemaVersionCleaner, MAX }; From 552c7302f448370d90832200abafd224a7f85a10 Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Tue, 24 Sep 2024 14:24:36 +0000 Subject: [PATCH 12/17] Fixed syntax --- ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp index 9260ad5fa9b0..85a02716264a 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp @@ -80,7 +80,7 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) { if (LastSavedNormalizerId && (ui32)nType <= *LastSavedNormalizerId) { continue; } - if (nType == ENormalizerSequentialId::MAX) || (nType == ENormalizerSequentialId::SchemaVersionCleaner) { + if ((nType == ENormalizerSequentialId::MAX) || (nType == ENormalizerSequentialId::SchemaVersionCleaner)) { continue; } auto normalizer = RegisterNormalizer(std::shared_ptr(INormalizerComponent::TFactory::Construct(::ToString(nType), ctx))); From 47aebaa6d559b05042ea21d99b3785c343e76dbf Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Tue, 24 Sep 2024 15:54:48 +0000 Subject: [PATCH 13/17] Use range for --- .../tx/columnshard/normalizer/schema_version/version.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp b/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp index 56a85e2e09db..cb214d7505af 100644 --- a/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp +++ b/ydb/core/tx/columnshard/normalizer/schema_version/version.cpp @@ -109,9 +109,9 @@ class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges { std::vector portion; portion.reserve(10000); - for (auto iter = unusedSchemaIds.begin(); iter != unusedSchemaIds.end(); iter++) { - if (!maxVersion.has_value() || (iter->Version != *maxVersion)) { - portion.push_back(*iter); + for (const auto& id: unusedSchemaIds) { + if (!maxVersion.has_value() || (id.Version != *maxVersion)) { + portion.push_back(id); if (portion.size() >= 10000) { changes.emplace_back(std::make_shared(std::move(portion))); } From 3fe401c987a80be263549abbcd9eeed603ee686b Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Wed, 25 Sep 2024 08:50:21 +0000 Subject: [PATCH 14/17] Removed SchemaVersionNormalizer from enum --- ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp | 2 +- ydb/core/tx/columnshard/normalizer/abstract/abstract.h | 1 - ydb/core/tx/columnshard/normalizer/schema_version/version.h | 4 ++-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp index 85a02716264a..ee41ab02e3b5 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp @@ -80,7 +80,7 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) { if (LastSavedNormalizerId && (ui32)nType <= *LastSavedNormalizerId) { continue; } - if ((nType == ENormalizerSequentialId::MAX) || (nType == ENormalizerSequentialId::SchemaVersionCleaner)) { + if (nType == ENormalizerSequentialId::MAX) { continue; } auto normalizer = RegisterNormalizer(std::shared_ptr(INormalizerComponent::TFactory::Construct(::ToString(nType), ctx))); diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index e1eb0e47885d..e75099ecd9ba 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -59,7 +59,6 @@ enum class ENormalizerSequentialId: ui32 { EmptyPortionsCleaner, CleanInsertionDedup, GCCountersNormalizer, - SchemaVersionCleaner, MAX }; diff --git a/ydb/core/tx/columnshard/normalizer/schema_version/version.h b/ydb/core/tx/columnshard/normalizer/schema_version/version.h index 3befed6b309b..48c8d0b4ea15 100644 --- a/ydb/core/tx/columnshard/normalizer/schema_version/version.h +++ b/ydb/core/tx/columnshard/normalizer/schema_version/version.h @@ -14,7 +14,7 @@ namespace NKikimr::NOlap { class TSchemaVersionNormalizer : public TNormalizationController::INormalizerComponent { public: static TString GetClassNameStatic() { - return ::ToString(ENormalizerSequentialId::SchemaVersionCleaner); + return "SchemaVersionCleaner"; } private: @@ -26,7 +26,7 @@ class TSchemaVersionNormalizer : public TNormalizationController::INormalizerCom public: virtual std::optional DoGetEnumSequentialId() const override { - return ENormalizerSequentialId::SchemaVersionCleaner; + return std::nullopt; } virtual TString GetClassName() const override { From cbfe631c9dbf00e5aa8a3bf226320cdcebd96873 Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Wed, 25 Sep 2024 11:49:44 +0000 Subject: [PATCH 15/17] Added test --- .../tx/columnshard/ut_rw/ut_normalizer.cpp | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp index 1f2312b1bfd6..6b20f4b00e60 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp @@ -152,7 +152,36 @@ class TColumnChunksCleaner: public NYDBTest::ILocalDBModifier { } }; -class TPortionsCleaner: public NYDBTest::ILocalDBModifier { +class TSchemaVersionsCleaner : public NYDBTest::ILocalDBModifier { +public: + virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + auto rowset = db.Table().Select(); + UNIT_ASSERT(rowset.IsReady()); + + ui64 minVersion = (ui64)-1; + while (!rowset.EndOfSet()) { + auto version = rowset.GetValue(); + if (version < minVersion) { + minVersion = version; + } + UNIT_ASSERT(rowset.Next()); + } + + // Add invalid widow schema, if SchemaVersionCleaner will not erase it, then test will fail + TString serialized; + NKikimrTxColumnShard::TSchemaPresetVersionInfo info; + info.MutableSchema()->SetVersion(minVersion - 1); + Y_ABORT_UNLESS(info.SerializeToString(&serialized)); + db.Table().Key(11, 1, 1).Update(NIceDb::TUpdate(serialized)); + + db.Table().Key(10).Update(NIceDb::TUpdate("default")); + + } +}; + +class TPortionsCleaner : public NYDBTest::ILocalDBModifier { public: virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { using namespace NColumnShard; @@ -259,6 +288,11 @@ Y_UNIT_TEST_SUITE(Normalizers) { TTestBasicRuntime runtime; TTester::Setup(runtime); + auto* repair = runtime.GetAppData().ColumnShardConfig.MutableRepairs()->Add(); + repair->SetClassName("SchemaVersionCleaner"); + repair->SetDescription("Removing unused schema versions"); + + const ui64 ownerId = 0; const ui64 tableId = 1; const std::vector schema = { NArrow::NTest::TTestColumn("key1", TTypeInfo(NTypeIds::Uint64)), NArrow::NTest::TTestColumn("key2", TTypeInfo(NTypeIds::Uint64)), NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) }; @@ -308,10 +342,15 @@ Y_UNIT_TEST_SUITE(Normalizers) { TestNormalizerImpl(); } + Y_UNIT_TEST(SchemaVersionsNormalizer) { + TestNormalizerImpl(); + } + Y_UNIT_TEST(CleanEmptyPortionsNormalizer) { TestNormalizerImpl(); } + Y_UNIT_TEST(EmptyTablesNormalizer) { class TLocalNormalizerChecker: public TNormalizerChecker { public: From a928a63b4b0fedde4d76e900f6276abc39af698e Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Wed, 25 Sep 2024 12:04:53 +0000 Subject: [PATCH 16/17] Fixed confict resolution --- ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp index 6b20f4b00e60..d941d548414c 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp @@ -292,7 +292,6 @@ Y_UNIT_TEST_SUITE(Normalizers) { repair->SetClassName("SchemaVersionCleaner"); repair->SetDescription("Removing unused schema versions"); - const ui64 ownerId = 0; const ui64 tableId = 1; const std::vector schema = { NArrow::NTest::TTestColumn("key1", TTypeInfo(NTypeIds::Uint64)), NArrow::NTest::TTestColumn("key2", TTypeInfo(NTypeIds::Uint64)), NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) }; From 4d16a50c677065c9d546aca51bbd0bc70e78dc05 Mon Sep 17 00:00:00 2001 From: Alexander Avdonkin Date: Thu, 26 Sep 2024 10:48:44 +0000 Subject: [PATCH 17/17] Fixed style --- ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp index ee41ab02e3b5..75005013fbed 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp @@ -80,7 +80,7 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) { if (LastSavedNormalizerId && (ui32)nType <= *LastSavedNormalizerId) { continue; } - if (nType == ENormalizerSequentialId::MAX) { + if (nType == ENormalizerSequentialId::MAX) { continue; } auto normalizer = RegisterNormalizer(std::shared_ptr(INormalizerComponent::TFactory::Construct(::ToString(nType), ctx)));