Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix double release repl token + SendOnlyHugeBlobs setting + move settings from consts to config #9569

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,19 @@ namespace NKikimr::NStorage {
}
}

vdiskConfig->BalancingEnableSend = Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetEnableSend();
vdiskConfig->BalancingEnableDelete = Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetEnableDelete();
vdiskConfig->BalancingBalanceOnlyHugeBlobs = Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetBalanceOnlyHugeBlobs();
vdiskConfig->BalancingJobGranularity = TDuration::MicroSeconds(Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetJobGranularityUs());
vdiskConfig->BalancingBatchSize = Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetBatchSize();
vdiskConfig->BalancingMaxToSendPerEpoch = Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetMaxToSendPerEpoch();
vdiskConfig->BalancingMaxToDeletePerEpoch = Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetMaxToDeletePerEpoch();
vdiskConfig->BalancingReadBatchTimeout = TDuration::MilliSeconds(Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetReadBatchTimeoutMs());
vdiskConfig->BalancingSendBatchTimeout = TDuration::MilliSeconds(Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetSendBatchTimeoutMs());
vdiskConfig->BalancingRequestBlobsOnMainTimeout = TDuration::MilliSeconds(Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetRequestBlobsOnMainTimeoutMs());
vdiskConfig->BalancingDeleteBatchTimeout = TDuration::MilliSeconds(Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetDeleteBatchTimeoutMs());
vdiskConfig->BalancingEpochTimeout = TDuration::MilliSeconds(Cfg->BlobStorageConfig.GetVDiskBalancingConfig().GetEpochTimeoutMs());

// issue initial report to whiteboard before creating actor to avoid races
Send(WhiteboardId, new NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate(vdiskId, groupInfo->GetStoragePoolName(),
vslotId.PDiskId, vslotId.VDiskSlotId, pdiskGuid, kind, donorMode, whiteboardInstanceGuid, std::move(donors)));
Expand Down
13 changes: 6 additions & 7 deletions ydb/core/blobstorage/ut_blobstorage/balancing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ struct TTestEnv {
.NodeCount = nodeCount,
.VDiskReplPausedAtStart = false,
.Erasure = erasure,
.FeatureFlags = MakeFeatureFlags(),
.ConfigPreprocessor = [](ui32, TNodeWardenConfig& conf) {
auto* balancingConf = conf.BlobStorageConfig.MutableVDiskBalancingConfig();
balancingConf->SetEnableSend(true);
balancingConf->SetEnableDelete(true);
balancingConf->SetBalanceOnlyHugeBlobs(false);
},
})
{
Env.CreateBoxAndPool(1, 1);
Expand All @@ -46,12 +51,6 @@ struct TTestEnv {
}
}

static TFeatureFlags MakeFeatureFlags() {
TFeatureFlags res;
res.SetUseVDisksBalancing(true);
return res;
}

static TString PrepareData(const ui32 dataLen, const ui32 start) {
TString data(Reserve(dataLen));
for (ui32 i = 0; i < dataLen; ++i) {
Expand Down
80 changes: 45 additions & 35 deletions ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ namespace NBalancing {
///////////////////////////////////////////////////////////////////////////////////////////

void ContinueBalancing() {
Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size();
Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size();

if (SendOnMainParts.Empty() && TryDeleteParts.Empty()) {
// no more parts to send or delete
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB03, VDISKP(Ctx->VCtx, "Balancing completed"));
Expand All @@ -101,8 +104,6 @@ namespace NBalancing {

void ScheduleJobQuant() {
Ctx->MonGroup.ReplTokenAquired()++;
Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size();
Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size();

// once repl token received, start balancing - waking up sender and deleter
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB02, VDISKP(Ctx->VCtx, "Schedule job quant"),
Expand All @@ -125,57 +126,65 @@ namespace NBalancing {
return;
}

const auto& top = GInfo->GetTopology();
TPartsCollectorMerger merger(top.GType);
THPTimer timer;

for (ui32 cnt = 0; It.Valid(); It.Next(), ++cnt) {
if (cnt % 100 == 99 && TDuration::Seconds(timer.Passed()) > JOB_GRANULARITY) {
if (cnt % 128 == 127 && TDuration::Seconds(timer.Passed()) > Ctx->Cfg.JobGranularity) {
// actor should not block the thread for a long time, so we should yield
// STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Collect keys"), (collected, cnt), (passed, timer.Passed()));
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Collect keys"), (collected, cnt), (passed, timer.Passed()));
Send(SelfId(), new NActors::TEvents::TEvWakeup());
return;
}

const auto& top = GInfo->GetTopology();
const auto& key = It.GetCurKey().LogoBlobID();

TPartsCollectorMerger merger(top.GType);
if (Ctx->Cfg.BalanceOnlyHugeBlobs && !Ctx->HugeBlobCtx->IsHugeBlob(GInfo->Type, key, Ctx->MinREALHugeBlobInBytes)) {
// skip non huge blobs
continue;
}

merger.Clear();
It.PutToMerger(&merger);

auto [moveMask, delMask] = merger.Ingress.HandoffParts(&top, Ctx->VCtx->ShortSelfVDisk, key);

if (auto partsToSend = merger.Ingress.LocalParts(top.GType) & moveMask; !partsToSend.Empty() && SendOnMainParts.Size() < MAX_TO_SEND_PER_EPOCH) {
// collect parts to send on main
for (const auto& [parts, data]: merger.Parts) {
if (!(partsToSend & parts).Empty()) {
SendOnMainParts.Data.emplace_back(TPartInfo{
.Key=It.GetCurKey().LogoBlobID(),
.PartsMask=parts,
.PartData=data
});
// collect parts to send on main
if (Ctx->Cfg.EnableSend && SendOnMainParts.Size() < Ctx->Cfg.MaxToSendPerEpoch) {
if (auto partsToSend = merger.Ingress.LocalParts(top.GType) & moveMask; !partsToSend.Empty()) {
for (const auto& [parts, data]: merger.Parts) {
if (!(partsToSend & parts).Empty()) {
SendOnMainParts.Data.emplace_back(TPartInfo{
.Key=It.GetCurKey().LogoBlobID(),
.PartsMask=parts,
.PartData=data
});
}
}
}
}

if (auto partsToDelete = merger.Ingress.LocalParts(top.GType) & delMask; !partsToDelete.Empty() && TryDeleteParts.Size() < MAX_TO_DELETE_PER_EPOCH) {
// collect parts to delete
auto key = It.GetCurKey().LogoBlobID();
for (ui8 partIdx = partsToDelete.FirstPosition(); partIdx < partsToDelete.GetSize(); partIdx = partsToDelete.NextPosition(partIdx)) {
TryDeleteParts.Data.emplace_back(TLogoBlobID(key, partIdx + 1));
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP(Ctx->VCtx, "Delete"), (LogoBlobId, TryDeleteParts.Data.back().ToString()));
}
// collect parts to delete
if (Ctx->Cfg.EnableDelete && TryDeleteParts.Size() < Ctx->Cfg.MaxToDeletePerEpoch) {
if (auto partsToDelete = merger.Ingress.LocalParts(top.GType) & delMask; !partsToDelete.Empty()) {
auto key = It.GetCurKey().LogoBlobID();
for (ui8 partIdx = partsToDelete.FirstPosition(); partIdx < partsToDelete.GetSize(); partIdx = partsToDelete.NextPosition(partIdx)) {
TryDeleteParts.Data.emplace_back(TLogoBlobID(key, partIdx + 1));
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB10, VDISKP(Ctx->VCtx, "Delete"), (LogoBlobId, TryDeleteParts.Data.back().ToString()));
}

for (const auto& [parts, data]: merger.Parts) {
if (!(partsToDelete & parts).Empty()) {
TryDeletePartsFullData[key].emplace_back(TPartInfo{
.Key=key, .PartsMask=parts, .PartData=data
});
for (const auto& [parts, data]: merger.Parts) {
if (!(partsToDelete & parts).Empty()) {
TryDeletePartsFullData[key].emplace_back(TPartInfo{
.Key=key, .PartsMask=parts, .PartData=data
});
}
}
}
}

merger.Clear();

if (SendOnMainParts.Size() >= MAX_TO_SEND_PER_EPOCH && TryDeleteParts.Size() >= MAX_TO_DELETE_PER_EPOCH) {
if (SendOnMainParts.Size() >= Ctx->Cfg.MaxToSendPerEpoch && TryDeleteParts.Size() >= Ctx->Cfg.MaxToDeletePerEpoch) {
// reached the limit of parts to send and delete
break;
}
Expand All @@ -192,11 +201,12 @@ namespace NBalancing {
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "TEvCompleted"), (Type, ev->Type));
BatchManager.Handle(ev);

if (StartTime + EPOCH_TIMEOUT < TlsActivationContext->Now()) {
if (StartTime + Ctx->Cfg.EpochTimeout < TlsActivationContext->Now()) {
Ctx->MonGroup.EpochTimeouts()++;
Send(MakeBlobStorageReplBrokerID(), new TEvReleaseReplToken);
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB04, VDISKP(Ctx->VCtx, "Epoch timeout"));
PassAway();
return;
}

if (BatchManager.IsBatchCompleted()) {
Expand Down Expand Up @@ -242,8 +252,8 @@ namespace NBalancing {

void Handle(NActors::TEvents::TEvUndelivered::TPtr ev) {
if (ev.Get()->Type == TEvReplToken::EventType) {
STLOG(PRI_WARN, BS_VDISK_BALANCING, BSVB06, VDISKP(Ctx->VCtx, "Ask repl token msg not delivered"), (SelfId, SelfId()), (PDiskId, Ctx->VDiskCfg->BaseInfo.PDiskId));
ScheduleJobQuant();
STLOG(PRI_ERROR, BS_VDISK_BALANCING, BSVB06, VDISKP(Ctx->VCtx, "Ask repl token msg not delivered"), (SelfId, SelfId()), (PDiskId, Ctx->VDiskCfg->BaseInfo.PDiskId));
ContinueBalancing();
}
}

Expand Down Expand Up @@ -302,8 +312,8 @@ namespace NBalancing {
, Ctx(ctx)
, GInfo(ctx->GInfo)
, It(Ctx->Snap.HullCtx, &Ctx->Snap.LogoBlobsSnap)
, SendOnMainParts(BATCH_SIZE)
, TryDeleteParts(BATCH_SIZE)
, SendOnMainParts(Ctx->Cfg.BatchSize)
, TryDeleteParts(Ctx->Cfg.BatchSize)
, StartTime(TlsActivationContext->Now())
{
}
Expand Down
41 changes: 23 additions & 18 deletions ydb/core/blobstorage/vdisk/balance/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,27 @@


namespace NKikimr {

// Tunables of the VDisk balancing actor and its sender/deleter helpers.
// A copy is stored as a const member of TBalancingCtx, so the values stay
// fixed for the lifetime of one balancing context.
struct TBalancingCfg {
// When false, the balancing actor does not collect parts to send on main.
bool EnableSend;
// When false, the balancing actor does not collect parts to delete.
bool EnableDelete;

// When true, blobs that are not huge are skipped during key collection.
bool BalanceOnlyHugeBlobs;
// Time budget for one uninterrupted key-collection pass; once exceeded the
// actor sends itself a wakeup and returns so it does not block the thread.
TDuration JobGranularity;

// Passed to the constructors of the send and delete part queues
// (presumably the number of parts handled per batch -- TODO confirm).
ui64 BatchSize;
// Upper bound on the number of parts collected for sending in one epoch.
ui64 MaxToSendPerEpoch;
// Upper bound on the number of parts collected for deletion in one epoch.
ui64 MaxToDeletePerEpoch;

// Delay before the sender's READ_TIMEOUT_TAG wakeup fires.
TDuration ReadBatchTimeout;
// Delay before the sender's SEND_TIMEOUT_TAG wakeup fires.
TDuration SendBatchTimeout;
// Delay before the deleter's REQUEST_TIMEOUT_TAG wakeup fires.
TDuration RequestBlobsOnMainTimeout;
// Delay before the deleter's DELETE_TIMEOUT_TAG wakeup fires.
TDuration DeleteBatchTimeout;
// Maximum age of the balancing actor, counted from its construction; when a
// completion event arrives after this deadline the actor releases the repl
// token and passes away.
TDuration EpochTimeout;
};

struct TBalancingCtx {
const TBalancingCfg Cfg;
TIntrusivePtr<TVDiskContext> VCtx;
TPDiskCtxPtr PDiskCtx;
THugeBlobCtxPtr HugeBlobCtx;
Expand All @@ -23,6 +43,7 @@ namespace NKikimr {
ui32 MinREALHugeBlobInBytes;

TBalancingCtx(
const TBalancingCfg& cfg,
TIntrusivePtr<TVDiskContext> vCtx,
TPDiskCtxPtr pDiskCtx,
THugeBlobCtxPtr hugeBlobCtx,
Expand All @@ -32,7 +53,8 @@ namespace NKikimr {
TIntrusivePtr<TBlobStorageGroupInfo> gInfo,
ui32 minREALHugeBlobInBytes
)
: VCtx(std::move(vCtx))
: Cfg(cfg)
, VCtx(std::move(vCtx))
, PDiskCtx(std::move(pDiskCtx))
, HugeBlobCtx(std::move(hugeBlobCtx))
, SkeletonId(skeletonId)
Expand All @@ -55,28 +77,11 @@ namespace NBalancing {
std::variant<TDiskPart, TRope> PartData;
};

static constexpr ui32 SENDER_ID = 0;
static constexpr ui32 DELETER_ID = 1;

static constexpr TDuration JOB_GRANULARITY = TDuration::MilliSeconds(1);

static constexpr TDuration READ_BATCH_TIMEOUT = TDuration::Seconds(10);
static constexpr TDuration SEND_BATCH_TIMEOUT = TDuration::Seconds(10);
static constexpr TDuration REQUEST_BLOBS_ON_MAIN_BATCH_TIMEOUT = TDuration::Seconds(10);
static constexpr TDuration DELETE_BATCH_TIMEOUT = TDuration::Seconds(10);

static constexpr ui64 READ_TIMEOUT_TAG = 0;
static constexpr ui64 SEND_TIMEOUT_TAG = 1;
static constexpr ui64 REQUEST_TIMEOUT_TAG = 2;
static constexpr ui64 DELETE_TIMEOUT_TAG = 3;

static constexpr ui32 BATCH_SIZE = 32;

static constexpr ui32 MAX_TO_SEND_PER_EPOCH = 1000;
static constexpr ui32 MAX_TO_DELETE_PER_EPOCH = 1000;
static constexpr TDuration EPOCH_TIMEOUT = TDuration::Minutes(1);


struct TEvBalancingSendPartsOnMain : TEventLocal<TEvBalancingSendPartsOnMain, TEvBlobStorage::EvBalancingSendPartsOnMain> {
TEvBalancingSendPartsOnMain(const TVector<TLogoBlobID>& ids)
: Ids(ids)
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/blobstorage/vdisk/balance/deleter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ namespace {

PartsRequester.SendRequestsToCheckPartsOnMain(SelfId());

Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(REQUEST_TIMEOUT_TAG)); // read timeout
Schedule(Ctx->Cfg.RequestBlobsOnMainTimeout, new NActors::TEvents::TEvWakeup(REQUEST_TIMEOUT_TAG)); // read timeout
}

void Handle(TEvBlobStorage::TEvVGetResult::TPtr ev) {
Expand Down Expand Up @@ -248,7 +248,7 @@ namespace {

PartsDeleter.DeleteParts(SelfId(), PartsRequester.GetResult());

Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(DELETE_TIMEOUT_TAG)); // delete timeout
Schedule(Ctx->Cfg.DeleteBatchTimeout, new NActors::TEvents::TEvWakeup(DELETE_TIMEOUT_TAG)); // delete timeout
}

void HandleDelLogoBlobResult(TEvDelLogoBlobDataSyncLogResult::TPtr ev) {
Expand All @@ -269,7 +269,7 @@ namespace {
}

void PassAway() override {
Send(NotifyId, new NActors::TEvents::TEvCompleted(DELETER_ID));
Send(NotifyId, new NActors::TEvents::TEvCompleted());
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB32, VDISKP(Ctx->VCtx, "TDeleter::PassAway"));
TActorBootstrapped::PassAway();
}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/blobstorage/vdisk/balance/sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ namespace {
return;
}

Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(READ_TIMEOUT_TAG)); // read timeout
Schedule(Ctx->Cfg.ReadBatchTimeout, new NActors::TEvents::TEvWakeup(READ_TIMEOUT_TAG)); // read timeout
}

void Handle(NPDisk::TEvChunkReadResult::TPtr ev) {
Expand Down Expand Up @@ -293,7 +293,7 @@ namespace {

Sender.SendPartsOnMain(SelfId(), Reader.GetResult());

Schedule(TDuration::Seconds(15), new NActors::TEvents::TEvWakeup(SEND_TIMEOUT_TAG)); // send timeout
Schedule(Ctx->Cfg.SendBatchTimeout, new NActors::TEvents::TEvWakeup(SEND_TIMEOUT_TAG)); // send timeout
}

template<class TEvPutResult>
Expand All @@ -314,7 +314,7 @@ namespace {
}

void PassAway() override {
Send(NotifyId, new NActors::TEvents::TEvCompleted(SENDER_ID));
Send(NotifyId, new NActors::TEvents::TEvCompleted());
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB28, VDISKP(Ctx->VCtx, "TSender::PassAway"));
TActorBootstrapped::PassAway();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/vdisk/balance/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ namespace NBalancing {
}

void TPartsCollectorMerger::Clear() {
Ingress = TIngress();
Parts.clear();
Parts.resize(GType.TotalPartCount());
}

} // NBalancing
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,20 @@ namespace NKikimr {
TControlWrapper DefaultHugeGarbagePerMille;
bool UseActorSystemTimeInBSQueue = false;

///////////// BALANCING SETTINGS ////////////////////
// The node warden overwrites every field below from VDiskBalancingConfig.
// The in-class defaults keep balancing switched off for config objects built
// any other way, so the skeleton never reads an indeterminate flag or limit
// when it checks BalancingEnableSend / BalancingEnableDelete.
bool BalancingEnableSend = false;
bool BalancingEnableDelete = false;
TDuration BalancingJobGranularity;
bool BalancingBalanceOnlyHugeBlobs = false;
ui64 BalancingBatchSize = 0;
ui64 BalancingMaxToSendPerEpoch = 0;
ui64 BalancingMaxToDeletePerEpoch = 0;
TDuration BalancingReadBatchTimeout;
TDuration BalancingSendBatchTimeout;
TDuration BalancingRequestBlobsOnMainTimeout;
TDuration BalancingDeleteBatchTimeout;
TDuration BalancingEpochTimeout;

///////////// COST METRICS SETTINGS ////////////////
bool UseCostTracker = true;
TCostMetricsParametersByMedia CostMetricsParametersByMedia;
Expand Down
21 changes: 18 additions & 3 deletions ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1930,7 +1930,7 @@ namespace NKikimr {

// create Hull
Hull = std::make_shared<THull>(Db->LsnMngr, PDiskCtx, Db->SkeletonID,
Config->FeatureFlags.GetUseVDisksBalancing(), std::move(*ev->Get()->Uncond),
Config->BalancingEnableDelete, std::move(*ev->Get()->Uncond),
ctx.ExecutorThread.ActorSystem, Config->BarrierValidation);
ActiveActors.Insert(Hull->RunHullServices(Config, HullLogCtx, Db->SyncLogFirstLsnToKeep,
Db->LoggerID, Db->LogCutterID, ctx), ctx, NKikimrServices::BLOBSTORAGE);
Expand Down Expand Up @@ -2556,15 +2556,30 @@ namespace NKikimr {
// don't run balancing for the static group
return;
}
if (!Config->FeatureFlags.GetUseVDisksBalancing() || VCtx->Top->GType.GetErasure() == TErasureType::ErasureMirror3of4) {
bool balancingEnabled = Config->BalancingEnableSend || Config->BalancingEnableDelete;
if (!balancingEnabled || VCtx->Top->GType.GetErasure() == TErasureType::ErasureMirror3of4) {
return;
}
if (BalancingId) {
Send(BalancingId, new NActors::TEvents::TEvPoison());
ActiveActors.Erase(BalancingId);
}
TBalancingCfg balancingCfg{
.EnableSend=Config->BalancingEnableSend,
.EnableDelete=Config->BalancingEnableDelete,
.BalanceOnlyHugeBlobs=Config->BalancingBalanceOnlyHugeBlobs,
.JobGranularity=Config->BalancingJobGranularity,
.BatchSize=Config->BalancingBatchSize,
.MaxToSendPerEpoch=Config->BalancingMaxToSendPerEpoch,
.MaxToDeletePerEpoch=Config->BalancingMaxToDeletePerEpoch,
.ReadBatchTimeout=Config->BalancingReadBatchTimeout,
.SendBatchTimeout=Config->BalancingSendBatchTimeout,
.RequestBlobsOnMainTimeout=Config->BalancingRequestBlobsOnMainTimeout,
.DeleteBatchTimeout=Config->BalancingDeleteBatchTimeout,
.EpochTimeout=Config->BalancingEpochTimeout,
};
auto balancingCtx = std::make_shared<TBalancingCtx>(
VCtx, PDiskCtx, HugeBlobCtx, SelfId(), Hull->GetSnapshot(), Config, GInfo, MinREALHugeBlobInBytes);
balancingCfg, VCtx, PDiskCtx, HugeBlobCtx, SelfId(), Hull->GetSnapshot(), Config, GInfo, MinREALHugeBlobInBytes);
BalancingId = ctx.Register(CreateBalancingActor(balancingCtx));
ActiveActors.Insert(BalancingId, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE);
}
Expand Down
Loading
Loading