Skip to content

Commit

Permalink
Fix generating too much data in memory for ydb workload import command
Browse files Browse the repository at this point in the history
  • Loading branch information
iddqdex authored Sep 21, 2024
1 parent 34e573c commit eec992f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 24 deletions.
41 changes: 18 additions & 23 deletions ydb/public/lib/ydb_cli/commands/ydb_workload_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ TWorkloadCommandImport::TUploadCommand::TUploadCommand(NYdbWorkload::TWorkloadPa
int TWorkloadCommandImport::TUploadCommand::DoRun(NYdbWorkload::IWorkloadQueryGenerator& /*workloadGen*/, TConfig& /*config*/) {
auto dataGeneratorList = Initializer->GetBulkInitialData();
AtomicSet(ErrorsCount, 0);
InFlightSemaphore = NThreading::TAsyncSemaphore::Make(UploadParams.MaxInFlight);
InFlightSemaphore = MakeHolder<TFastSemaphore>(UploadParams.MaxInFlight);
for (auto dataGen : dataGeneratorList) {
TThreadPoolParams params;
params.SetCatching(false);
Expand Down Expand Up @@ -97,36 +97,31 @@ TAsyncStatus TWorkloadCommandImport::TUploadCommand::SendDataPortion(NYdbWorkloa
}

void TWorkloadCommandImport::TUploadCommand::ProcessDataGenerator(std::shared_ptr<NYdbWorkload::IBulkDataGenerator> dataGen) noexcept try {
TDeque<NThreading::TFuture<void>> sendings;
TAtomic counter = 0;
for (auto portions = dataGen->GenerateDataPortion(); !portions.empty() && !AtomicGet(ErrorsCount); portions = dataGen->GenerateDataPortion()) {
for (const auto& data: portions) {
sendings.emplace_back(
InFlightSemaphore->AcquireAsync().Apply([this, data](const auto& sem) {
auto ar = MakeAtomicShared<NThreading::TAsyncSemaphore::TAutoRelease>(sem.GetValueSync()->MakeAutoRelease());
return SendDataPortion(data).Apply(
[ar, data, this](const TAsyncStatus& result) {
const auto& res = result.GetValueSync();
data->SetSendResult(res);
auto guard = Guard(Lock);
if (!res.IsSuccess()) {
Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
AtomicIncrement(ErrorsCount);
} else {
Bar->AddProgress(data->GetSize());
}
});
AtomicIncrement(counter);
SendDataPortion(data).Apply(
[data, this, &counter, g = MakeAtomicShared<TGuard<TFastSemaphore>>(*InFlightSemaphore)](const TAsyncStatus& result) {
const auto& res = result.GetValueSync();
data->SetSendResult(res);
auto guard = Guard(Lock);
if (!res.IsSuccess()) {
Cerr << "Bulk upset to " << data->GetTable() << " failed, " << res.GetStatus() << ", " << res.GetIssues().ToString() << Endl;
AtomicIncrement(ErrorsCount);
} else {
Bar->AddProgress(data->GetSize());
}
)
);
while(sendings.size() > UploadParams.MaxInFlight) {
sendings.pop_front();
}
AtomicDecrement(counter);
});
}
if (AtomicGet(ErrorsCount)) {
break;
}
}
NThreading::WaitAll(sendings).GetValueSync();
while(AtomicGet(counter) > 0) {
Sleep(TDuration::MilliSeconds(100));
}
} catch (...) {
auto g = Guard(Lock);
Cerr << "Fill table " << dataGen->GetName() << " failed: " << CurrentExceptionMessage() << ", backtrace: ";
Expand Down
2 changes: 1 addition & 1 deletion ydb/public/lib/ydb_cli/commands/ydb_workload_import.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TWorkloadCommandImport::TUploadCommand final: public TWorkloadCommandBase
NYdbWorkload::TWorkloadDataInitializer::TPtr Initializer;
THolder<TProgressBar> Bar;
TAdaptiveLock Lock;
NThreading::TAsyncSemaphore::TPtr InFlightSemaphore;
THolder<TFastSemaphore> InFlightSemaphore;
TAtomic ErrorsCount;
};

Expand Down

0 comments on commit eec992f

Please sign in to comment.