Skip to content

Commit

Permalink
Added pulling
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Jul 16, 2024
1 parent 2a97b98 commit dcdd81a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
}
}

// Coomon helpers
TTestActorRuntime* GetRuntime() const override {
return Server_->GetRuntime();
}
Expand Down Expand Up @@ -491,19 +492,6 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
return event;
}

static void WaitFor(TDuration timeout, TString description, std::function<bool(TString&)> callback) {
TInstant start = TInstant::Now();
while (TInstant::Now() - start <= timeout) {
TString errorString;
if (callback(errorString)) {
return;
}
Cerr << "Wait " << description << " " << TInstant::Now() - start << ": " << errorString << "\n";
Sleep(TDuration::Seconds(1));
}
UNIT_ASSERT_C(false, "Waiting " << description << " timeout");
}

NMonitoring::TDynamicCounterPtr GetWorkloadManagerCounters(ui32 nodeIndex) const {
return GetServiceCounters(GetRuntime()->GetAppData(nodeIndex).Counters, "kqp")
->GetSubgroup("subsystem", "workload_manager");
Expand Down Expand Up @@ -598,6 +586,21 @@ TIntrusivePtr<IYdbSetup> TYdbSetupSettings::Create() const {
return MakeIntrusive<TWorkloadServiceYdbSetup>(*this);
}

//// IYdbSetup

void IYdbSetup::WaitFor(TDuration timeout, TString description, std::function<bool(TString&)> callback) {
TInstant start = TInstant::Now();
while (TInstant::Now() - start <= timeout) {
TString errorString;
if (callback(errorString)) {
return;
}
Cerr << "Wait " << description << " " << TInstant::Now() - start << ": " << errorString << "\n";
Sleep(TDuration::Seconds(1));
}
UNIT_ASSERT_C(false, "Waiting " << description << " timeout");
}

//// TSampleQueriess

void TSampleQueries::CompareYson(const TString& expected, const TString& actual) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ class IYdbSetup : public TThrRefBase {
virtual void StopWorkloadService(ui64 nodeIndex = 0) const = 0;
virtual void ValidateWorkloadServiceCounters(bool checkTableCounters = true, const TString& poolId = "") const = 0;

// Coomon helpers
virtual TTestActorRuntime* GetRuntime() const = 0;
virtual const TYdbSetupSettings& GetSettings() const = 0;
static void WaitFor(TDuration timeout, TString description, std::function<bool(TString&)> callback);
};

// Test queries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,12 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceTables) {
ydb->WaitPoolHandlersCount(0);

// Check that lease expired
const TDuration leaseDuration = TDuration::Seconds(30); // Same as pool_handlers_acors.cpp:LEASE_DURATION
Sleep(leaseDuration + TDuration::Seconds(5)); // 5s for last pool refresh request
CheckPoolDescription(ydb, 0, 0);
IYdbSetup::WaitFor(TDuration::Seconds(60), "lease expiration", [ydb](TString& errorString) {
auto description = ydb->GetPoolDescription(TDuration::Zero());

errorString = TStringBuilder() << "delayed = " << description.DelayedRequests << ", running = " << description.RunningRequests;
return description.AmountRequests() == 0;
});
}

Y_UNIT_TEST(TestLeaseUpdates) {
Expand Down
23 changes: 10 additions & 13 deletions ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,19 +444,16 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) {
DROP RESOURCE POOL )" << poolId << ";"
);

TInstant start = TInstant::Now();
while (TInstant::Now() - start <= FUTURE_WAIT_TIMEOUT) {
if (ydb->Navigate(TStringBuilder() << ".resource_pools/" << poolId)->ResultSet.at(0).Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown) {
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::NOT_FOUND, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << poolId << " not found");
return;
}

Cerr << "WaitPoolDrop " << TInstant::Now() - start << "\n";
Sleep(TDuration::Seconds(1));
}
UNIT_ASSERT_C(false, "Pool drop waiting timeout");
IYdbSetup::WaitFor(FUTURE_WAIT_TIMEOUT, "pool drop", [ydb, poolId](TString& errorString) {
auto kind = ydb->Navigate(TStringBuilder() << ".resource_pools/" << poolId)->ResultSet.at(0).Kind;

errorString = TStringBuilder() << "kind = " << kind;
return kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown;
});

auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::NOT_FOUND, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << poolId << " not found");
}

Y_UNIT_TEST(TestResourcePoolAcl) {
Expand Down

0 comments on commit dcdd81a

Please sign in to comment.