Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3333 added yql syntax for creating resource pools #5498

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions ydb/core/kqp/gateway/behaviour/resource_pool/behaviour.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include "behaviour.h"
#include "manager.h"

#include <ydb/services/metadata/abstract/initialization.h>


namespace NKikimr::NKqp {

TResourcePoolBehaviour::TFactory::TRegistrator<TResourcePoolBehaviour> TResourcePoolBehaviour::Registrator(TResourcePoolConfig::GetTypeId());

NMetadata::NInitializer::IInitializationBehaviour::TPtr TResourcePoolBehaviour::ConstructInitializer() const {
return nullptr;
}

NMetadata::NModifications::IOperationsManager::TPtr TResourcePoolBehaviour::ConstructOperationsManager() const {
return std::make_shared<TResourcePoolManager>();
}

TString TResourcePoolBehaviour::GetInternalStorageTablePath() const {
return TResourcePoolConfig::GetTypeId();
}


TString TResourcePoolBehaviour::GetTypeId() const {
return TResourcePoolConfig::GetTypeId();
}

} // namespace NKikimr::NKqp
27 changes: 27 additions & 0 deletions ydb/core/kqp/gateway/behaviour/resource_pool/behaviour.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include <ydb/services/metadata/abstract/kqp_common.h>


namespace NKikimr::NKqp {

class TResourcePoolConfig {
public:
static TString GetTypeId() {
return "RESOURCE_POOL";
}
};

class TResourcePoolBehaviour: public NMetadata::TClassBehaviour<TResourcePoolConfig> {
static TFactory::TRegistrator<TResourcePoolBehaviour> Registrator;

protected:
virtual std::shared_ptr<NMetadata::NInitializer::IInitializationBehaviour> ConstructInitializer() const override;
virtual std::shared_ptr<NMetadata::NModifications::IOperationsManager> ConstructOperationsManager() const override;
virtual TString GetInternalStorageTablePath() const override;

public:
virtual TString GetTypeId() const override;
};

} // namespace NKikimr::NKqp
80 changes: 80 additions & 0 deletions ydb/core/kqp/gateway/behaviour/resource_pool/manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#include "manager.h"

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/base/feature_flags.h>


namespace NKikimr::NKqp {

namespace {

void CheckFeatureFlag(TResourcePoolManager::TInternalModificationContext& context) {
auto* actorSystem = context.GetExternalData().GetActorSystem();
if (!actorSystem) {
ythrow yexception() << "This place needs an actor system. Please contact internal support";
}

if (!AppData(actorSystem)->FeatureFlags.GetEnableResourcePools()) {
throw std::runtime_error("Resource pools are disabled. Please contact your system administrator to enable it");
}
}

void ValidateObjectId(const TString& objectId) {
if (objectId.find('/') != TString::npos) {
throw std::runtime_error("Resource pool id should not contain '/' symbol");
}
}

TResourcePoolManager::TYqlConclusionStatus StatusFromActivityType(TResourcePoolManager::EActivityType activityType) {
using TYqlConclusionStatus = TResourcePoolManager::TYqlConclusionStatus;
using EActivityType = TResourcePoolManager::EActivityType;

switch (activityType) {
case EActivityType::Undefined:
return TYqlConclusionStatus::Fail("Undefined operation for RESOURCE_POOL object");
case EActivityType::Upsert:
return TYqlConclusionStatus::Fail("Upsert operation for RESOURCE_POOL objects is not implemented");
case EActivityType::Create:
return TYqlConclusionStatus::Fail("Create operation for RESOURCE_POOL objects is not implemented");
case EActivityType::Alter:
return TYqlConclusionStatus::Fail("Alter operation for RESOURCE_POOL objects is not implemented");
case EActivityType::Drop:
return TYqlConclusionStatus::Fail("Drop operation for RESOURCE_POOL objects is not implemented");
}
}

} // anonymous namespace

NThreading::TFuture<TResourcePoolManager::TYqlConclusionStatus> TResourcePoolManager::DoModify(const NYql::TObjectSettingsImpl& settings, ui32 nodeId, const NMetadata::IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const {
Y_UNUSED(nodeId, manager);

try {
CheckFeatureFlag(context);
ValidateObjectId(settings.GetObjectId());

return NThreading::MakeFuture<TYqlConclusionStatus>(StatusFromActivityType(context.GetActivityType()));
} catch (...) {
return NThreading::MakeFuture<TYqlConclusionStatus>(TYqlConclusionStatus::Fail(CurrentExceptionMessage()));
}
}

TResourcePoolManager::TYqlConclusionStatus TResourcePoolManager::DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, const NMetadata::IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const {
Y_UNUSED(schemeOperation, manager);

try {
CheckFeatureFlag(context);
ValidateObjectId(settings.GetObjectId());

return StatusFromActivityType(context.GetActivityType());
} catch (...) {
return TYqlConclusionStatus::Fail(CurrentExceptionMessage());
}
}

NThreading::TFuture<TResourcePoolManager::TYqlConclusionStatus> TResourcePoolManager::ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation, ui32 nodeId, const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const {
Y_UNUSED(nodeId, manager, context);

return NThreading::MakeFuture(TYqlConclusionStatus::Fail(TStringBuilder() << "Execution of prepare operation for RESOURCE_POOL object: unsupported operation: " << static_cast<i32>(schemeOperation.GetOperationCase())));
}

} // namespace NKikimr::NKqp
25 changes: 25 additions & 0 deletions ydb/core/kqp/gateway/behaviour/resource_pool/manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include <ydb/services/metadata/manager/abstract.h>


namespace NKikimr::NKqp {

class TResourcePoolManager : public NMetadata::NModifications::IOperationsManager {
public:
using NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus;

protected:
NThreading::TFuture<TYqlConclusionStatus> DoModify(const NYql::TObjectSettingsImpl& settings, ui32 nodeId,
const NMetadata::IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const override;

TYqlConclusionStatus DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings,
const NMetadata::IClassBehaviour::TPtr& manager, IOperationsManager::TInternalModificationContext& context) const override;

public:

NThreading::TFuture<TYqlConclusionStatus> ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation,
const ui32 nodeId, const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override;
};

} // namespace NKikimr::NKqp
15 changes: 15 additions & 0 deletions ydb/core/kqp/gateway/behaviour/resource_pool/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
LIBRARY()

SRCS(
manager.cpp
GLOBAL behaviour.cpp
)

PEERDIR(
ydb/services/metadata/abstract
ydb/services/metadata/manager
)

YQL_LAST_ABI_VERSION()

END()
1 change: 1 addition & 0 deletions ydb/core/kqp/gateway/behaviour/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
RECURSE(
external_data_source
resource_pool
table
tablestore
)
1 change: 1 addition & 0 deletions ydb/core/kqp/gateway/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ PEERDIR(
ydb/core/kqp/gateway/behaviour/tablestore
ydb/core/kqp/gateway/behaviour/table
ydb/core/kqp/gateway/behaviour/external_data_source
ydb/core/kqp/gateway/behaviour/resource_pool
ydb/core/kqp/gateway/behaviour/view
ydb/core/kqp/gateway/utils
ydb/library/yql/providers/result/expr_nodes
Expand Down
45 changes: 45 additions & 0 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5988,6 +5988,51 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT_EQUAL_C(describe.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
}
}

Y_UNIT_TEST(DisableResourcePools) {
TKikimrRunner kikimr(TKikimrSettings().SetEnableResourcePools(false));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto checkDisabled = [&session](const TString& query) {
Cerr << "Check query:\n" << query << "\n";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Resource pools are disabled. Please contact your system administrator to enable it");
};

// CREATE RESOURCE POOL
checkDisabled(R"(
CREATE RESOURCE POOL `MyResourcePool` WITH (
CONCURRENT_QUERY_LIMIT=20,
QUERY_CANCEL_AFTER_SECONDS=86400,
QUERY_COUNT_LIMIT=1000
);)");

// ALTER RESOURCE POOL
checkDisabled(R"(
ALTER RESOURCE POOL `MyResourcePool`
SET (CONCURRENT_QUERY_LIMIT = 30),
SET QUERY_COUNT_LIMIT 100,
RESET (QUERY_CANCEL_AFTER_SECONDS);
)");

// DROP RESOURCE POOL
checkDisabled("DROP RESOURCE POOL `MyResourcePool`;");
}

Y_UNIT_TEST(ResourcePoolsValidation) {
TKikimrRunner kikimr(TKikimrSettings().SetEnableResourcePools(true));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto result = session.ExecuteSchemeQuery(R"(
CREATE RESOURCE POOL `MyFolder/MyResourcePool` WITH (
CONCURRENT_QUERY_LIMIT=20
);)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Resource pool id should not contain '/' symbol");
}
}

Y_UNIT_TEST_SUITE(KqpOlapScheme) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,5 @@ message TFeatureFlags {
optional bool EnableExternalSourceSchemaInference = 126 [default = false];
optional bool EnableDbMetadataCache = 127 [default = false];
optional bool EnableTableDatetime64 = 128 [default = false];
optional bool EnableResourcePools = 129 [default = false];
}
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnableReplaceIfExistsForExternalEntities)
FEATURE_FLAG_SETTER(EnableCMSRequestPriorities)
FEATURE_FLAG_SETTER(EnableTableDatetime64)
FEATURE_FLAG_SETTER(EnableResourcePools)

#undef FEATURE_FLAG_SETTER
};
Expand Down
21 changes: 21 additions & 0 deletions ydb/library/yql/sql/v1/SQLv1.g.in
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ sql_stmt_core:
| create_view_stmt
| drop_view_stmt
| alter_replication_stmt
| create_resource_pool_stmt
| alter_resource_pool_stmt
| drop_resource_pool_stmt
;

expr:
Expand Down Expand Up @@ -798,6 +801,21 @@ permission_name: permission_id | STRING_VALUE;

permission_name_target: permission_name (COMMA permission_name)* COMMA? | ALL PRIVILEGES?;

create_resource_pool_stmt: CREATE RESOURCE POOL object_ref
with_table_settings
;

alter_resource_pool_stmt: ALTER RESOURCE POOL object_ref
alter_resource_pool_action (COMMA alter_resource_pool_action)*
;
alter_resource_pool_action:
alter_table_set_table_setting_uncompat
| alter_table_set_table_setting_compat
| alter_table_reset_table_setting
;

drop_resource_pool_stmt: DROP RESOURCE POOL object_ref;

create_replication_stmt: CREATE ASYNC REPLICATION object_ref
FOR replication_target (COMMA replication_target)*
WITH LPAREN replication_settings RPAREN
Expand Down Expand Up @@ -1189,6 +1207,7 @@ keyword_as_compat:
| PATTERN
| PER
| PERMUTE
| POOL
| PRIVILEGES
| QUEUE
// | READ
Expand Down Expand Up @@ -1347,6 +1366,7 @@ keyword_compat: (
| PER
| PERMUTE
| PLAN
| POOL
| PRAGMA
| PRECEDING
| PRESORT
Expand Down Expand Up @@ -1695,6 +1715,7 @@ PATTERN: P A T T E R N;
PER: P E R;
PERMUTE: P E R M U T E;
PLAN: P L A N;
POOL: P O O L;
PRAGMA: P R A G M A;
PRECEDING: P R E C E D I N G;
PRESORT: P R E S O R T;
Expand Down
37 changes: 36 additions & 1 deletion ydb/library/yql/sql/v1/format/sql_format.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1495,6 +1495,38 @@ friend struct TStaticData;
VisitAllFields(TRule_drop_replication_stmt::GetDescriptor(), msg);
}

void VisitCreateResourcePool(const TRule_create_resource_pool_stmt& msg) {
PosFromToken(msg.GetToken1());
NewLine();
VisitAllFields(TRule_create_resource_pool_stmt::GetDescriptor(), msg);
}

void VisitAlterResourcePool(const TRule_alter_resource_pool_stmt& msg) {
PosFromToken(msg.GetToken1());
NewLine();
VisitToken(msg.GetToken1());
VisitToken(msg.GetToken2());
VisitToken(msg.GetToken3());
Visit(msg.GetRule_object_ref4());

NewLine();
PushCurrentIndent();
Visit(msg.GetRule_alter_resource_pool_action5());
for (const auto& action : msg.GetBlock6()) {
Visit(action.GetToken1()); // comma
NewLine();
Visit(action.GetRule_alter_resource_pool_action2());
}

PopCurrentIndent();
}

void VisitDropResourcePool(const TRule_drop_resource_pool_stmt& msg) {
PosFromToken(msg.GetToken1());
NewLine();
VisitAllFields(TRule_drop_resource_pool_stmt::GetDescriptor(), msg);
}

void VisitAllFields(const NProtoBuf::Descriptor* descr, const NProtoBuf::Message& msg) {
VisitAllFieldsImpl<TPrettyVisitor, &TPrettyVisitor::Visit>(this, descr, msg);
}
Expand Down Expand Up @@ -2708,7 +2740,10 @@ TStaticData::TStaticData()
{TRule_revoke_permissions_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitRevokePermissions)},
{TRule_alter_table_store_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterTableStore)},
{TRule_create_view_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateView)},
{TRule_drop_view_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropView)}
{TRule_drop_view_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropView)},
{TRule_create_resource_pool_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitCreateResourcePool)},
{TRule_alter_resource_pool_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitAlterResourcePool)},
{TRule_drop_resource_pool_stmt::GetDescriptor(), MakePrettyFunctor(&TPrettyVisitor::VisitDropResourcePool)}
})
, ObfuscatingVisitDispatch({
{TToken::GetDescriptor(), MakeObfuscatingFunctor(&TObfuscatingVisitor::VisitToken)},
Expand Down
18 changes: 18 additions & 0 deletions ydb/library/yql/sql/v1/format/sql_format_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1581,4 +1581,22 @@ FROM Input MATCH_RECOGNIZE (PATTERN (A) DEFINE A AS A);
TSetup setup;
setup.Run(cases);
}

Y_UNIT_TEST(ResourcePoolOperations) {
TCases cases = {
{"creAte reSourCe poOl naMe With (a = \"b\")",
"CREATE RESOURCE POOL naMe WITH (a = \"b\");\n"},
{"create resource pool eds with (a=\"a\",b=\"b\",c = true)",
"CREATE RESOURCE POOL eds WITH (\n\ta = \"a\",\n\tb = \"b\",\n\tc = TRUE\n);\n"},
{"alTer reSOurcE poOl naMe sEt a tRue, resEt (b, c), seT (x=y, z=false)",
"ALTER RESOURCE POOL naMe\n\tSET a TRUE,\n\tRESET (b, c),\n\tSET (x = y, z = FALSE);\n"},
{"alter resource pool eds reset (a), set (x=y)",
"ALTER RESOURCE POOL eds\n\tRESET (a),\n\tSET (x = y);\n"},
{"dRop reSourCe poOl naMe",
"DROP RESOURCE POOL naMe;\n"},
};

TSetup setup;
setup.Run(cases);
}
}
Loading
Loading