Skip to content

Commit

Permalink
Refactor lazy table creation for WM service (#4900)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored May 29, 2024
1 parent d36c6ff commit 8e7ec08
Show file tree
Hide file tree
Showing 8 changed files with 318 additions and 174 deletions.
10 changes: 10 additions & 0 deletions ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,4 +355,14 @@ struct TEvDescribeSecretsResponse : public NActors::TEventLocal<TEvDescribeSecre
TDescription Description;
};

struct TEvScriptExecutionsTablesCreationFinished : public NActors::TEventLocal<TEvScriptExecutionsTablesCreationFinished, TKqpScriptExecutionEvents::EvScriptExecutionsTableCreationFinished> {
TEvScriptExecutionsTablesCreationFinished(bool success, NYql::TIssues issues)
: Success(success)
, Issues(std::move(issues))
{}

const bool Success;
const NYql::TIssues Issues;
};

} // namespace NKikimr::NKqp
1 change: 1 addition & 0 deletions ydb/core/kqp/common/simple/kqp_event_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ struct TKqpScriptExecutionEvents {
EvGetScriptExecutionOperationQueryResponse,
EvDescribeSecretsResponse,
EvSaveScriptResultPartFinished,
EvScriptExecutionsTableCreationFinished,
};
};

Expand Down
41 changes: 30 additions & 11 deletions ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
EvOnRequestTimeout,
EvCloseIdleSessions,
EvResourcesSnapshot,
EvScriptExecutionsTableCreationFinished,
};

struct TEvReadyToPublishResources : public TEventLocal<TEvReadyToPublishResources, EEv::EvReadyToPublishResources> {};
Expand Down Expand Up @@ -169,10 +168,6 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot)
: Snapshot(std::move(snapshot)) {}
};

struct TEvScriptExecutionsTablesCreationFinished : public NActors::TEventLocal<TEvScriptExecutionsTablesCreationFinished, EvScriptExecutionsTableCreationFinished> {
TEvScriptExecutionsTablesCreationFinished() = default;
};
};

public:
Expand Down Expand Up @@ -1320,7 +1315,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle);
hFunc(TEvKqp::TEvCreateSessionResponse, ForwardEvent);
hFunc(TEvPrivate::TEvCloseIdleSessions, Handle);
hFunc(TEvPrivate::TEvScriptExecutionsTablesCreationFinished, Handle);
hFunc(TEvScriptExecutionsTablesCreationFinished, Handle);
hFunc(NKqp::TEvForgetScriptExecutionOperation, Handle);
hFunc(NKqp::TEvGetScriptExecutionOperation, Handle);
hFunc(NKqp::TEvListScriptExecutionOperations, Handle);
Expand Down Expand Up @@ -1582,11 +1577,16 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
switch (ScriptExecutionsCreationStatus) {
case EScriptExecutionsCreationStatus::NotStarted:
ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Pending;
Register(CreateScriptExecutionsTablesCreator(MakeHolder<TEvPrivate::TEvScriptExecutionsTablesCreationFinished>()), TMailboxType::HTSwap, AppData()->SystemPoolId);
Register(CreateScriptExecutionsTablesCreator(), TMailboxType::HTSwap, AppData()->SystemPoolId);
[[fallthrough]];
case EScriptExecutionsCreationStatus::Pending:
if (DelayedEventsQueue.size() < 10000) {
DelayedEventsQueue.emplace_back(std::move(ev));
DelayedEventsQueue.push_back({
.Event = std::move(ev),
.ResponseBuilder = [](Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
return new TResponse(status, std::move(issues));
}
});
} else {
NYql::TIssues issues;
issues.AddIssue("Too many queued requests");
Expand All @@ -1598,10 +1598,25 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
}
}

void Handle(TEvPrivate::TEvScriptExecutionsTablesCreationFinished::TPtr&) {
void Handle(TEvScriptExecutionsTablesCreationFinished::TPtr& ev) {
ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Finished;

NYql::TIssue rootIssue;
if (!ev->Get()->Success) {
ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::NotStarted;
rootIssue.SetMessage("Failed to create script execution tables");
for (const NYql::TIssue& issue : ev->Get()->Issues) {
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue));
}
}

while (!DelayedEventsQueue.empty()) {
Send(std::move(DelayedEventsQueue.front()));
auto delayedEvent = std::move(DelayedEventsQueue.front());
if (ev->Get()->Success) {
Send(std::move(delayedEvent.Event));
} else {
Send(delayedEvent.Event->Sender, delayedEvent.ResponseBuilder(Ydb::StatusIds::INTERNAL_ERROR, {rootIssue}));
}
DelayedEventsQueue.pop_front();
}
}
Expand Down Expand Up @@ -1765,8 +1780,12 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
Pending,
Finished,
};
struct TDelayedEvent {
THolder<IEventHandle> Event;
std::function<IEventBase*(Ydb::StatusIds::StatusCode, NYql::TIssues)> ResponseBuilder;
};
EScriptExecutionsCreationStatus ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::NotStarted;
std::deque<THolder<IEventHandle>> DelayedEventsQueue;
std::deque<TDelayedEvent> DelayedEventsQueue;
bool IsLookupByRmScheduled = false;
TActorId KqpTempTablesAgentActor;
};
Expand Down
198 changes: 75 additions & 123 deletions ydb/core/kqp/proxy_service/kqp_script_executions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,140 +79,92 @@ class TQueryBase : public NKikimr::TQueryBase {
};


class TScriptExecutionsTablesCreator : public TActorBootstrapped<TScriptExecutionsTablesCreator> {
public:
explicit TScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent)
: ResultEvent(std::move(resultEvent))
{
}

void Registered(NActors::TActorSystem* sys, const NActors::TActorId& owner) override {
NActors::TActorBootstrapped<TScriptExecutionsTablesCreator>::Registered(sys, owner);
Owner = owner;
}
class TScriptExecutionsTablesCreator : public NTableCreator::TMultiTableCreator {
using TBase = NTableCreator::TMultiTableCreator;

void Bootstrap() {
Become(&TScriptExecutionsTablesCreator::StateFunc);
RunCreateScriptExecutions();
RunCreateScriptExecutionLeases();
RunCreateScriptResultSets();
}
public:
explicit TScriptExecutionsTablesCreator()
: TBase({
GetScriptExecutionsCreator(),
GetScriptExecutionLeasesCreator(),
GetScriptResultSetsCreator()
})
{}

private:
static NKikimrSchemeOp::TColumnDescription Col(const TString& columnName, const char* columnType) {
NKikimrSchemeOp::TColumnDescription desc;
desc.SetName(columnName);
desc.SetType(columnType);
return desc;
}

static NKikimrSchemeOp::TColumnDescription Col(const TString& columnName, NScheme::TTypeId columnType) {
return Col(columnName, NScheme::TypeName(columnType));
}

static NKikimrSchemeOp::TTTLSettings TtlCol(const TString& columnName) {
NKikimrSchemeOp::TTTLSettings settings;
settings.MutableEnabled()->SetExpireAfterSeconds(DEADLINE_OFFSET.Seconds());
settings.MutableEnabled()->SetColumnName(columnName);
settings.MutableEnabled()->MutableSysSettings()->SetRunInterval(BRO_RUN_INTERVAL.MicroSeconds());
return settings;
}

void RunCreateScriptExecutions() {
TablesCreating++;
Register(
CreateTableCreator(
{ ".metadata", "script_executions" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("run_script_actor_id", NScheme::NTypeIds::Text),
Col("operation_status", NScheme::NTypeIds::Int32),
Col("execution_status", NScheme::NTypeIds::Int32),
Col("finalization_status", NScheme::NTypeIds::Int32),
Col("execution_mode", NScheme::NTypeIds::Int32),
Col("start_ts", NScheme::NTypeIds::Timestamp),
Col("end_ts", NScheme::NTypeIds::Timestamp),
Col("query_text", NScheme::NTypeIds::Text),
Col("syntax", NScheme::NTypeIds::Int32),
Col("ast", NScheme::NTypeIds::Text),
Col("ast_compressed", NScheme::NTypeIds::String),
Col("ast_compression_method", NScheme::NTypeIds::Text),
Col("issues", NScheme::NTypeIds::JsonDocument),
Col("plan", NScheme::NTypeIds::JsonDocument),
Col("meta", NScheme::NTypeIds::JsonDocument),
Col("parameters", NScheme::NTypeIds::String), // TODO: store aparameters separately to support bigger storage.
Col("result_set_metas", NScheme::NTypeIds::JsonDocument),
Col("stats", NScheme::NTypeIds::JsonDocument),
Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
Col("customer_supplied_id", NScheme::NTypeIds::Text),
Col("user_token", NScheme::NTypeIds::Text),
Col("script_sinks", NScheme::NTypeIds::JsonDocument),
Col("script_secret_names", NScheme::NTypeIds::JsonDocument),
},
{ "database", "execution_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at")
)
static IActor* GetScriptExecutionsCreator() {
return CreateTableCreator(
{ ".metadata", "script_executions" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("run_script_actor_id", NScheme::NTypeIds::Text),
Col("operation_status", NScheme::NTypeIds::Int32),
Col("execution_status", NScheme::NTypeIds::Int32),
Col("finalization_status", NScheme::NTypeIds::Int32),
Col("execution_mode", NScheme::NTypeIds::Int32),
Col("start_ts", NScheme::NTypeIds::Timestamp),
Col("end_ts", NScheme::NTypeIds::Timestamp),
Col("query_text", NScheme::NTypeIds::Text),
Col("syntax", NScheme::NTypeIds::Int32),
Col("ast", NScheme::NTypeIds::Text),
Col("ast_compressed", NScheme::NTypeIds::String),
Col("ast_compression_method", NScheme::NTypeIds::Text),
Col("issues", NScheme::NTypeIds::JsonDocument),
Col("plan", NScheme::NTypeIds::JsonDocument),
Col("meta", NScheme::NTypeIds::JsonDocument),
Col("parameters", NScheme::NTypeIds::String), // TODO: store aparameters separately to support bigger storage.
Col("result_set_metas", NScheme::NTypeIds::JsonDocument),
Col("stats", NScheme::NTypeIds::JsonDocument),
Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
Col("customer_supplied_id", NScheme::NTypeIds::Text),
Col("user_token", NScheme::NTypeIds::Text),
Col("script_sinks", NScheme::NTypeIds::JsonDocument),
Col("script_secret_names", NScheme::NTypeIds::JsonDocument),
},
{ "database", "execution_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at", DEADLINE_OFFSET, BRO_RUN_INTERVAL)
);
}

void RunCreateScriptExecutionLeases() {
TablesCreating++;
Register(
CreateTableCreator(
{ ".metadata", "script_execution_leases" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("lease_deadline", NScheme::NTypeIds::Timestamp),
Col("lease_generation", NScheme::NTypeIds::Int64),
Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
},
{ "database", "execution_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at")
)
static IActor* GetScriptExecutionLeasesCreator() {
return CreateTableCreator(
{ ".metadata", "script_execution_leases" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("lease_deadline", NScheme::NTypeIds::Timestamp),
Col("lease_generation", NScheme::NTypeIds::Int64),
Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
},
{ "database", "execution_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at", DEADLINE_OFFSET, BRO_RUN_INTERVAL)
);
}

void RunCreateScriptResultSets() {
TablesCreating++;
Register(
CreateTableCreator(
{ ".metadata", "result_sets" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("result_set_id", NScheme::NTypeIds::Int32),
Col("row_id", NScheme::NTypeIds::Int64),
Col("expire_at", NScheme::NTypeIds::Timestamp),
Col("result_set", NScheme::NTypeIds::String),
Col("accumulated_size", NScheme::NTypeIds::Int64),
},
{ "database", "execution_id", "result_set_id", "row_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at")
)
static IActor* GetScriptResultSetsCreator() {
return CreateTableCreator(
{ ".metadata", "result_sets" },
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
Col("result_set_id", NScheme::NTypeIds::Int32),
Col("row_id", NScheme::NTypeIds::Int64),
Col("expire_at", NScheme::NTypeIds::Timestamp),
Col("result_set", NScheme::NTypeIds::String),
Col("accumulated_size", NScheme::NTypeIds::Int64),
},
{ "database", "execution_id", "result_set_id", "row_id" },
NKikimrServices::KQP_PROXY,
TtlCol("expire_at", DEADLINE_OFFSET, BRO_RUN_INTERVAL)
);
}

void Handle(TEvTableCreator::TEvCreateTableResponse::TPtr&) {
Y_ABORT_UNLESS(TablesCreating > 0);
if (--TablesCreating == 0) {
Send(Owner, std::move(ResultEvent));
PassAway();
}
void OnTablesCreated(bool success, NYql::TIssues issues) override {
Send(Owner, new TEvScriptExecutionsTablesCreationFinished(success, std::move(issues)));
}

STRICT_STFUNC(StateFunc,
hFunc(TEvTableCreator::TEvCreateTableResponse, Handle);
)

private:
THolder<NActors::IEventBase> ResultEvent;
NActors::TActorId Owner;
size_t TablesCreating = 0;
};

Ydb::Query::ExecMode GetExecModeFromAction(NKikimrKqp::EQueryAction action) {
Expand Down Expand Up @@ -2867,8 +2819,8 @@ NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPt
return new TCreateScriptExecutionActor(std::move(ev), queryServiceConfig, counters, maxRunTime);
}

NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent) {
return new TScriptExecutionsTablesCreator(std::move(resultEvent));
NActors::IActor* CreateScriptExecutionsTablesCreator() {
return new TScriptExecutionsTablesCreator();
}

NActors::IActor* CreateForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/proxy_service/kqp_script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace NKikimr::NKqp {

// Creates all needed tables.
// Sends result event back when the work is done.
NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase> resultEvent);
NActors::IActor* CreateScriptExecutionsTablesCreator();

// Create script execution and run it.
NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr<TKqpCounters> counters, TDuration maxRunTime = SCRIPT_TIMEOUT_LIMIT);
Expand Down
Loading

0 comments on commit 8e7ec08

Please sign in to comment.