Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented schema versions normalizer #9627

Merged
merged 17 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) {
if (LastSavedNormalizerId && (ui32)nType <= *LastSavedNormalizerId) {
continue;
}
if (nType == ENormalizerSequentialId::MAX) {
if (nType == ENormalizerSequentialId::MAX) {
aavdonkin marked this conversation as resolved.
Show resolved Hide resolved
continue;
}
auto normalizer = RegisterNormalizer(std::shared_ptr<INormalizerComponent>(INormalizerComponent::TFactory::Construct(::ToString(nType), ctx)));
Expand Down
139 changes: 139 additions & 0 deletions ydb/core/tx/columnshard/normalizer/schema_version/version.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#include "version.h"

namespace NKikimr::NOlap {

class TSchemaVersionNormalizer::TNormalizerResult : public INormalizerChanges {
private:
class TKey {
public:
ui64 Step;
ui64 TxId;
ui64 Version;
ui32 Id;

public:
TKey() = default;

TKey(ui32 id, ui64 step, ui64 txId, ui64 version)
: Step(step)
, TxId(txId)
, Version(version)
, Id(id)
{
}
};

std::vector<TKey> VersionsToRemove;

public:
TNormalizerResult(std::vector<TKey>&& versions)
: VersionsToRemove(versions)
{
}

bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
for (auto& key: VersionsToRemove) {
db.Table<Schema::SchemaPresetVersionInfo>().Key(key.Id, key.Step, key.TxId).Delete();
}
return true;
}

ui64 GetSize() const override {
return VersionsToRemove.size();
}

static std::optional<std::vector<INormalizerChanges::TPtr>> Init(NTabletFlatExecutor::TTransactionContext& txc) {
using namespace NColumnShard;
THashSet<ui64> usedSchemaVersions;
NIceDb::TNiceDb db(txc.DB);
{
auto rowset = db.Table<Schema::IndexPortions>().Select();
if (rowset.IsReady()) {
while (!rowset.EndOfSet()) {
usedSchemaVersions.insert(rowset.GetValue<Schema::IndexPortions::SchemaVersion>());
if (!rowset.Next()) {
return std::nullopt;
}
}
} else {
return std::nullopt;
}
}
{
auto rowset = db.Table<Schema::InsertTable>().Select();
if (rowset.IsReady()) {
while (!rowset.EndOfSet()) {
if (rowset.HaveValue<Schema::InsertTable::SchemaVersion>()) {
usedSchemaVersions.insert(rowset.GetValue<Schema::InsertTable::SchemaVersion>());
if (!rowset.Next()) {
return std::nullopt;
}
}
}
} else {
return std::nullopt;
}
}

std::vector<TKey> unusedSchemaIds;
std::optional<ui64> maxVersion;
std::vector<INormalizerChanges::TPtr> changes;

{
auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select();
if (rowset.IsReady()) {
while (!rowset.EndOfSet()) {
const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>();
NKikimrTxColumnShard::TSchemaPresetVersionInfo info;
Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
if (info.HasSchema()) {
ui64 version = info.GetSchema().GetVersion();
if (!maxVersion.has_value() || (version > *maxVersion)) {
maxVersion = version;
}
if (!usedSchemaVersions.contains(version)) {
unusedSchemaIds.emplace_back(id, rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>(), version);
}
}

if (!rowset.Next()) {
return std::nullopt;
}
}
} else {
return std::nullopt;
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вот тут уже известен maxVersion и его можно удалить из unused

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ну это же лишняя операция будет
Это же вектор, его нужно полностью обойти, а он и так будет обходиться в момент удаления

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

а что мы оптимизируем?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Время работы

std::vector<TKey> portion;
portion.reserve(10000);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

кажется, что это можно убрать

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Почему?
Там как раз до 10000 элементов может добавиться

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Или отсюда убрать или после move'a тоже добавить. Сейчас как-то несимметрично. Для первого батча делается reserve, для всех последующих - нет.

for (const auto& id: unusedSchemaIds) {
if (!maxVersion.has_value() || (id.Version != *maxVersion)) {
portion.push_back(id);
if (portion.size() >= 10000) {
changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion)));
}
}
}
if (portion.size() > 0) {
changes.emplace_back(std::make_shared<TNormalizerResult>(std::move(portion)));
}
return changes;
}
};

TConclusion<std::vector<INormalizerTask::TPtr>> TSchemaVersionNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
auto changes = TNormalizerResult::Init(txc);
if (!changes) {
return TConclusionStatus::Fail("Not ready");;
}
std::vector<INormalizerTask::TPtr> tasks;
for (auto&& c : *changes) {
tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(c));
}
return tasks;
}

}
42 changes: 42 additions & 0 deletions ydb/core/tx/columnshard/normalizer/schema_version/version.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/columnshard/defs.h>


namespace NKikimr::NColumnShard {
class TTablesManager;
}

namespace NKikimr::NOlap {

class TSchemaVersionNormalizer : public TNormalizationController::INormalizerComponent {
public:
static TString GetClassNameStatic() {
return "SchemaVersionCleaner";
}

private:
static inline TFactory::TRegistrator<TSchemaVersionNormalizer> Registrator = TFactory::TRegistrator<TSchemaVersionNormalizer>(
GetClassNameStatic());
public:
class TNormalizerResult;
class TTask;

public:
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
return std::nullopt;
}

virtual TString GetClassName() const override {
return GetClassNameStatic();
}

TSchemaVersionNormalizer(const TNormalizationController::TInitContext&) {
}

virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

}
11 changes: 11 additions & 0 deletions ydb/core/tx/columnshard/normalizer/schema_version/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
LIBRARY()

SRCS(
GLOBAL version.cpp
)

PEERDIR(
ydb/core/tx/columnshard/normalizer/abstract
)

END()
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ PEERDIR(
ydb/core/tx/columnshard/normalizer/tables
ydb/core/tx/columnshard/normalizer/portion
ydb/core/tx/columnshard/normalizer/insert_table
ydb/core/tx/columnshard/normalizer/schema_version
)

END()
40 changes: 39 additions & 1 deletion ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,36 @@ class TColumnChunksCleaner: public NYDBTest::ILocalDBModifier {
}
};

class TPortionsCleaner: public NYDBTest::ILocalDBModifier {
class TSchemaVersionsCleaner : public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
auto rowset = db.Table<Schema::IndexPortions>().Select();
UNIT_ASSERT(rowset.IsReady());

ui64 minVersion = (ui64)-1;
while (!rowset.EndOfSet()) {
auto version = rowset.GetValue<Schema::IndexPortions::SchemaVersion>();
if (version < minVersion) {
minVersion = version;
}
UNIT_ASSERT(rowset.Next());
}

// Add invalid widow schema, if SchemaVersionCleaner will not erase it, then test will fail
TString serialized;
NKikimrTxColumnShard::TSchemaPresetVersionInfo info;
info.MutableSchema()->SetVersion(minVersion - 1);
Y_ABORT_UNLESS(info.SerializeToString(&serialized));
db.Table<Schema::SchemaPresetVersionInfo>().Key(11, 1, 1).Update(NIceDb::TUpdate<Schema::SchemaPresetVersionInfo::InfoProto>(serialized));

db.Table<Schema::SchemaPresetInfo>().Key(10).Update(NIceDb::TUpdate<Schema::SchemaPresetInfo::Name>("default"));

}
};

class TPortionsCleaner : public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
using namespace NColumnShard;
Expand Down Expand Up @@ -259,6 +288,10 @@ Y_UNIT_TEST_SUITE(Normalizers) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);

auto* repair = runtime.GetAppData().ColumnShardConfig.MutableRepairs()->Add();
repair->SetClassName("SchemaVersionCleaner");
repair->SetDescription("Removing unused schema versions");

const ui64 tableId = 1;
const std::vector<NArrow::NTest::TTestColumn> schema = { NArrow::NTest::TTestColumn("key1", TTypeInfo(NTypeIds::Uint64)),
NArrow::NTest::TTestColumn("key2", TTypeInfo(NTypeIds::Uint64)), NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) };
Expand Down Expand Up @@ -308,10 +341,15 @@ Y_UNIT_TEST_SUITE(Normalizers) {
TestNormalizerImpl<TPortionsCleaner>();
}

Y_UNIT_TEST(SchemaVersionsNormalizer) {
TestNormalizerImpl<TSchemaVersionsCleaner>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

локальные модификаторы были сделаны для моделирования ошибки в локальной базе. для случая, который у нас, можно, просто, поменять схему несколько раз. чтобы по ходу дела не удалялись схемы - через контроллер, просто, отключим, когда будет что отключать, и все.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тогда как проверить что нормалайзер отработал?
Сейчас я добавляю невалидную схему и если нормалайзер ее не удалит, то тест свалится с ошибкой

}

Y_UNIT_TEST(CleanEmptyPortionsNormalizer) {
TestNormalizerImpl<TEmptyPortionsCleaner>();
}


Y_UNIT_TEST(EmptyTablesNormalizer) {
class TLocalNormalizerChecker: public TNormalizerChecker {
public:
Expand Down
Loading