Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maintain schema version reference counts and remove unused versions from the local database and memory #9134

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c361ddd
Put TColumnShard* object to TDBWrapper
aavdonkin Sep 12, 2024
edc66e1
Maintain version ref counts
aavdonkin Sep 12, 2024
42fcd94
Keep version counts in TColumnShard rather than in IColumnEngine
aavdonkin Sep 12, 2024
1d76717
Do not increase ref count if portion is already in the local db
aavdonkin Sep 12, 2024
ffc16cb
Moved TTransactionContext::Owner initialization to classes inherited …
aavdonkin Sep 13, 2024
dc8a1ee
InsertTable schema version counts are maintained
aavdonkin Sep 13, 2024
1e08bfb
Moved version count maintenance from TDbWrapper to calling functions
aavdonkin Sep 13, 2024
81a5da5
Fixed tests
aavdonkin Sep 13, 2024
890af11
Moved versions counting down from SaveToDatabase and RemoveToDatabase…
aavdonkin Sep 16, 2024
5e35b86
Unused version schema is removed from local database
aavdonkin Sep 16, 2024
59c183c
Unused schema versions are removed from local db and memory
aavdonkin Sep 16, 2024
0591e59
Remove unused schema from local db during commit rather than immediately
aavdonkin Sep 17, 2024
2802c1b
Removing unused versions are performed in TDatabase::Commit
aavdonkin Sep 17, 2024
dfeeebb
Do not remove last schema version from memory and local database
aavdonkin Sep 18, 2024
f1a7ac8
Fixed version deletion
aavdonkin Sep 18, 2024
f2e6baf
Remove deleted last not deleted version from database when new versio…
aavdonkin Sep 23, 2024
4adeaf7
Fixed wrong conflict resolution
aavdonkin Sep 23, 2024
f2dda73
Fixed TColumnEngineForLogs tests build
aavdonkin Sep 23, 2024
7d9b831
Fixed tests after rebase
aavdonkin Sep 24, 2024
47efee4
Added possibility to quickly turn on and off logging
aavdonkin Sep 24, 2024
2aeba59
Replace left LOG_S_CRIT to TEMPLOG
aavdonkin Sep 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ydb/core/tablet_flat/tablet_flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ namespace NTable {

namespace NTabletFlatExecutor {

namespace NFlatExecutorSetup {
struct ITablet;
}

class TTransactionContext;
class TExecutor;
struct TPageCollectionTxEnv;
Expand Down Expand Up @@ -247,6 +251,7 @@ class TTransactionContext : public TTxMemoryProviderBase {
NTable::TDatabase &DB;
NWilson::TSpan &TransactionSpan;
NWilson::TSpan TransactionExecutionSpan;
NFlatExecutorSetup::ITablet* Owner = nullptr;
Copy link
Collaborator

@swalrus1 swalrus1 Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Зачем таблет в контексте транзакции?

Почему нельзя прокинуть CS, не нагружая им TTransactionContext? Но лучше даже не передавать CS, а передавать только счётчики туда, где они используются.


private:
bool Rescheduled_ = false;
Expand Down Expand Up @@ -280,7 +285,6 @@ enum class ETerminationReason {
MemoryLimitExceeded = 1,
};


class ITransaction : TNonCopyable {
public:
using TTransactionContext = NTabletFlatExecutor::TTransactionContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ bool TTxWrite::CommitOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
}

bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
txc.Owner = Self;
TMemoryProfileGuard mpg("TTxWrite::Execute");
NActors::TLogContextGuard logGuard =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
namespace NKikimr::NColumnShard {

bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) {
txc.Owner = Self;
auto changes = Ev->Get()->IndexChanges;
TMemoryProfileGuard mpg("TTxWriteIndex::Execute::" + changes->TypeString());
TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("external_task_id", changes->GetTaskIdentifier());
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
{
ACFL_DEBUG("step", "TInsertTable::Load_Start");
TMemoryProfileGuard g("TTxInit/InsertTable");
auto localInsertTable = std::make_unique<NOlap::TInsertTable>();
auto localInsertTable = std::make_unique<NOlap::TInsertTable>(Self);
if (!localInsertTable->Load(db, dbTable, TAppData::TimeProvider->Now())) {
ACFL_ERROR("step", "TInsertTable::Load_Fails");
return false;
Expand All @@ -115,7 +115,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)

{
ACFL_DEBUG("step", "TTablesManager::Load_Start");
TTablesManager tManagerLocal(Self->StoragesManager, Self->TabletID());
TTablesManager tManagerLocal(Self->StoragesManager, Self->TabletID(), Self);
{
TMemoryProfileGuard g("TTxInit/TTablesManager");
if (!tManagerLocal.InitFromDB(db)) {
Expand Down Expand Up @@ -239,6 +239,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}

bool TTxInit::Execute(TTransactionContext& txc, const TActorContext& ctx) {
txc.Owner = Self;
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard");
LOG_S_DEBUG("TTxInit.Execute at tablet " << Self->TabletID());

Expand Down Expand Up @@ -340,6 +341,7 @@ class TTxApplyNormalizer : public TTransactionBase<TColumnShard> {
};

bool TTxApplyNormalizer::Execute(TTransactionContext& txc, const TActorContext&) {
txc.Owner = Self;
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard");
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Execute")("details", Self->NormalizerController.DebugString());
if (!Changes->ApplyOnExecute(txc, Self->NormalizerController)) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod())
, StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval())
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
, TablesManager(StoragesManager, info->TabletID)
, TablesManager(StoragesManager, info->TabletID, this)
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
, InsertTable(std::make_unique<NOlap::TInsertTable>())
, InsertTable(std::make_unique<NOlap::TInsertTable>(this))
, InsertTaskSubscription(NOlap::TInsertColumnEngineChanges::StaticTypeName(), Counters.GetSubscribeCounters())
, CompactTaskSubscription(NOlap::TCompactColumnEngineChanges::StaticTypeName(), Counters.GetSubscribeCounters())
, TTLTaskSubscription(NOlap::TTTLColumnEngineChanges::StaticTypeName(), Counters.GetSubscribeCounters())
Expand Down
120 changes: 120 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include "columnshard_private_events.h"
#include "tables_manager.h"

#include "common/log.h"

#include "blobs_action/events/delete_blobs.h"
#include "bg_tasks/events/local.h"
#include "transactions/tx_controller.h"
Expand Down Expand Up @@ -45,6 +47,8 @@
#include <ydb/services/metadata/service.h>
#include <ydb/services/metadata/abstract/common.h>

#include <util/generic/string.h>

namespace NKikimr::NOlap {
class TCleanupPortionsColumnEngineChanges;
class TCleanupTablesColumnEngineChanges;
Expand All @@ -53,6 +57,8 @@ class TChangesWithAppend;
class TCompactColumnEngineChanges;
class TInsertColumnEngineChanges;
class TStoragesManager;
class TDbWrapper;
class TColumnEngineForLogs;

namespace NReader {
class TTxScan;
Expand Down Expand Up @@ -136,6 +142,94 @@ using ITransaction = NTabletFlatExecutor::ITransaction;
template <typename T>
using TTransactionBase = NTabletFlatExecutor::TTransactionBase<T>;

// Composite key made of plan step, tx id, path id, dedup id and record type.
// It is used as the key type of TVersionCounts::InsertVersions.
// Equality compares all five fields and Hash() mixes the same five fields.
class TInsertKey {
public:
    // Scalar fields are zero-initialized so that a default-constructed key has a
    // well-defined value: `= default` alone would leave them indeterminate and
    // make Hash()/operator== read uninitialized memory.
    ui64 PlanStep = 0;
    ui64 TxId = 0;
    ui64 PathId = 0;
    TString DedupId;
    ui8 RecType = 0;

public:
    TInsertKey() = default;

    TInsertKey(ui64 planStep, ui64 txId, ui64 pathId, const TString& dedupId, ui8 recType)
        : PlanStep(planStep)
        , TxId(txId)
        , PathId(pathId)
        , DedupId(dedupId)
        , RecType(recType)
    {
    }

    // Field-by-field comparison over all five components.
    bool operator==(const TInsertKey& other) const {
        return (PlanStep == other.PlanStep) && (TxId == other.TxId) && (PathId == other.PathId) && (DedupId == other.DedupId) && (RecType == other.RecType);
    }

    // Folds every field that takes part in operator== into a single hash value.
    ui64 Hash() const {
        return CombineHashes(PlanStep, CombineHashes(TxId, CombineHashes(PathId, CombineHashes(hash<TString>()(DedupId), static_cast<ui64>(RecType)))));
    }
};

class TVersionCounts {
private:
struct Hasher {
inline size_t operator()(const TInsertKey& key) const noexcept {
return key.Hash();
}
};

private:
THashMap<TInsertKey, ui64, Hasher> InsertVersions;
THashMap<std::pair<ui64, ui64>, ui64> PortionVersions;
THashMap<ui64, ui32> VersionCounts;

public:
// Records that the entity identified by `portion` uses schema `version` and
// increments the reference count of that version.
//
// If `portion` is already present in `versions`, the reference count is left
// untouched and the call only verifies that the stored version equals `version`.
//
// Presence is decided by the insertion result instead of by a stored value of 0,
// so an entity registered with version 0 is not counted again on a repeated call.
template<class Key, class Versions>
void VersionAddRef(Versions& versions, const Key& portion, ui64 version) {
    const auto insertion = versions.emplace(portion, version);
    if (!insertion.second) {// Portion is already in the local database, no need to increase ref count
        TEMPLOG("Schema version is already written");
        AFL_VERIFY(version == insertion.first->second);
        return;
    }
    ui32& refCount = VersionCounts[version];
    refCount++;
    TEMPLOG("Ref count of schema version " << version << " changed from " << refCount - 1 << " to " << refCount << " this " << (ui64)this);
}

template<class Key, class Versions>
ui32 VersionRemoveRef(Versions& versions, const Key& portion, ui64 version) {
auto iter = versions.find(portion);
if (iter == versions.end()) { //Portion is already removed from local databae, no need to decrease ref count
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А когда VersionRemoveRef может позваться дважды для одной порции? Почему возникает необходимость проверять, что порция не посчитана ранее?

То же самое для VersionAddRef.

return (ui32)-1;
}
versions.erase(iter);
ui32& refCount = VersionCounts[version];
AFL_VERIFY(refCount > 0);
TEMPLOG("Ref count of schema version " << version << " changed from " << refCount << " to " << refCount - 1 << " this " << (ui64)this);
return --refCount;
}

// Adds a reference for a portion; the map key is the (portion, pathId) pair.
void VersionAddRef(ui64 portion, ui64 pathId, ui64 version) {
    const std::pair<ui64, ui64> portionKey(portion, pathId);
    VersionAddRef(PortionVersions, portionKey, version);
}

// Drops the reference held by the (portion, pathId) pair and returns whatever the
// generic VersionRemoveRef returns: the decremented ref count of `version`, or
// (ui32)-1 when the pair is not tracked.
ui32 VersionRemoveRef(ui64 portion, ui64 pathId, ui64 version) {
    const std::pair<ui64, ui64> portionKey(portion, pathId);
    return VersionRemoveRef(PortionVersions, portionKey, version);
}

// Adds a reference for an insert-table record identified by `key`.
void VersionAddRef(const TInsertKey& key, ui64 version) {
    auto& trackedInserts = InsertVersions;
    VersionAddRef(trackedInserts, key, version);
}

// Drops the reference held by the insert-table record `key`; the result is the
// value produced by the generic VersionRemoveRef for the InsertVersions map.
ui32 VersionRemoveRef(const TInsertKey& key, ui64 version) {
    auto& trackedInserts = InsertVersions;
    return VersionRemoveRef(trackedInserts, key, version);
}

};

class TColumnShard
: public TActor<TColumnShard>
, public NTabletFlatExecutor::TTabletExecutedFlat
Expand Down Expand Up @@ -186,6 +280,9 @@ class TColumnShard
friend class NOlap::NReader::TTxInternalScan;
friend class NOlap::NReader::NPlain::TIndexScannerConstructor;

friend class NOlap::TDbWrapper;
friend class NOlap::TColumnEngineForLogs;

class TStoragesManager;
friend class TTxController;

Expand Down Expand Up @@ -620,6 +717,29 @@ class TColumnShard
}

TColumnShard(TTabletStorageInfo* info, const TActorId& tablet);

void VersionAddRef(ui64 portion, ui64 pathId, ui64 version) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Счётчики стоит инкапсулировать в класс, чтобы не передавать CS везде, где нужен доступ к ним.

Как будто можно убрать эти методы и передавать VersionCounts везде.

VersionCounts.VersionAddRef(portion, pathId, version);
}

// Forwards to the shard's VersionCounts and returns its result unchanged.
ui32 VersionRemoveRef(ui64 portion, ui64 pathId, ui64 version) {
    const ui32 remainingRefs = VersionCounts.VersionRemoveRef(portion, pathId, version);
    return remainingRefs;
}

// Packs the insert-table record coordinates into a TInsertKey and adds a
// reference for `version` through the shard's VersionCounts.
void VersionAddRef(ui64 planStep, ui64 txId, ui64 pathId, const TString& dedupId, ui8 recType, ui64 version) {
    const TInsertKey insertKey(planStep, txId, pathId, dedupId, recType);
    VersionCounts.VersionAddRef(insertKey, version);
}

// Packs the insert-table record coordinates into a TInsertKey, removes its
// reference through the shard's VersionCounts and returns that call's result.
ui32 VersionRemoveRef(ui64 planStep, ui64 txId, ui64 pathId, const TString& dedupId, ui8 recType, ui64 version) {
    const TInsertKey insertKey(planStep, txId, pathId, dedupId, recType);
    return VersionCounts.VersionRemoveRef(insertKey, version);
}

// Thin forwarder: hands `database` and `version` to TablesManager unchanged.
void RemoveUnusedSchemaVersion(NTable::TDatabase* database, ui64 version) {
    this->TablesManager.RemoveUnusedSchemaVersion(database, version);
}

private:
TVersionCounts VersionCounts;
};

}
10 changes: 10 additions & 0 deletions ydb/core/tx/columnshard/columnshard_schema.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
#include "columnshard_schema.h"
#include "transactions/tx_controller.h"
#include "columnshard_impl.h"

#include "common/log.h"

namespace NKikimr::NColumnShard {

// Returns the plan step stored in `constructor` when its record type is
// Committed, and 0 for every other record type.
ui64 GetPlanStep(const NOlap::TInsertTableRecordLoadContext& constructor) {
    if (constructor.GetRecType() != Schema::EInsertTableIds::Committed) {
        return 0;
    }
    return constructor.GetPlanStep();
}

bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, NOlap::TInsertTableAccessor& insertTable, const TInstant& /*loadTime*/) {
auto rowset = db.Table<InsertTable>().Select();
if (!rowset.IsReady()) {
Expand All @@ -12,7 +19,10 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG
while (!rowset.EndOfSet()) {
NOlap::TInsertTableRecordLoadContext constructor;
constructor.ParseFromDatabase(rowset);
ui64 planStep = GetPlanStep(constructor);

TEMPLOG("Loaded schema version " << constructor.GetSchemaVersion() << " planstep " << planStep << " txid " << constructor.GetWriteTxId() << " dedup id " << constructor.GetDedupId() << " rec type " << (ui32)constructor.GetRecType());
insertTable.CS->VersionAddRef(planStep, constructor.GetWriteTxId(), constructor.GetPathId(), constructor.GetDedupId(), (ui8)constructor.GetRecType(), constructor.GetSchemaVersion());
switch (constructor.GetRecType()) {
case Schema::EInsertTableIds::Inserted:
insertTable.AddInserted(constructor.BuildInsertedOrAborted(dsGroupSelector), true);
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -984,11 +984,11 @@ class TInsertTableRecordLoadContext {
private:
NColumnShard::Schema::EInsertTableIds RecType;
ui64 PlanStep;
ui64 WriteTxId;
TInsertWriteId InsertWriteId;
ui64 PathId;
YDB_ACCESSOR_DEF(ui64, WriteTxId);
YDB_ACCESSOR_DEF(ui64, PathId);
YDB_ACCESSOR_DEF(TString, DedupId);
ui64 SchemaVersion;
YDB_ACCESSOR_DEF(ui64, SchemaVersion);
TString BlobIdString;
std::optional<NOlap::TUnifiedBlobId> BlobId;
TString MetadataString;
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/columnshard/common/log.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#pragma once

#if 0
#define TEMPLOG(x) LOG_S_CRIT(x)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Для выборочной записи логов можно использовать AFL_DEBUG|TRACE|...

Как будто они решают ту же задачу -- зачем ещё один механизм?

#else
#define TEMPLOG(x)
#endif
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#include "tx_data_from_source.h"
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/columnshard/common/log.h>

namespace NKikimr::NOlap::NDataSharing {

bool TTxDataFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
TEMPLOG("SaveToDatabase from TTxDataFromSource");
using namespace NKikimr::NColumnShard;
TDbWrapper dbWrapper(txc.DB, nullptr);
auto& index = Self->TablesManager.MutablePrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>();
Expand All @@ -22,6 +24,9 @@ bool TTxDataFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc,
for (auto&& i : PortionsByPathId) {
for (auto&& p : i.second.GetPortions()) {
p.SaveToDatabase(dbWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false);
txc.DB.OnCommit([self = Self, portion = p.GetPortion(), pathId = p.GetPathId(), schema = p.GetSchemaVersionVerified()]() {
self->VersionAddRef(portion, pathId, schema);
});
}
}
NIceDb::TNiceDb db(txc.DB);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "cleanup_portions.h"
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/common/log.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
Expand All @@ -25,6 +26,14 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TC
p.RemoveFromDatabase(context.DBWrapper);
p.FillBlobIdsByStorage(blobIdsByStorage, context.EngineLogs.GetVersionedIndex());
pathIds.emplace(p.GetPathId());
context.DB->OnCommit([self, portion = p.GetPortion(), pathId = p.GetPathId(), schema = p.GetSchemaVersionVerified(), db = context.DB]() {
TEMPLOG("Removing portion from cleanup");
ui32 refCount = self->VersionRemoveRef(portion, pathId, schema);
if (refCount == 0) {
TEMPLOG("Ref count is set to 0 for version " << schema << " need to delete");
self->TablesManager.RemoveUnusedSchemaVersion(db, schema);
}
});
}
for (auto&& i : blobIdsByStorage) {
auto action = BlobsAction.GetRemoving(i.first);
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/with_appended.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self,
Y_ABORT_UNLESS(portionInfo.HasRemoveSnapshot());
AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true));
portionInfo.SaveToDatabase(context.DBWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false);
if (self != nullptr) { // nullptr can happen in tests
context.DB->OnCommit([self, portion = portionInfo.GetPortion(), pathId = portionInfo.GetPathId(), schema = portionInfo.GetSchemaVersionVerified()]() {
self->VersionAddRef(portion, pathId, schema);
});
}
}
const auto predRemoveDroppedTable = [self](const TWritePortionInfoWithBlobsResult& item) {
auto& portionInfo = item.GetPortionResult();
Expand All @@ -31,6 +36,11 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self,
auto& portionInfo = portionInfoWithBlobs.GetPortionResult();
AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true));
portionInfo.SaveToDatabase(context.DBWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false);
if (self != nullptr) { // nullptr can happen in tests
context.DB->OnCommit([self, portion = portionInfo.GetPortion(), pathId = portionInfo.GetPathId(), schema = portionInfo.GetSchemaVersionVerified()]() {
self->VersionAddRef(portion, pathId, schema);
});
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ class IColumnEngine {
return TSnapshot::Zero();
}
virtual void OnTieringModified(const std::shared_ptr<NColumnShard::TTiersManager>& manager, const NColumnShard::TTtl& ttl, const std::optional<ui64> pathId) = 0;

// Default implementation: ignores its argument and returns true.
// NOTE(review): presumably the argument is a schema version id and overriding
// engines drop that version — confirm against the implementations.
virtual bool RemoveSchemaVersion(ui64 /*version*/) {
    return true;
}
};

} // namespace NKikimr::NOlap
Loading
Loading