From f76af43c6a31ef84126b416e5b984b04f757ab2d Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Mon, 14 Oct 2024 13:48:20 +0300 Subject: [PATCH] YDB: implement connection via Query Service (#202) * YDB: first steps with Query Service * YDB Connector mode in config proto * Parametrize YQL connector mode * Reached first reading * one more appender * Supporting new way of making acceptors * Supporting new way of making acceptors pt. 2 * Basic tests passed with new connector * Debugging query logic * Linter complainings * Pushdown works basically * Troubles with database_missing test pt. 3 * Change signature of connection.Query * Not operational * Use QueryArgs everywhere * Most of tests are passed with QueryService * Linter complainings * Unit test fix * Linter fix * Linter fix * Review fix --- .golangci.yml | 4 +- app/client/connector/client.go | 4 + app/config/server.pb.go | 277 +++++++++++------- app/config/server.proto | 15 + app/server/config/config.debug.yaml | 1 + app/server/config/config.go | 23 +- .../rdbms/clickhouse/connection_http.go | 6 +- .../rdbms/clickhouse/connection_native.go | 6 +- .../rdbms/clickhouse/sql_formatter_test.go | 10 +- .../rdbms/clickhouse/table_metadata_query.go | 11 +- app/server/datasource/rdbms/data_source.go | 10 +- .../datasource/rdbms/data_source_factory.go | 6 +- .../rdbms/ms_sql_server/connection.go | 8 +- .../ms_sql_server/table_metadata_query.go | 10 +- .../datasource/rdbms/mysql/connection.go | 18 +- .../rdbms/mysql/table_metadata_query.go | 10 +- .../datasource/rdbms/oracle/connection.go | 14 +- .../rdbms/oracle/table_metadata_query.go | 10 +- .../rdbms/postgresql/connection_manager.go | 6 +- .../rdbms/postgresql/sql_formatter_test.go | 11 +- .../rdbms/postgresql/table_metadata_query.go | 11 +- .../rdbms/utils/predicate_builder.go | 112 ++++--- .../rdbms/utils/query_args_collector.go | 51 ++++ .../datasource/rdbms/utils/query_builder.go | 35 +-- .../datasource/rdbms/utils/schema_provider.go | 13 +- app/server/datasource/rdbms/utils/sql.go | 9 +- app/server/datasource/rdbms/utils/sql_mock.go | 6 +- .../rdbms/ydb/connection_database_sql.go | 127 ++++++++ .../rdbms/ydb/connection_manager.go | 108 ++----- .../datasource/rdbms/ydb/connection_native.go | 262 +++++++++++++++++ .../datasource/rdbms/ydb/schema_provider.go | 19 +- .../datasource/rdbms/ydb/sql_formatter.go | 19 +- .../rdbms/ydb/sql_formatter_test.go | 15 +- .../datasource/rdbms/ydb/type_mapper.go | 124 ++++++-- app/server/embedded_options.go | 12 + app/server/service_connector.go | 5 +- common/api_helpers.go | 10 + common/errors.go | 3 - go.mod | 6 +- go.sum | 12 +- tests/infra/datasource/ydb/suite.go | 3 +- tests/suite/scenario.go | 3 +- tests/suite/suite.go | 4 +- tools/docker_compose_update/internal.go | 4 +- 44 files changed, 1042 insertions(+), 391 deletions(-) create mode 100644 app/server/datasource/rdbms/utils/query_args_collector.go create mode 100644 app/server/datasource/rdbms/ydb/connection_database_sql.go create mode 100644 app/server/datasource/rdbms/ydb/connection_native.go diff --git a/.golangci.yml b/.golangci.yml index d6266ac3..8499c92f 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -477,9 +477,9 @@ linters: - misspell - nakedret - noctx - - nolintlint + # - nolintlint - revive - # - staticcheck + # - staticcheck - stylecheck - typecheck - unconvert diff --git a/app/client/connector/client.go b/app/client/connector/client.go index eb16f3be..aeb2c5a3 100644 --- a/app/client/connector/client.go +++ b/app/client/connector/client.go @@ -177,6 +177,10 @@ func readSplits( return fmt.Errorf("read splits: %w", err) } + if err = common.ExtractErrorFromReadResponses(readSplitsResponses); err != nil { + return fmt.Errorf("extract error from read responses: %w", err) + } + logger.Debug("Obtained read splits responses", zap.Int("count", len(readSplitsResponses))) records, err := common.ReadResponsesToArrowRecords(readSplitsResponses) diff --git a/app/config/server.pb.go b/app/config/server.pb.go index 21f3ff98..e564a157 100644 --- a/app/config/server.pb.go +++ b/app/config/server.pb.go @@ -80,6 +80,60 @@ func (ELogLevel) EnumDescriptor() ([]byte, []int) { return file_app_config_server_proto_rawDescGZIP(), []int{0} } +type TYdbConfig_Mode int32 + +const ( + TYdbConfig_MODE_UNSPECIFIED TYdbConfig_Mode = 0 + // In MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES the YDB connector uses YDB's Table Service + // via Go's standard library database/sql interface. + // All the requests are marked as scan queries. + TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES TYdbConfig_Mode = 1 + // In MODE_QUERY_SERVICE_NATIVE the YDB connector uses YDB's Query Service + // via native YDB interface. + TYdbConfig_MODE_QUERY_SERVICE_NATIVE TYdbConfig_Mode = 2 +) + +// Enum value maps for TYdbConfig_Mode. +var ( + TYdbConfig_Mode_name = map[int32]string{ + 0: "MODE_UNSPECIFIED", + 1: "MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES", + 2: "MODE_QUERY_SERVICE_NATIVE", + } + TYdbConfig_Mode_value = map[string]int32{ + "MODE_UNSPECIFIED": 0, + "MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES": 1, + "MODE_QUERY_SERVICE_NATIVE": 2, + } +) + +func (x TYdbConfig_Mode) Enum() *TYdbConfig_Mode { + p := new(TYdbConfig_Mode) + *p = x + return p +} + +func (x TYdbConfig_Mode) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (TYdbConfig_Mode) Descriptor() protoreflect.EnumDescriptor { + return file_app_config_server_proto_enumTypes[1].Descriptor() +} + +func (TYdbConfig_Mode) Type() protoreflect.EnumType { + return &file_app_config_server_proto_enumTypes[1] +} + +func (x TYdbConfig_Mode) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use TYdbConfig_Mode.Descriptor instead. +func (TYdbConfig_Mode) EnumDescriptor() ([]byte, []int) { + return file_app_config_server_proto_rawDescGZIP(), []int{16, 0} +} + // Connector server configuration type TServerConfig struct { state protoimpl.MessageState @@ -1175,8 +1229,11 @@ type TYdbConfig struct { // Valid values should satisfy `time.ParseDuration` (e. g. '5s', '100ms', '3h'). PingConnectionTimeout string `protobuf:"bytes,2,opt,name=ping_connection_timeout,json=pingConnectionTimeout,proto3" json:"ping_connection_timeout,omitempty"` // Flag forcing the usage of underlay networks for dedicated YDB databases - UseUnderlayNetworkForDedicatedDatabases bool `protobuf:"varint,3,opt,name=use_underlay_network_for_dedicated_databases,json=useUnderlayNetworkForDedicatedDatabases,proto3" json:"use_underlay_network_for_dedicated_databases,omitempty"` - ExponentialBackoff *TExponentialBackoffConfig `protobuf:"bytes,10,opt,name=exponential_backoff,json=exponentialBackoff,proto3" json:"exponential_backoff,omitempty"` + UseUnderlayNetworkForDedicatedDatabases bool `protobuf:"varint,3,opt,name=use_underlay_network_for_dedicated_databases,json=useUnderlayNetworkForDedicatedDatabases,proto3" json:"use_underlay_network_for_dedicated_databases,omitempty"` + // Mode parametrizes the way YDB connector interacts with YDB servers. + // MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES is the default mode. + Mode TYdbConfig_Mode `protobuf:"varint,4,opt,name=mode,proto3,enum=NYql.Connector.App.Config.TYdbConfig_Mode" json:"mode,omitempty"` + ExponentialBackoff *TExponentialBackoffConfig `protobuf:"bytes,10,opt,name=exponential_backoff,json=exponentialBackoff,proto3" json:"exponential_backoff,omitempty"` } func (x *TYdbConfig) Reset() { @@ -1232,6 +1289,13 @@ func (x *TYdbConfig) GetUseUnderlayNetworkForDedicatedDatabases() bool { return false } +func (x *TYdbConfig) GetMode() TYdbConfig_Mode { + if x != nil { + return x.Mode + } + return TYdbConfig_MODE_UNSPECIFIED +} + func (x *TYdbConfig) GetExponentialBackoff() *TExponentialBackoffConfig { if x != nil { return x.ExponentialBackoff @@ -1550,7 +1614,7 @@ var file_app_config_server_proto_rawDesc = []byte{ 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x45, 0x78, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x42, 0x61, 0x63, 0x6b, 0x6f, 0x66, 0x66, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x12, 0x65, 0x78, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x42, 0x61, - 0x63, 0x6b, 0x6f, 0x66, 0x66, 0x22, 0xc2, 0x02, 0x0a, 0x0a, 0x54, 0x59, 0x64, 0x62, 0x43, 0x6f, + 0x63, 0x6b, 0x6f, 0x66, 0x66, 0x22, 0xeb, 0x03, 0x0a, 0x0a, 0x54, 0x59, 0x64, 0x62, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x36, 0x0a, 0x17, 0x6f, 0x70, 0x65, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x15, 0x6f, 0x70, 0x65, 0x6e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, @@ -1564,54 +1628,65 @@ var file_app_config_server_proto_rawDesc = []byte{ 0x61, 0x73, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x27, 0x75, 0x73, 0x65, 0x55, 0x6e, 0x64, 0x65, 0x72, 0x6c, 0x61, 0x79, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x46, 0x6f, 0x72, 0x44, 0x65, 0x64, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, - 0x73, 0x65, 0x73, 0x12, 0x65, 0x0a, 0x13, 0x65, 0x78, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x69, + 0x73, 0x65, 0x73, 0x12, 0x3e, 0x0a, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0e, 0x32, 0x2a, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, + 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x59, + 0x64, 0x62, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x4d, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x6d, + 0x6f, 0x64, 0x65, 0x12, 0x65, 0x0a, 0x13, 0x65, 0x78, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x5f, 0x62, 0x61, 0x63, 0x6b, 0x6f, 0x66, 0x66, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x34, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x45, 0x78, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x42, 0x61, 0x63, 0x6b, 0x6f, 0x66, 0x66, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x12, 0x65, 0x78, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, - 0x69, 0x61, 0x6c, 0x42, 0x61, 0x63, 0x6b, 0x6f, 0x66, 0x66, 0x22, 0x88, 0x04, 0x0a, 0x12, 0x54, - 0x44, 0x61, 0x74, 0x61, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x12, 0x4c, 0x0a, 0x0a, 0x63, 0x6c, 0x69, 0x63, 0x6b, 0x68, 0x6f, 0x75, 0x73, 0x65, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, + 0x69, 0x61, 0x6c, 0x42, 0x61, 0x63, 0x6b, 0x6f, 0x66, 0x66, 0x22, 0x67, 0x0a, 0x04, 0x4d, 0x6f, + 0x64, 0x65, 0x12, 0x14, 0x0a, 0x10, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, + 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x2a, 0x0a, 0x26, 0x4d, 0x4f, 0x44, 0x45, + 0x5f, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x5f, 0x53, 0x45, 0x52, 0x56, 0x49, 0x43, 0x45, 0x5f, 0x53, + 0x54, 0x44, 0x4c, 0x49, 0x42, 0x5f, 0x53, 0x43, 0x41, 0x4e, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x49, + 0x45, 0x53, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x4d, 0x4f, 0x44, 0x45, 0x5f, 0x51, 0x55, 0x45, + 0x52, 0x59, 0x5f, 0x53, 0x45, 0x52, 0x56, 0x49, 0x43, 0x45, 0x5f, 0x4e, 0x41, 0x54, 0x49, 0x56, + 0x45, 0x10, 0x02, 0x22, 0x88, 0x04, 0x0a, 0x12, 0x54, 0x44, 0x61, 0x74, 0x61, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x4c, 0x0a, 0x0a, 0x63, 0x6c, + 0x69, 0x63, 0x6b, 0x68, 0x6f, 0x75, 0x73, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2c, + 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, + 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x43, 0x6c, 0x69, 0x63, + 0x6b, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0a, 0x63, 0x6c, + 0x69, 0x63, 0x6b, 0x68, 0x6f, 0x75, 0x73, 0x65, 0x12, 0x49, 0x0a, 0x09, 0x67, 0x72, 0x65, 0x65, + 0x6e, 0x70, 0x6c, 0x75, 0x6d, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x4e, 0x59, + 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, + 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x47, 0x72, 0x65, 0x65, 0x6e, 0x70, 0x6c, + 0x75, 0x6d, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x09, 0x67, 0x72, 0x65, 0x65, 0x6e, 0x70, + 0x6c, 0x75, 0x6d, 0x12, 0x51, 0x0a, 0x0d, 0x6d, 0x73, 0x5f, 0x73, 0x71, 0x6c, 0x5f, 0x73, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2d, 0x2e, 0x4e, 0x59, 0x71, + 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x4d, 0x73, 0x53, 0x51, 0x4c, 0x53, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0b, 0x6d, 0x73, 0x53, 0x71, 0x6c, + 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x05, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x2e, 0x54, 0x43, 0x6c, 0x69, 0x63, 0x6b, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x43, 0x6f, 0x6e, - 0x66, 0x69, 0x67, 0x52, 0x0a, 0x63, 0x6c, 0x69, 0x63, 0x6b, 0x68, 0x6f, 0x75, 0x73, 0x65, 0x12, - 0x49, 0x0a, 0x09, 0x67, 0x72, 0x65, 0x65, 0x6e, 0x70, 0x6c, 0x75, 0x6d, 0x18, 0x06, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, - 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, - 0x47, 0x72, 0x65, 0x65, 0x6e, 0x70, 0x6c, 0x75, 0x6d, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, - 0x09, 0x67, 0x72, 0x65, 0x65, 0x6e, 0x70, 0x6c, 0x75, 0x6d, 0x12, 0x51, 0x0a, 0x0d, 0x6d, 0x73, - 0x5f, 0x73, 0x71, 0x6c, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x2d, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, - 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x4d, - 0x73, 0x53, 0x51, 0x4c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x52, 0x0b, 0x6d, 0x73, 0x53, 0x71, 0x6c, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x3d, 0x0a, - 0x05, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x4e, - 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, - 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x4d, 0x79, 0x53, 0x51, 0x4c, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x05, 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x12, 0x40, 0x0a, 0x06, - 0x6f, 0x72, 0x61, 0x63, 0x6c, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x4e, - 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, - 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x4f, 0x72, 0x61, 0x63, 0x6c, 0x65, - 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x06, 0x6f, 0x72, 0x61, 0x63, 0x6c, 0x65, 0x12, 0x4c, - 0x0a, 0x0a, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x71, 0x6c, 0x18, 0x05, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, + 0x67, 0x2e, 0x54, 0x4d, 0x79, 0x53, 0x51, 0x4c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x05, + 0x6d, 0x79, 0x73, 0x71, 0x6c, 0x12, 0x40, 0x0a, 0x06, 0x6f, 0x72, 0x61, 0x63, 0x6c, 0x65, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, + 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x2e, 0x54, 0x4f, 0x72, 0x61, 0x63, 0x6c, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, + 0x06, 0x6f, 0x72, 0x61, 0x63, 0x6c, 0x65, 0x12, 0x4c, 0x0a, 0x0a, 0x70, 0x6f, 0x73, 0x74, 0x67, + 0x72, 0x65, 0x73, 0x71, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x4e, 0x59, + 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, + 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, + 0x53, 0x51, 0x4c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0a, 0x70, 0x6f, 0x73, 0x74, 0x67, + 0x72, 0x65, 0x73, 0x71, 0x6c, 0x12, 0x37, 0x0a, 0x03, 0x79, 0x64, 0x62, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, - 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x53, 0x51, 0x4c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x52, 0x0a, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x65, 0x73, 0x71, 0x6c, 0x12, 0x37, 0x0a, 0x03, - 0x79, 0x64, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x4e, 0x59, 0x71, 0x6c, - 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x59, 0x64, 0x62, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x52, 0x03, 0x79, 0x64, 0x62, 0x2a, 0x4b, 0x0a, 0x09, 0x45, 0x4c, 0x6f, 0x67, 0x4c, 0x65, 0x76, - 0x65, 0x6c, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x52, 0x41, 0x43, 0x45, 0x10, 0x00, 0x12, 0x09, 0x0a, - 0x05, 0x44, 0x45, 0x42, 0x55, 0x47, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x49, 0x4e, 0x46, 0x4f, - 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x57, 0x41, 0x52, 0x4e, 0x10, 0x03, 0x12, 0x09, 0x0a, 0x05, - 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x04, 0x12, 0x09, 0x0a, 0x05, 0x46, 0x41, 0x54, 0x41, 0x4c, - 0x10, 0x05, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x79, 0x64, 0x62, 0x2d, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x66, 0x71, - 0x2d, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2d, 0x67, 0x6f, 0x2f, 0x61, 0x70, - 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x59, 0x64, 0x62, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x03, 0x79, 0x64, 0x62, 0x2a, 0x4b, + 0x0a, 0x09, 0x45, 0x4c, 0x6f, 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x09, 0x0a, 0x05, 0x54, + 0x52, 0x41, 0x43, 0x45, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x44, 0x45, 0x42, 0x55, 0x47, 0x10, + 0x01, 0x12, 0x08, 0x0a, 0x04, 0x49, 0x4e, 0x46, 0x4f, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x57, + 0x41, 0x52, 0x4e, 0x10, 0x03, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x04, + 0x12, 0x09, 0x0a, 0x05, 0x46, 0x41, 0x54, 0x41, 0x4c, 0x10, 0x05, 0x42, 0x34, 0x5a, 0x32, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x64, 0x62, 0x2d, 0x70, 0x6c, + 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x66, 0x71, 0x2d, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, + 0x74, 0x6f, 0x72, 0x2d, 0x67, 0x6f, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1626,67 +1701,69 @@ func file_app_config_server_proto_rawDescGZIP() []byte { return file_app_config_server_proto_rawDescData } -var file_app_config_server_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_app_config_server_proto_enumTypes = make([]protoimpl.EnumInfo, 2) var file_app_config_server_proto_msgTypes = make([]protoimpl.MessageInfo, 18) var file_app_config_server_proto_goTypes = []interface{}{ (ELogLevel)(0), // 0: NYql.Connector.App.Config.ELogLevel - (*TServerConfig)(nil), // 1: NYql.Connector.App.Config.TServerConfig - (*TConnectorServerConfig)(nil), // 2: NYql.Connector.App.Config.TConnectorServerConfig - (*TServerTLSConfig)(nil), // 3: NYql.Connector.App.Config.TServerTLSConfig - (*TServerReadLimit)(nil), // 4: NYql.Connector.App.Config.TServerReadLimit - (*TLoggerConfig)(nil), // 5: NYql.Connector.App.Config.TLoggerConfig - (*TPprofServerConfig)(nil), // 6: NYql.Connector.App.Config.TPprofServerConfig - (*TMetricsServerConfig)(nil), // 7: NYql.Connector.App.Config.TMetricsServerConfig - (*TPagingConfig)(nil), // 8: NYql.Connector.App.Config.TPagingConfig - (*TConversionConfig)(nil), // 9: NYql.Connector.App.Config.TConversionConfig - (*TExponentialBackoffConfig)(nil), // 10: NYql.Connector.App.Config.TExponentialBackoffConfig - (*TClickHouseConfig)(nil), // 11: NYql.Connector.App.Config.TClickHouseConfig - (*TGreenplumConfig)(nil), // 12: NYql.Connector.App.Config.TGreenplumConfig - (*TMsSQLServerConfig)(nil), // 13: NYql.Connector.App.Config.TMsSQLServerConfig - (*TMySQLConfig)(nil), // 14: NYql.Connector.App.Config.TMySQLConfig - (*TOracleConfig)(nil), // 15: NYql.Connector.App.Config.TOracleConfig - (*TPostgreSQLConfig)(nil), // 16: NYql.Connector.App.Config.TPostgreSQLConfig - (*TYdbConfig)(nil), // 17: NYql.Connector.App.Config.TYdbConfig - (*TDatasourcesConfig)(nil), // 18: NYql.Connector.App.Config.TDatasourcesConfig - (*common.TEndpoint)(nil), // 19: NYql.NConnector.NApi.TEndpoint + (TYdbConfig_Mode)(0), // 1: NYql.Connector.App.Config.TYdbConfig.Mode + (*TServerConfig)(nil), // 2: NYql.Connector.App.Config.TServerConfig + (*TConnectorServerConfig)(nil), // 3: NYql.Connector.App.Config.TConnectorServerConfig + (*TServerTLSConfig)(nil), // 4: NYql.Connector.App.Config.TServerTLSConfig + (*TServerReadLimit)(nil), // 5: NYql.Connector.App.Config.TServerReadLimit + (*TLoggerConfig)(nil), // 6: NYql.Connector.App.Config.TLoggerConfig + (*TPprofServerConfig)(nil), // 7: NYql.Connector.App.Config.TPprofServerConfig + (*TMetricsServerConfig)(nil), // 8: NYql.Connector.App.Config.TMetricsServerConfig + (*TPagingConfig)(nil), // 9: NYql.Connector.App.Config.TPagingConfig + (*TConversionConfig)(nil), // 10: NYql.Connector.App.Config.TConversionConfig + (*TExponentialBackoffConfig)(nil), // 11: NYql.Connector.App.Config.TExponentialBackoffConfig + (*TClickHouseConfig)(nil), // 12: NYql.Connector.App.Config.TClickHouseConfig + (*TGreenplumConfig)(nil), // 13: NYql.Connector.App.Config.TGreenplumConfig + (*TMsSQLServerConfig)(nil), // 14: NYql.Connector.App.Config.TMsSQLServerConfig + (*TMySQLConfig)(nil), // 15: NYql.Connector.App.Config.TMySQLConfig + (*TOracleConfig)(nil), // 16: NYql.Connector.App.Config.TOracleConfig + (*TPostgreSQLConfig)(nil), // 17: NYql.Connector.App.Config.TPostgreSQLConfig + (*TYdbConfig)(nil), // 18: NYql.Connector.App.Config.TYdbConfig + (*TDatasourcesConfig)(nil), // 19: NYql.Connector.App.Config.TDatasourcesConfig + (*common.TEndpoint)(nil), // 20: NYql.NConnector.NApi.TEndpoint } var file_app_config_server_proto_depIdxs = []int32{ - 19, // 0: NYql.Connector.App.Config.TServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint - 3, // 1: NYql.Connector.App.Config.TServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig - 2, // 2: NYql.Connector.App.Config.TServerConfig.connector_server:type_name -> NYql.Connector.App.Config.TConnectorServerConfig - 4, // 3: NYql.Connector.App.Config.TServerConfig.read_limit:type_name -> NYql.Connector.App.Config.TServerReadLimit - 5, // 4: NYql.Connector.App.Config.TServerConfig.logger:type_name -> NYql.Connector.App.Config.TLoggerConfig - 6, // 5: NYql.Connector.App.Config.TServerConfig.pprof_server:type_name -> NYql.Connector.App.Config.TPprofServerConfig - 7, // 6: NYql.Connector.App.Config.TServerConfig.metrics_server:type_name -> NYql.Connector.App.Config.TMetricsServerConfig - 8, // 7: NYql.Connector.App.Config.TServerConfig.paging:type_name -> NYql.Connector.App.Config.TPagingConfig - 9, // 8: NYql.Connector.App.Config.TServerConfig.conversion:type_name -> NYql.Connector.App.Config.TConversionConfig - 18, // 9: NYql.Connector.App.Config.TServerConfig.datasources:type_name -> NYql.Connector.App.Config.TDatasourcesConfig - 19, // 10: NYql.Connector.App.Config.TConnectorServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint - 3, // 11: NYql.Connector.App.Config.TConnectorServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig + 20, // 0: NYql.Connector.App.Config.TServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint + 4, // 1: NYql.Connector.App.Config.TServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig + 3, // 2: NYql.Connector.App.Config.TServerConfig.connector_server:type_name -> NYql.Connector.App.Config.TConnectorServerConfig + 5, // 3: NYql.Connector.App.Config.TServerConfig.read_limit:type_name -> NYql.Connector.App.Config.TServerReadLimit + 6, // 4: NYql.Connector.App.Config.TServerConfig.logger:type_name -> NYql.Connector.App.Config.TLoggerConfig + 7, // 5: NYql.Connector.App.Config.TServerConfig.pprof_server:type_name -> NYql.Connector.App.Config.TPprofServerConfig + 8, // 6: NYql.Connector.App.Config.TServerConfig.metrics_server:type_name -> NYql.Connector.App.Config.TMetricsServerConfig + 9, // 7: NYql.Connector.App.Config.TServerConfig.paging:type_name -> NYql.Connector.App.Config.TPagingConfig + 10, // 8: NYql.Connector.App.Config.TServerConfig.conversion:type_name -> NYql.Connector.App.Config.TConversionConfig + 19, // 9: NYql.Connector.App.Config.TServerConfig.datasources:type_name -> NYql.Connector.App.Config.TDatasourcesConfig + 20, // 10: NYql.Connector.App.Config.TConnectorServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint + 4, // 11: NYql.Connector.App.Config.TConnectorServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig 0, // 12: NYql.Connector.App.Config.TLoggerConfig.log_level:type_name -> NYql.Connector.App.Config.ELogLevel - 19, // 13: NYql.Connector.App.Config.TPprofServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint - 3, // 14: NYql.Connector.App.Config.TPprofServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig - 19, // 15: NYql.Connector.App.Config.TMetricsServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint - 3, // 16: NYql.Connector.App.Config.TMetricsServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig - 10, // 17: NYql.Connector.App.Config.TClickHouseConfig.exponential_backoff:type_name -> NYql.Connector.App.Config.TExponentialBackoffConfig - 10, // 18: NYql.Connector.App.Config.TGreenplumConfig.exponential_backoff:type_name -> NYql.Connector.App.Config.TExponentialBackoffConfig - 10, // 19: NYql.Connector.App.Config.TMsSQLServerConfig.exponential_backoff:type_name -> NYql.Connector.App.Config.TExponentialBackoffConfig - 10, // 20: NYql.Connector.App.Config.TMySQLConfig.exponential_backoff:type_name -> NYql.Connector.App.Config.TExponentialBackoffConfig - 10, // 21: NYql.Connector.App.Config.TOracleConfig.exponential_backoff:type_name -> NYql.Connector.App.Config.TExponentialBackoffConfig - 10, // 22: NYql.Connector.App.Config.TPostgreSQLConfig.exponential_backoff:type_name -> NYql.Connector.App.Config.TExponentialBackoffConfig - 10, // 23: NYql.Connector.App.Config.TYdbConfig.exponential_backoff:type_name -> NYql.Connector.App.Config.TExponentialBackoffConfig - 11, // 24: NYql.Connector.App.Config.TDatasourcesConfig.clickhouse:type_name -> NYql.Connector.App.Config.TClickHouseConfig - 12, // 25: NYql.Connector.App.Config.TDatasourcesConfig.greenplum:type_name -> NYql.Connector.App.Config.TGreenplumConfig - 13, // 26: NYql.Connector.App.Config.TDatasourcesConfig.ms_sql_server:type_name -> NYql.Connector.App.Config.TMsSQLServerConfig - 14, // 27: NYql.Connector.App.Config.TDatasourcesConfig.mysql:type_name -> NYql.Connector.App.Config.TMySQLConfig - 15, // 28: NYql.Connector.App.Config.TDatasourcesConfig.oracle:type_name -> NYql.Connector.App.Config.TOracleConfig - 16, // 29: NYql.Connector.App.Config.TDatasourcesConfig.postgresql:type_name -> NYql.Connector.App.Config.TPostgreSQLConfig - 17, // 30: NYql.Connector.App.Config.TDatasourcesConfig.ydb:type_name -> NYql.Connector.App.Config.TYdbConfig - 31, // [31:31] is the sub-list for method output_type - 31, // [31:31] is the sub-list for method input_type - 31, // [31:31] is the sub-list for extension type_name - 31, // [31:31] is the sub-list for extension extendee - 0, // [0:31] is the sub-list for field type_name + 20, // 13: NYql.Connector.App.Config.TPprofServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint + 4, // 14: NYql.Connector.App.Config.TPprofServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig + 20, // 15: NYql.Connector.App.Config.TMetricsServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint + 4, // 16: NYql.Connector.App.Config.TMetricsServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig + 11, // 17: NYql.Connector.App.Config.TClickHouseConfig.exponential_backoff:type_name -> NYql.Connector.App.Config.TExponentialBackoffConfig + 11, // 18: NYql.Connector.App.Config.TGreenplumConfig.exponential_backoff:type_name -> NYql.Connector.App.Config.TExponentialBackoffConfig + 11, // 19: NYql.Connector.App.Config.TMsSQLServerConfig.exponential_backoff:type_name -> NYql.Connector.App.Config.TExponentialBackoffConfig + 11, // 20: NYql.Connector.App.Config.TMySQLConfig.exponential_backoff:type_name -> NYql.Connector.App.Config.TExponentialBackoffConfig + 11, // 21: NYql.Connector.App.Config.TOracleConfig.exponential_backoff:type_name -> NYql.Connector.App.Config.TExponentialBackoffConfig + 11, // 22: NYql.Connector.App.Config.TPostgreSQLConfig.exponential_backoff:type_name -> NYql.Connector.App.Config.TExponentialBackoffConfig + 1, // 23: NYql.Connector.App.Config.TYdbConfig.mode:type_name -> NYql.Connector.App.Config.TYdbConfig.Mode + 11, // 24: NYql.Connector.App.Config.TYdbConfig.exponential_backoff:type_name -> NYql.Connector.App.Config.TExponentialBackoffConfig + 12, // 25: NYql.Connector.App.Config.TDatasourcesConfig.clickhouse:type_name -> NYql.Connector.App.Config.TClickHouseConfig + 13, // 26: NYql.Connector.App.Config.TDatasourcesConfig.greenplum:type_name -> NYql.Connector.App.Config.TGreenplumConfig + 14, // 27: NYql.Connector.App.Config.TDatasourcesConfig.ms_sql_server:type_name -> NYql.Connector.App.Config.TMsSQLServerConfig + 15, // 28: NYql.Connector.App.Config.TDatasourcesConfig.mysql:type_name -> NYql.Connector.App.Config.TMySQLConfig + 16, // 29: NYql.Connector.App.Config.TDatasourcesConfig.oracle:type_name -> NYql.Connector.App.Config.TOracleConfig + 17, // 30: NYql.Connector.App.Config.TDatasourcesConfig.postgresql:type_name -> NYql.Connector.App.Config.TPostgreSQLConfig + 18, // 31: NYql.Connector.App.Config.TDatasourcesConfig.ydb:type_name -> NYql.Connector.App.Config.TYdbConfig + 32, // [32:32] is the sub-list for method output_type + 32, // [32:32] is the sub-list for method input_type + 32, // [32:32] is the sub-list for extension type_name + 32, // [32:32] is the sub-list for extension extendee + 0, // [0:32] is the sub-list for field type_name } func init() { file_app_config_server_proto_init() } @@ -1917,7 +1994,7 @@ func file_app_config_server_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_app_config_server_proto_rawDesc, - NumEnums: 1, + NumEnums: 2, NumMessages: 18, NumExtensions: 0, NumServices: 0, diff --git a/app/config/server.proto b/app/config/server.proto index d9d8a9ee..215fecf1 100644 --- a/app/config/server.proto +++ b/app/config/server.proto @@ -213,6 +213,21 @@ message TYdbConfig { // Flag forcing the usage of underlay networks for dedicated YDB databases bool use_underlay_network_for_dedicated_databases = 3; + enum Mode { + MODE_UNSPECIFIED = 0; + // In MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES the YDB connector uses YDB's Table Service + // via Go's standard library database/sql interface. + // All the requests are marked as scan queries. + MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES = 1; + // In MODE_QUERY_SERVICE_NATIVE the YDB connector uses YDB's Query Service + // via native YDB interface. + MODE_QUERY_SERVICE_NATIVE = 2; + } + + // Mode parametrizes the way YDB connector interacts with YDB servers. + // MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES is the default mode. + Mode mode = 4; + TExponentialBackoffConfig exponential_backoff = 10; } diff --git a/app/server/config/config.debug.yaml b/app/server/config/config.debug.yaml index 2e333b63..c480fa98 100644 --- a/app/server/config/config.debug.yaml +++ b/app/server/config/config.debug.yaml @@ -57,3 +57,4 @@ datasources: ydb: <<: *data_source_default_var use_underlay_network_for_dedicated_databases: false + mode: MODE_QUERY_SERVICE_NATIVE diff --git a/app/server/config/config.go b/app/server/config/config.go index 8efa0b2b..bf0857db 100644 --- a/app/server/config/config.go +++ b/app/server/config/config.go @@ -121,7 +121,7 @@ func fillServerConfigDefaults(c *config.TServerConfig) { c.Datasources.Oracle.ExponentialBackoff = makeDefaultExponentialBackoffConfig() } - // Postgresql + // PostgreSQL if c.Datasources.Postgresql == nil { c.Datasources.Postgresql = &config.TPostgreSQLConfig{ @@ -136,10 +136,19 @@ func fillServerConfigDefaults(c *config.TServerConfig) { // YDB if c.Datasources.Ydb == nil { - c.Datasources.Ydb = &config.TYdbConfig{ - OpenConnectionTimeout: "5s", - PingConnectionTimeout: "5s", - } + c.Datasources.Ydb = &config.TYdbConfig{} + } + + if c.Datasources.Ydb.OpenConnectionTimeout == "" { + c.Datasources.Ydb.OpenConnectionTimeout = "5s" + } + + if c.Datasources.Ydb.PingConnectionTimeout == "" { + c.Datasources.Ydb.PingConnectionTimeout = "5s" + } + + if c.Datasources.Ydb.Mode == config.TYdbConfig_MODE_UNSPECIFIED { + c.Datasources.Ydb.Mode = config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES } if c.Datasources.Ydb.ExponentialBackoff == nil { @@ -307,6 +316,10 @@ func validateYdbConfig(c *config.TYdbConfig) error { return fmt.Errorf("validate `ping_connection_timeout`: %v", err) } + if c.Mode == config.TYdbConfig_MODE_UNSPECIFIED { + return fmt.Errorf("invalid `mode` value: %v", c.Mode) + } + if err := validateExponentialBackoff(c.ExponentialBackoff); err != nil { return fmt.Errorf("validate `exponential_backoff`: %v", err) } diff --git a/app/server/datasource/rdbms/clickhouse/connection_http.go b/app/server/datasource/rdbms/clickhouse/connection_http.go index 26b8c38e..9d7a7ec8 100644 --- a/app/server/datasource/rdbms/clickhouse/connection_http.go +++ b/app/server/datasource/rdbms/clickhouse/connection_http.go @@ -49,10 +49,10 @@ func (r *rows) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) ( return transformer, nil } -func (c *connectionHTTP) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) { - c.logger.Dump(query, args...) +func (c *connectionHTTP) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { + c.logger.Dump(params.QueryText, params.QueryArgs.Values()...) - out, err := c.DB.QueryContext(ctx, query, args...) + out, err := c.DB.QueryContext(params.Ctx, params.QueryText, params.QueryArgs.Values()...) if err != nil { return nil, fmt.Errorf("query context: %w", err) } diff --git a/app/server/datasource/rdbms/clickhouse/connection_native.go b/app/server/datasource/rdbms/clickhouse/connection_native.go index 447da5fc..5722d5ca 100644 --- a/app/server/datasource/rdbms/clickhouse/connection_native.go +++ b/app/server/datasource/rdbms/clickhouse/connection_native.go @@ -51,10 +51,10 @@ func (r *rowsNative) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collect return transformer, nil } -func (c *connectionNative) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) { - c.logger.Dump(query, args...) +func (c *connectionNative) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { + c.logger.Dump(params.QueryText, params.QueryArgs.Values()...) - out, err := c.Conn.Query(ctx, query, args...) + out, err := c.Conn.Query(params.Ctx, params.QueryText, params.QueryArgs.Values()...) if err != nil { return nil, fmt.Errorf("query context: %w", err) } diff --git a/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go b/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go index da259ba7..a07caced 100644 --- a/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go +++ b/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go @@ -1,6 +1,7 @@ package clickhouse import ( + "context" "errors" "testing" @@ -451,15 +452,18 @@ func TestMakeSQLFormatterQuery(t *testing.T) { t.Run(tc.testName, func(t *testing.T) { readSplitsQuery, err := rdbms_utils.MakeReadSplitsQuery( - logger, formatter, tc.selectReq, api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL) + context.Background(), + logger, formatter, + tc.selectReq, + api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL) if tc.err != nil { require.True(t, errors.Is(err, tc.err)) return } require.NoError(t, err) - require.Equal(t, tc.outputQuery, readSplitsQuery.Query) - require.Equal(t, tc.outputArgs, readSplitsQuery.Args) + require.Equal(t, tc.outputQuery, readSplitsQuery.QueryText) + require.Equal(t, tc.outputArgs, readSplitsQuery.QueryArgs.Values()) require.Equal(t, tc.outputSelectWhat, readSplitsQuery.What) }) } diff --git a/app/server/datasource/rdbms/clickhouse/table_metadata_query.go b/app/server/datasource/rdbms/clickhouse/table_metadata_query.go index 63c05c3c..ba49e6e6 100644 --- a/app/server/datasource/rdbms/clickhouse/table_metadata_query.go +++ b/app/server/datasource/rdbms/clickhouse/table_metadata_query.go @@ -2,11 +2,16 @@ package clickhouse import ( api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" ) -func TableMetadataQuery(request *api_service_protos.TDescribeTableRequest) (string, []any) { +func TableMetadataQuery(request *api_service_protos.TDescribeTableRequest) (string, *rdbms_utils.QueryArgs) { query := "SELECT name, type FROM system.columns WHERE table = ? and database = ?" - args := []any{request.Table, request.DataSourceInstance.Database} - return query, args + var args rdbms_utils.QueryArgs + + args.AddUntyped(request.Table) + args.AddUntyped(request.DataSourceInstance.Database) + + return query, &args } diff --git a/app/server/datasource/rdbms/data_source.go b/app/server/datasource/rdbms/data_source.go index 02dcbe58..125d0b92 100644 --- a/app/server/datasource/rdbms/data_source.go +++ b/app/server/datasource/rdbms/data_source.go @@ -76,14 +76,16 @@ func (ds *dataSourceImpl) doReadSplit( split *api_service_protos.TSplit, sink paging.Sink[any], ) error { - readSplitsQuery, err := rdbms_utils.MakeReadSplitsQuery(logger, ds.sqlFormatter, split.Select, request.Filtering) + readSplitsQuery, err := rdbms_utils.MakeReadSplitsQuery(ctx, logger, ds.sqlFormatter, split.Select, request.Filtering) if err != nil { return fmt.Errorf("make read split query: %w", err) } var conn rdbms_utils.Connection - err = ds.retrierSet.MakeConnection.Run(ctx, logger, + err = ds.retrierSet.MakeConnection.Run( + ctx, + logger, func() error { var makeConnErr error @@ -110,8 +112,8 @@ func (ds *dataSourceImpl) doReadSplit( func() error { var queryErr error - if rows, queryErr = conn.Query(ctx, logger, readSplitsQuery.Query, readSplitsQuery.Args...); queryErr != nil { - return fmt.Errorf("query '%s' error: %w", readSplitsQuery.Query, queryErr) + if rows, queryErr = conn.Query(&readSplitsQuery.QueryParams); queryErr != nil { + return fmt.Errorf("query '%s' error: %w", readSplitsQuery.QueryText, queryErr) } return nil diff --git a/app/server/datasource/rdbms/data_source_factory.go b/app/server/datasource/rdbms/data_source_factory.go index 0c54e1ae..eeaa9681 100644 --- a/app/server/datasource/rdbms/data_source_factory.go +++ b/app/server/datasource/rdbms/data_source_factory.go @@ -97,7 +97,7 @@ func NewDataSourceFactory( TypeMapper: postgresqlTypeMapper, SchemaProvider: rdbms_utils.NewDefaultSchemaProvider( postgresqlTypeMapper, - func(request *api_service_protos.TDescribeTableRequest) (string, []any) { + func(request *api_service_protos.TDescribeTableRequest) (string, *rdbms_utils.QueryArgs) { return postgresql.TableMetadataQuery( request, schemaGetters[api_common.EDataSourceKind_POSTGRESQL](request.DataSourceInstance)) @@ -108,7 +108,7 @@ func NewDataSourceFactory( }, }, ydb: Preset{ - SQLFormatter: ydb.NewSQLFormatter(), + SQLFormatter: ydb.NewSQLFormatter(cfg.Ydb.Mode), ConnectionManager: ydb.NewConnectionManager(cfg.Ydb, connManagerBase), TypeMapper: ydbTypeMapper, SchemaProvider: ydb.NewSchemaProvider(ydbTypeMapper), @@ -144,7 +144,7 @@ func NewDataSourceFactory( TypeMapper: postgresqlTypeMapper, SchemaProvider: rdbms_utils.NewDefaultSchemaProvider( postgresqlTypeMapper, - func(request *api_service_protos.TDescribeTableRequest) (string, []any) { + func(request *api_service_protos.TDescribeTableRequest) (string, *rdbms_utils.QueryArgs) { return postgresql.TableMetadataQuery( request, schemaGetters[api_common.EDataSourceKind_GREENPLUM](request.DataSourceInstance)) diff --git a/app/server/datasource/rdbms/ms_sql_server/connection.go b/app/server/datasource/rdbms/ms_sql_server/connection.go index fbd7b27e..56e772a7 100644 --- a/app/server/datasource/rdbms/ms_sql_server/connection.go +++ b/app/server/datasource/rdbms/ms_sql_server/connection.go @@ -1,11 +1,9 @@ package ms_sql_server import ( - "context" "database/sql" _ "github.com/denisenkom/go-mssqldb" - "go.uber.org/zap" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/common" @@ -22,10 +20,10 @@ func (c Connection) Close() error { return c.db.Close() } -func (c Connection) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) { - c.logger.Dump(query, args...) +func (c Connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { + c.logger.Dump(params.QueryText, params.QueryArgs.Values()...) - out, err := c.db.QueryContext(ctx, query, args...) + out, err := c.db.QueryContext(params.Ctx, params.QueryText, params.QueryArgs.Values()...) return rows{out}, err } diff --git a/app/server/datasource/rdbms/ms_sql_server/table_metadata_query.go b/app/server/datasource/rdbms/ms_sql_server/table_metadata_query.go index f6238c54..e007a6e7 100644 --- a/app/server/datasource/rdbms/ms_sql_server/table_metadata_query.go +++ b/app/server/datasource/rdbms/ms_sql_server/table_metadata_query.go @@ -4,12 +4,16 @@ import ( _ "github.com/denisenkom/go-mssqldb" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" ) -func TableMetadataQuery(request *api_service_protos.TDescribeTableRequest) (string, []any) { +func TableMetadataQuery(request *api_service_protos.TDescribeTableRequest) (string, *rdbms_utils.QueryArgs) { // opts := request.GetDataSourceInstance().GetPgOptions().GetSchema() query := "SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = @p1;" - args := []any{request.Table} // , opts} - return query, args + var args rdbms_utils.QueryArgs + + args.AddUntyped(request.Table) + + return query, &args } diff --git a/app/server/datasource/rdbms/mysql/connection.go b/app/server/datasource/rdbms/mysql/connection.go index 0d5434f0..99d1a4f1 100644 --- a/app/server/datasource/rdbms/mysql/connection.go +++ b/app/server/datasource/rdbms/mysql/connection.go @@ -1,13 +1,11 @@ package mysql import ( - "context" "fmt" "sync/atomic" "github.com/go-mysql-org/go-mysql/client" "github.com/go-mysql-org/go-mysql/mysql" - "go.uber.org/zap" "github.com/ydb-platform/fq-connector-go/app/config" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" @@ -26,16 +24,16 @@ func (c *Connection) Close() error { return c.conn.Close() } -func (c *Connection) Query(ctx context.Context, logger *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) { - c.logger.Dump(query, args...) +func (c *Connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { + c.logger.Dump(params.QueryText, params.QueryArgs.Values()...) results := make(chan rowData, c.cfg.ResultChanCapacity) result := &mysql.Result{} r := &rows{ - ctx: ctx, + ctx: params.Ctx, cfg: c.cfg, - logger: logger, + logger: params.Logger, rowChan: results, errChan: make(chan error, 1), lastRow: nil, @@ -44,7 +42,7 @@ func (c *Connection) Query(ctx context.Context, logger *zap.Logger, query string inputFinished: false, } - stmt, err := c.conn.Prepare(query) + stmt, err := c.conn.Prepare(params.QueryText) if err != nil { return r, fmt.Errorf("mysql: failed to prepare query: %w", err) } @@ -78,14 +76,14 @@ func (c *Connection) Query(ctx context.Context, logger *zap.Logger, query string select { case r.rowChan <- rowData{newRow, result.Fields}: - case <-ctx.Done(): - return ctx.Err() + case <-params.Ctx.Done(): + return params.Ctx.Err() } return nil }, nil, - args..., + params.QueryArgs.Values()..., ) }() diff --git a/app/server/datasource/rdbms/mysql/table_metadata_query.go b/app/server/datasource/rdbms/mysql/table_metadata_query.go index f0f1309b..a401b9de 100644 --- a/app/server/datasource/rdbms/mysql/table_metadata_query.go +++ b/app/server/datasource/rdbms/mysql/table_metadata_query.go @@ -2,16 +2,20 @@ package mysql import ( api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" ) -func TableMetadataQuery(request *api_service_protos.TDescribeTableRequest) (string, []any) { +func TableMetadataQuery(request *api_service_protos.TDescribeTableRequest) (string, *rdbms_utils.QueryArgs) { // TODO: do not add 'unsigned' modifiers to column type and use the driver-provided fields instead. // In MySQL schema and database are basically the same thing. So we can safely pass dbname as // `schema_name` when quering `information_schema`. query := `SELECT column_name, column_type FROM information_schema.columns WHERE table_name = ? AND table_schema = ?` - args := []any{request.Table, request.GetDataSourceInstance().Database} + var args rdbms_utils.QueryArgs - return query, args + args.AddUntyped(request.Table) + args.AddUntyped(request.GetDataSourceInstance().Database) + + return query, &args } diff --git a/app/server/datasource/rdbms/oracle/connection.go b/app/server/datasource/rdbms/oracle/connection.go index 3ff1e5d0..f6f6dda1 100644 --- a/app/server/datasource/rdbms/oracle/connection.go +++ b/app/server/datasource/rdbms/oracle/connection.go @@ -1,12 +1,10 @@ package oracle import ( - "context" "database/sql/driver" "fmt" go_ora "github.com/sijms/go-ora/v2" - "go.uber.org/zap" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/common" @@ -23,12 +21,12 @@ func (c Connection) Close() error { return c.conn.Close() } -func (c Connection) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) { - c.logger.Dump(query, args...) +func (c Connection) Query(queryParams *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { + c.logger.Dump(queryParams.QueryText, queryParams.QueryArgs.Values()...) - valueArgs := make([]driver.NamedValue, len(args)) - for i := 0; i < len(args); i++ { - valueArgs[i].Value = args[i] + valueArgs := make([]driver.NamedValue, queryParams.QueryArgs.Count()) + for i := 0; i < len(queryParams.QueryArgs.Values()); i++ { + valueArgs[i].Value = queryParams.QueryArgs.Get(i).Value // TODO YQ-3455: research // for some reason query works with all Ordinal = 0 // Golang docs states: Ordinal position of the parameter starting from one and is always set. @@ -37,7 +35,7 @@ func (c Connection) Query(ctx context.Context, _ *zap.Logger, query string, args valueArgs[i].Ordinal = i + 1 } - out, err := c.conn.QueryContext(ctx, query, valueArgs) + out, err := c.conn.QueryContext(queryParams.Ctx, queryParams.QueryText, valueArgs) if err != nil { return nil, fmt.Errorf("query with context: %w", err) } diff --git a/app/server/datasource/rdbms/oracle/table_metadata_query.go b/app/server/datasource/rdbms/oracle/table_metadata_query.go index 6037528d..f27503f2 100644 --- a/app/server/datasource/rdbms/oracle/table_metadata_query.go +++ b/app/server/datasource/rdbms/oracle/table_metadata_query.go @@ -2,13 +2,17 @@ package oracle import ( api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" ) -func TableMetadataQuery(request *api_service_protos.TDescribeTableRequest) (string, []any) { +func TableMetadataQuery(request *api_service_protos.TDescribeTableRequest) (string, *rdbms_utils.QueryArgs) { // TODO YQ-3413: synonym tables and from other users. // TODO YQ-3454: all capitalize query := "SELECT column_name, data_type FROM user_tab_columns WHERE table_name = :1" - args := []any{request.Table} // , opts} - return query, args + var args rdbms_utils.QueryArgs + + args.AddUntyped(request.Table) + + return query, &args } diff --git a/app/server/datasource/rdbms/postgresql/connection_manager.go b/app/server/datasource/rdbms/postgresql/connection_manager.go index 2fc316d0..95d2a581 100644 --- a/app/server/datasource/rdbms/postgresql/connection_manager.go +++ b/app/server/datasource/rdbms/postgresql/connection_manager.go @@ -53,10 +53,10 @@ func (c Connection) Close() error { return c.Conn.Close(context.TODO()) } -func (c Connection) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) { - c.logger.Dump(query, args...) +func (c Connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { + c.logger.Dump(params.QueryText, params.QueryArgs.Values()...) - out, err := c.Conn.Query(ctx, query, args...) + out, err := c.Conn.Query(params.Ctx, params.QueryText, params.QueryArgs.Values()...) if err != nil { return nil, fmt.Errorf("query error: %w", err) } diff --git a/app/server/datasource/rdbms/postgresql/sql_formatter_test.go b/app/server/datasource/rdbms/postgresql/sql_formatter_test.go index 65fa7296..6386ff3d 100644 --- a/app/server/datasource/rdbms/postgresql/sql_formatter_test.go +++ b/app/server/datasource/rdbms/postgresql/sql_formatter_test.go @@ -1,6 +1,7 @@ package postgresql import ( + "context" "errors" "testing" @@ -451,14 +452,18 @@ func TestMakeReadSplitsQuery(t *testing.T) { t.Run(tc.testName, func(t *testing.T) { readSplitsQuery, err := rdbms_utils.MakeReadSplitsQuery( - logger, formatter, tc.selectReq, api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL) + context.Background(), + logger, + formatter, + tc.selectReq, + api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL) if tc.err != nil { require.True(t, errors.Is(err, tc.err)) return } - require.Equal(t, tc.outputQuery, readSplitsQuery.Query) - require.Equal(t, tc.outputArgs, readSplitsQuery.Args) + require.Equal(t, tc.outputQuery, readSplitsQuery.QueryText) + require.Equal(t, tc.outputArgs, readSplitsQuery.QueryArgs.Values()) require.Equal(t, tc.outputSelectWhat, readSplitsQuery.What) }) } diff --git a/app/server/datasource/rdbms/postgresql/table_metadata_query.go b/app/server/datasource/rdbms/postgresql/table_metadata_query.go index a473f2f9..7842cef0 100644 --- a/app/server/datasource/rdbms/postgresql/table_metadata_query.go +++ b/app/server/datasource/rdbms/postgresql/table_metadata_query.go @@ -2,14 +2,19 @@ package postgresql import ( api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" ) func TableMetadataQuery( request *api_service_protos.TDescribeTableRequest, schema string, -) (string, []any) { +) (string, *rdbms_utils.QueryArgs) { query := "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1 AND table_schema = $2" - args := []any{request.Table, schema} - return query, args + var args rdbms_utils.QueryArgs + + args.AddUntyped(request.Table) + args.AddUntyped(schema) + + return query, &args } diff --git a/app/server/datasource/rdbms/utils/predicate_builder.go b/app/server/datasource/rdbms/utils/predicate_builder.go index 288acb0f..42d4122c 100644 --- a/app/server/datasource/rdbms/utils/predicate_builder.go +++ b/app/server/datasource/rdbms/utils/predicate_builder.go @@ -10,26 +10,26 @@ import ( "github.com/ydb-platform/fq-connector-go/common" ) -func formatValue(formatter SQLFormatter, args []any, value *Ydb.TypedValue) (string, []any, error) { +func formatValue(formatter SQLFormatter, args *QueryArgs, value *Ydb.TypedValue) (string, *QueryArgs, error) { switch v := value.Value.Value.(type) { case *Ydb.Value_BoolValue: - return formatter.GetPlaceholder(len(args)), append(args, v.BoolValue), nil + return formatter.GetPlaceholder(args.Count()), args.AddTyped(value.Type, v.BoolValue), nil case *Ydb.Value_Int32Value: - return formatter.GetPlaceholder(len(args)), append(args, v.Int32Value), nil + return formatter.GetPlaceholder(args.Count()), args.AddTyped(value.Type, v.Int32Value), nil case *Ydb.Value_Uint32Value: - return formatter.GetPlaceholder(len(args)), append(args, v.Uint32Value), nil + return formatter.GetPlaceholder(args.Count()), args.AddTyped(value.Type, v.Uint32Value), nil case *Ydb.Value_Int64Value: - return formatter.GetPlaceholder(len(args)), append(args, v.Int64Value), nil + return formatter.GetPlaceholder(args.Count()), args.AddTyped(value.Type, v.Int64Value), nil case *Ydb.Value_Uint64Value: - return formatter.GetPlaceholder(len(args)), append(args, v.Uint64Value), nil + return formatter.GetPlaceholder(args.Count()), args.AddTyped(value.Type, v.Uint64Value), nil case *Ydb.Value_FloatValue: - return formatter.GetPlaceholder(len(args)), append(args, v.FloatValue), nil + return formatter.GetPlaceholder(args.Count()), args.AddTyped(value.Type, v.FloatValue), nil case *Ydb.Value_DoubleValue: - return formatter.GetPlaceholder(len(args)), append(args, v.DoubleValue), nil + return formatter.GetPlaceholder(args.Count()), args.AddTyped(value.Type, v.DoubleValue), nil case *Ydb.Value_BytesValue: - return formatter.GetPlaceholder(len(args)), append(args, v.BytesValue), nil + return formatter.GetPlaceholder(args.Count()), args.AddTyped(value.Type, v.BytesValue), nil case *Ydb.Value_TextValue: - return formatter.GetPlaceholder(len(args)), append(args, v.TextValue), nil + return formatter.GetPlaceholder(args.Count()), args.AddTyped(value.Type, v.TextValue), nil case *Ydb.Value_NullFlagValue: placeholder, newArgs, err := formatNullFlagValue(formatter, args, value) if err != nil { @@ -42,7 +42,17 @@ func formatValue(formatter SQLFormatter, args []any, value *Ydb.TypedValue) (str } } -func formatNullFlagValue(formatter SQLFormatter, args []any, value *Ydb.TypedValue) (string, []any, error) { +func addTypedNull[ACCEPTOR_TYPE any]( + formatter SQLFormatter, + args *QueryArgs, + typeId Ydb.Type_PrimitiveTypeId, +) (string, *QueryArgs, error) { + return formatter.GetPlaceholder(args.Count()), + args.AddTyped(&Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: typeId}}, (*ACCEPTOR_TYPE)(nil)), + nil +} + +func formatNullFlagValue(formatter SQLFormatter, args *QueryArgs, value *Ydb.TypedValue) (string, *QueryArgs, error) { optType, ok := value.Type.GetType().(*Ydb.Type_OptionalType) if !ok { return "", args, fmt.Errorf( @@ -54,48 +64,48 @@ func formatNullFlagValue(formatter SQLFormatter, args []any, value *Ydb.TypedVal case *Ydb.Type_TypeId: switch innerType.TypeId { case Ydb.Type_BOOL: - return formatter.GetPlaceholder(len(args)), append(args, (*bool)(nil)), nil + return addTypedNull[bool](formatter, args, Ydb.Type_BOOL) case Ydb.Type_INT8: - return formatter.GetPlaceholder(len(args)), append(args, (*int8)(nil)), nil + return addTypedNull[int8](formatter, args, Ydb.Type_INT8) case Ydb.Type_UINT8: - return formatter.GetPlaceholder(len(args)), append(args, (*uint8)(nil)), nil + return addTypedNull[uint8](formatter, args, Ydb.Type_UINT8) case Ydb.Type_INT16: - return formatter.GetPlaceholder(len(args)), append(args, (*int16)(nil)), nil + return addTypedNull[int16](formatter, args, Ydb.Type_INT16) case Ydb.Type_UINT16: - return formatter.GetPlaceholder(len(args)), append(args, (*uint16)(nil)), nil + return addTypedNull[uint16](formatter, args, Ydb.Type_UINT16) case Ydb.Type_INT32: - return formatter.GetPlaceholder(len(args)), append(args, (*int32)(nil)), nil + return addTypedNull[int32](formatter, args, Ydb.Type_INT32) case Ydb.Type_UINT32: - return formatter.GetPlaceholder(len(args)), append(args, (*uint32)(nil)), nil + return addTypedNull[uint32](formatter, args, Ydb.Type_UINT32) case Ydb.Type_INT64: - return formatter.GetPlaceholder(len(args)), append(args, (*int64)(nil)), nil + return addTypedNull[int64](formatter, args, Ydb.Type_INT64) case Ydb.Type_UINT64: - return formatter.GetPlaceholder(len(args)), append(args, (*uint64)(nil)), nil + return addTypedNull[uint64](formatter, args, Ydb.Type_UINT64) case Ydb.Type_STRING: - return formatter.GetPlaceholder(len(args)), append(args, (*[]byte)(nil)), nil + return addTypedNull[[]byte](formatter, args, Ydb.Type_STRING) case Ydb.Type_UTF8: - return formatter.GetPlaceholder(len(args)), append(args, (*string)(nil)), nil + return addTypedNull[string](formatter, args, Ydb.Type_UTF8) default: return "", args, fmt.Errorf("unsupported primitive type '%v' instead: %w", innerType, common.ErrUnimplementedTypedValue) } default: - return "", args, fmt.Errorf("unsupported type '%T' instead: %w", innerType, common.ErrUnimplementedTypedValue) + return "", args, fmt.Errorf("unsupported type '%v' instead: %w", innerType, common.ErrUnimplementedTypedValue) } } -func formatColumn(formatter SQLFormatter, args []any, col string) (string, []any, error) { +func formatColumn(formatter SQLFormatter, args *QueryArgs, col string) (string, *QueryArgs, error) { return formatter.SanitiseIdentifier(col), args, nil } -func formatNull(_ SQLFormatter, args []any, _ *api_service_protos.TExpression_TNull) (string, []any, error) { +func formatNull(_ SQLFormatter, args *QueryArgs, _ *api_service_protos.TExpression_TNull) (string, *QueryArgs, error) { return "NULL", args, nil } func formatArithmeticalExpression( formatter SQLFormatter, - args []any, + args *QueryArgs, expression *api_service_protos.TExpression_TArithmeticalExpression, -) (string, []any, error) { +) (string, *QueryArgs, error) { var operation string switch op := expression.Operation; op { @@ -128,7 +138,7 @@ func formatArithmeticalExpression( return fmt.Sprintf("(%s%s%s)", left, operation, right), args, nil } -func formatExpression(formatter SQLFormatter, args []any, expression *api_service_protos.TExpression) (string, []any, error) { +func formatExpression(formatter SQLFormatter, args *QueryArgs, expression *api_service_protos.TExpression) (string, *QueryArgs, error) { if !formatter.SupportsPushdownExpression(expression) { return "", args, common.ErrUnsupportedExpression } @@ -147,7 +157,11 @@ func formatExpression(formatter SQLFormatter, args []any, expression *api_servic } } -func formatComparison(formatter SQLFormatter, args []any, comparison *api_service_protos.TPredicate_TComparison) (string, []any, error) { +func formatComparison( + formatter SQLFormatter, + args *QueryArgs, + comparison *api_service_protos.TPredicate_TComparison, +) (string, *QueryArgs, error) { var operation string switch op := comparison.Operation; op { @@ -180,7 +194,10 @@ func formatComparison(formatter SQLFormatter, args []any, comparison *api_servic return fmt.Sprintf("(%s%s%s)", left, operation, right), args, nil } -func formatNegation(formatter SQLFormatter, args []any, negation *api_service_protos.TPredicate_TNegation) (string, []any, error) { +func formatNegation( + formatter SQLFormatter, + args *QueryArgs, + negation *api_service_protos.TPredicate_TNegation) (string, *QueryArgs, error) { pred, args, err := formatPredicate(formatter, args, negation.Operand, false) if err != nil { return "", args, fmt.Errorf("failed to format NOT statement: %w", err) @@ -191,10 +208,10 @@ func formatNegation(formatter SQLFormatter, args []any, negation *api_service_pr func formatConjunction( formatter SQLFormatter, - args []any, + args *QueryArgs, conjunction *api_service_protos.TPredicate_TConjunction, topLevel bool, -) (string, []any, error) { +) (string, *QueryArgs, error) { var ( sb strings.Builder succeeded int32 @@ -243,7 +260,11 @@ func formatConjunction( return sb.String(), args, nil } -func formatDisjunction(formatter SQLFormatter, args []any, disjunction *api_service_protos.TPredicate_TDisjunction) (string, []any, error) { +func formatDisjunction( + formatter SQLFormatter, + args *QueryArgs, + disjunction *api_service_protos.TPredicate_TDisjunction, +) (string, *QueryArgs, error) { var ( sb strings.Builder cnt int32 @@ -286,7 +307,11 @@ func formatDisjunction(formatter SQLFormatter, args []any, disjunction *api_serv return sb.String(), args, nil } -func formatIsNull(formatter SQLFormatter, args []any, isNull *api_service_protos.TPredicate_TIsNull) (string, []any, error) { +func formatIsNull( + formatter SQLFormatter, + args *QueryArgs, + isNull *api_service_protos.TPredicate_TIsNull, +) (string, *QueryArgs, error) { statement, args, err := formatExpression(formatter, args, isNull.Value) if err != nil { return "", args, fmt.Errorf("failed to format IS NULL statement: %w", err) @@ -295,7 +320,11 @@ func formatIsNull(formatter SQLFormatter, args []any, isNull *api_service_protos return fmt.Sprintf("(%s IS NULL)", statement), args, nil } -func formatIsNotNull(formatter SQLFormatter, args []any, isNotNull *api_service_protos.TPredicate_TIsNotNull) (string, []any, error) { +func formatIsNotNull( + formatter SQLFormatter, + args *QueryArgs, + isNotNull *api_service_protos.TPredicate_TIsNotNull, +) (string, *QueryArgs, error) { statement, args, err := formatExpression(formatter, args, isNotNull.Value) if err != nil { return "", args, fmt.Errorf("failed to format IS NOT NULL statement: %w", err) @@ -304,7 +333,12 @@ func formatIsNotNull(formatter SQLFormatter, args []any, isNotNull *api_service_ return fmt.Sprintf("(%s IS NOT NULL)", statement), args, nil } -func formatPredicate(formatter SQLFormatter, args []any, predicate *api_service_protos.TPredicate, topLevel bool) (string, []any, error) { +func formatPredicate( + formatter SQLFormatter, + args *QueryArgs, + predicate *api_service_protos.TPredicate, + topLevel bool, +) (string, *QueryArgs, error) { switch p := predicate.Payload.(type) { case *api_service_protos.TPredicate_Negation: return formatNegation(formatter, args, p.Negation) @@ -325,12 +359,12 @@ func formatPredicate(formatter SQLFormatter, args []any, predicate *api_service_ } } -func formatWhereClause(formatter SQLFormatter, where *api_service_protos.TSelect_TWhere) (string, []any, error) { +func formatWhereClause(formatter SQLFormatter, where *api_service_protos.TSelect_TWhere) (string, *QueryArgs, error) { if where.FilterTyped == nil { - return "", nil, common.ErrUnimplemented + return "", nil, fmt.Errorf("unexpected nil filter: %w", common.ErrInvalidRequest) } - args := make([]any, 0) + args := &QueryArgs{} formatted, args, err := formatPredicate(formatter, args, where.FilterTyped, true) if err != nil { diff --git a/app/server/datasource/rdbms/utils/query_args_collector.go b/app/server/datasource/rdbms/utils/query_args_collector.go new file mode 100644 index 00000000..30fb3ef4 --- /dev/null +++ b/app/server/datasource/rdbms/utils/query_args_collector.go @@ -0,0 +1,51 @@ +package utils + +import "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + +type QueryArg struct { + YdbType *Ydb.Type + Value any +} + +type QueryArgs struct { + args []*QueryArg +} + +func (q *QueryArgs) AddTyped(ydbType *Ydb.Type, arg any) *QueryArgs { + q.args = append(q.args, &QueryArg{ydbType, arg}) + + return q +} + +func (q *QueryArgs) AddUntyped(arg any) *QueryArgs { return q.AddTyped(nil, arg) } + +func (q *QueryArgs) Count() int { + if q == nil { + return 0 + } + + return len(q.args) +} + +func (q *QueryArgs) Values() []any { + if q == nil { + return []any{} + } + + args := make([]any, len(q.args)) + for i, arg := range q.args { + args[i] = arg.Value + } + + return args +} + +func (q *QueryArgs) Get(i int) *QueryArg { return q.args[i] } + +func (q *QueryArgs) GetAll() []*QueryArg { + if q == nil { + return nil + } + + return q.args +} diff --git a/app/server/datasource/rdbms/utils/query_builder.go b/app/server/datasource/rdbms/utils/query_builder.go index fadc090e..cb974f15 100644 --- a/app/server/datasource/rdbms/utils/query_builder.go +++ b/app/server/datasource/rdbms/utils/query_builder.go @@ -1,6 +1,7 @@ package utils import ( + "context" "fmt" "strings" @@ -10,33 +11,33 @@ import ( ) type ReadSplitsQuery struct { - Query string - Args []any - What *api_service_protos.TSelect_TWhat + QueryParams + What *api_service_protos.TSelect_TWhat } func MakeReadSplitsQuery( + ctx context.Context, logger *zap.Logger, formatter SQLFormatter, slct *api_service_protos.TSelect, filtering api_service_protos.TReadSplitsRequest_EFiltering, ) (*ReadSplitsQuery, error) { - var ( - sb strings.Builder - args []any - ) - selectPart, newSelectWhat, err := formatSelectHead(formatter, slct.GetWhat(), slct.GetFrom().GetTable(), true) if err != nil { return nil, fmt.Errorf("failed to format select statement: %w", err) } + var ( + sb strings.Builder + queryArgs *QueryArgs + ) + sb.WriteString(selectPart) if slct.Where != nil { var clause string - clause, args, err = formatWhereClause(formatter, slct.Where) + clause, queryArgs, err = formatWhereClause(formatter, slct.Where) if err != nil { switch filtering { case api_service_protos.TReadSplitsRequest_FILTERING_UNSPECIFIED, api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL: @@ -56,15 +57,15 @@ func MakeReadSplitsQuery( } } - query := sb.String() - - if args == nil { - args = []any{} - } + queryText := sb.String() return &ReadSplitsQuery{ - Query: query, - Args: args, - What: newSelectWhat, + QueryParams: QueryParams{ + Ctx: ctx, + Logger: logger, + QueryText: queryText, + QueryArgs: queryArgs, + }, + What: newSelectWhat, }, nil } diff --git a/app/server/datasource/rdbms/utils/schema_provider.go b/app/server/datasource/rdbms/utils/schema_provider.go index 1ec2b0a3..5b2b1208 100644 --- a/app/server/datasource/rdbms/utils/schema_provider.go +++ b/app/server/datasource/rdbms/utils/schema_provider.go @@ -13,7 +13,7 @@ import ( type DefaultSchemaProvider struct { typeMapper datasource.TypeMapper - getArgsAndQuery func(request *api_service_protos.TDescribeTableRequest) (string, []any) + getArgsAndQuery func(request *api_service_protos.TDescribeTableRequest) (string, *QueryArgs) } var _ SchemaProvider = (*DefaultSchemaProvider)(nil) @@ -26,7 +26,14 @@ func (f *DefaultSchemaProvider) GetSchema( ) (*api_service_protos.TSchema, error) { query, args := f.getArgsAndQuery(request) - rows, err := conn.Query(ctx, logger, query, args...) + queryParams := &QueryParams{ + Ctx: ctx, + Logger: logger, + QueryText: query, + QueryArgs: args, + } + + rows, err := conn.Query(queryParams) if err != nil { return nil, fmt.Errorf("query builder error: %w", err) } @@ -64,7 +71,7 @@ func (f *DefaultSchemaProvider) GetSchema( func NewDefaultSchemaProvider( typeMapper datasource.TypeMapper, - getArgsAndQueryFunc func(request *api_service_protos.TDescribeTableRequest) (string, []any), + getArgsAndQueryFunc func(request *api_service_protos.TDescribeTableRequest) (string, *QueryArgs), ) SchemaProvider { return &DefaultSchemaProvider{ typeMapper: typeMapper, diff --git a/app/server/datasource/rdbms/utils/sql.go b/app/server/datasource/rdbms/utils/sql.go index 89c089bf..c0f418e8 100644 --- a/app/server/datasource/rdbms/utils/sql.go +++ b/app/server/datasource/rdbms/utils/sql.go @@ -13,8 +13,15 @@ import ( "github.com/ydb-platform/fq-connector-go/common" ) +type QueryParams struct { + Ctx context.Context + Logger *zap.Logger + QueryText string + QueryArgs *QueryArgs +} + type Connection interface { - Query(ctx context.Context, logger *zap.Logger, query string, args ...any) (Rows, error) + Query(params *QueryParams) (Rows, error) Close() error } diff --git a/app/server/datasource/rdbms/utils/sql_mock.go b/app/server/datasource/rdbms/utils/sql_mock.go index 51c20b12..9b31db03 100644 --- a/app/server/datasource/rdbms/utils/sql_mock.go +++ b/app/server/datasource/rdbms/utils/sql_mock.go @@ -19,9 +19,9 @@ type ConnectionMock struct { mock.Mock } -func (m *ConnectionMock) Query(_ context.Context, _ *zap.Logger, query string, params ...any) (Rows, error) { - called := []any{query} - called = append(called, params...) +func (m *ConnectionMock) Query(params *QueryParams) (Rows, error) { + called := []any{params.QueryText} + called = append(called, params.QueryArgs.Values()...) args := m.Called(called...) return args.Get(0).(Rows), args.Error(1) diff --git a/app/server/datasource/rdbms/ydb/connection_database_sql.go b/app/server/datasource/rdbms/ydb/connection_database_sql.go new file mode 100644 index 00000000..891e3160 --- /dev/null +++ b/app/server/datasource/rdbms/ydb/connection_database_sql.go @@ -0,0 +1,127 @@ +package ydb + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + ydb_sdk "github.com/ydb-platform/ydb-go-sdk/v3" + "go.uber.org/zap" + + api_common "github.com/ydb-platform/fq-connector-go/api/common" + "github.com/ydb-platform/fq-connector-go/app/config" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" + rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" + "github.com/ydb-platform/fq-connector-go/app/server/paging" + "github.com/ydb-platform/fq-connector-go/common" +) + +type rowsDatabaseSQL struct { + *sql.Rows +} + +func (r rowsDatabaseSQL) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) { + columns, err := r.ColumnTypes() + if err != nil { + return nil, fmt.Errorf("column types: %w", err) + } + + typeNames := make([]string, 0, len(columns)) + for _, column := range columns { + typeNames = append(typeNames, column.DatabaseTypeName()) + } + + transformer, err := transformerFromSQLTypes(typeNames, ydbTypes, cc) + if err != nil { + return nil, fmt.Errorf("transformer from sql types: %w", err) + } + + return transformer, nil +} + +var _ rdbms_utils.Connection = (*connectionDatabaseSQL)(nil) + +type connectionDatabaseSQL struct { + *sql.DB + driver *ydb_sdk.Driver + logger common.QueryLogger +} + +func (c *connectionDatabaseSQL) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { + c.logger.Dump(params.QueryText, params.QueryArgs.Values()...) + + out, err := c.DB.QueryContext( + ydb_sdk.WithQueryMode(params.Ctx, ydb_sdk.ScanQueryMode), + params.QueryText, + params.QueryArgs.Values()...) + if err != nil { + return nil, fmt.Errorf("query context: %w", err) + } + + if err := out.Err(); err != nil { + defer func() { + if err = out.Close(); err != nil { + c.logger.Error("close rows", zap.Error(err)) + } + }() + + return nil, fmt.Errorf("rows err: %w", err) + } + + return rowsDatabaseSQL{Rows: out}, nil +} + +func (c *connectionDatabaseSQL) getDriver() *ydb_sdk.Driver { + return c.driver +} + +func (c *connectionDatabaseSQL) Close() error { + err1 := c.DB.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err2 := c.driver.Close(ctx) + + if err1 != nil || err2 != nil { + return fmt.Errorf("connection close err: %w; driver close err: %w", err1, err2) + } + + return nil +} + +func newConnectionDatabaseSQL( + ctx context.Context, + logger *zap.Logger, + queryLogger common.QueryLogger, + cfg *config.TYdbConfig, + dsi *api_common.TDataSourceInstance, + ydbDriver *ydb_sdk.Driver, +) (ydbConnection, error) { + ydbConn, err := ydb_sdk.Connector( + ydbDriver, + ydb_sdk.WithAutoDeclare(), + ydb_sdk.WithPositionalArgs(), + ydb_sdk.WithTablePathPrefix(dsi.Database), + ) + + if err != nil { + return nil, fmt.Errorf("connector error: %w", err) + } + + conn := sql.OpenDB(ydbConn) + + logger.Debug("Pinging database") + + pingCtx, pingCtxCancel := context.WithTimeout(ctx, common.MustDurationFromString(cfg.PingConnectionTimeout)) + defer pingCtxCancel() + + if err := conn.PingContext(pingCtx); err != nil { + common.LogCloserError(logger, conn, "close YDB connection") + return nil, fmt.Errorf("conn ping: %w", err) + } + + return &connectionDatabaseSQL{DB: conn, driver: ydbDriver, logger: queryLogger}, nil +} diff --git a/app/server/datasource/rdbms/ydb/connection_manager.go b/app/server/datasource/rdbms/ydb/connection_manager.go index 746eeeed..88cc5665 100644 --- a/app/server/datasource/rdbms/ydb/connection_manager.go +++ b/app/server/datasource/rdbms/ydb/connection_manager.go @@ -2,12 +2,9 @@ package ydb import ( "context" - "database/sql" "fmt" "strings" - "time" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" ydb_sdk "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/balancers" ydb_sdk_config "github.com/ydb-platform/ydb-go-sdk/v3/config" @@ -17,77 +14,13 @@ import ( api_common "github.com/ydb-platform/fq-connector-go/api/common" "github.com/ydb-platform/fq-connector-go/app/config" - "github.com/ydb-platform/fq-connector-go/app/server/conversion" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" - "github.com/ydb-platform/fq-connector-go/app/server/paging" "github.com/ydb-platform/fq-connector-go/common" ) -var _ rdbms_utils.Connection = (*Connection)(nil) - -type Connection struct { - *sql.DB - driver *ydb_sdk.Driver - logger common.QueryLogger -} - -type rows struct { - *sql.Rows -} - -func (r rows) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) { - columns, err := r.ColumnTypes() - if err != nil { - return nil, fmt.Errorf("column types: %w", err) - } - - typeNames := make([]string, 0, len(columns)) - for _, column := range columns { - typeNames = append(typeNames, column.DatabaseTypeName()) - } - - transformer, err := transformerFromSQLTypes(typeNames, ydbTypes, cc) - if err != nil { - return nil, fmt.Errorf("transformer from sql types: %w", err) - } - - return transformer, nil -} - -func (c *Connection) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) { - c.logger.Dump(query, args...) - - out, err := c.DB.QueryContext(ydb_sdk.WithQueryMode(ctx, ydb_sdk.ScanQueryMode), query, args...) - if err != nil { - return nil, fmt.Errorf("query context: %w", err) - } - - if err := out.Err(); err != nil { - defer func() { - if err = out.Close(); err != nil { - c.logger.Error("close rows", zap.Error(err)) - } - }() - - return nil, fmt.Errorf("rows err: %w", err) - } - - return rows{Rows: out}, nil -} - -func (c *Connection) Close() error { - err1 := c.DB.Close() - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - - err2 := c.driver.Close(ctx) - - if err1 != nil || err2 != nil { - return fmt.Errorf("connection close err: %w; driver close err: %w", err1, err2) - } - - return nil +type ydbConnection interface { + rdbms_utils.Connection + getDriver() *ydb_sdk.Driver } var _ rdbms_utils.ConnectionManager = (*connectionManager)(nil) @@ -145,33 +78,26 @@ func (c *connectionManager) Make( return nil, fmt.Errorf("open driver error: %w", err) } - ydbConn, err := ydb_sdk.Connector( - ydbDriver, - ydb_sdk.WithAutoDeclare(), - ydb_sdk.WithPositionalArgs(), - ydb_sdk.WithTablePathPrefix(dsi.Database), - ) - if err != nil { - return nil, fmt.Errorf("connector error: %w", err) + var ydbConn ydbConnection + + switch c.cfg.Mode { + case config.TYdbConfig_MODE_UNSPECIFIED: + fallthrough + case config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE: + ydbConn = newConnectionNative(ctx, c.QueryLoggerFactory.Make(logger), dsi, ydbDriver) + case config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES: + ydbConn, err = newConnectionDatabaseSQL(ctx, logger, c.QueryLoggerFactory.Make(logger), c.cfg, dsi, ydbDriver) + default: + return nil, fmt.Errorf("unknown mode: %v", c.cfg.Mode) } - conn := sql.OpenDB(ydbConn) - - logger.Debug("Pinging database") - - pingCtx, pingCtxCancel := context.WithTimeout(ctx, common.MustDurationFromString(c.cfg.PingConnectionTimeout)) - defer pingCtxCancel() - - if err := conn.PingContext(pingCtx); err != nil { - common.LogCloserError(logger, conn, "close YDB connection") - return nil, fmt.Errorf("conn ping: %w", err) + if err != nil { + return nil, fmt.Errorf("new connection: %w", err) } logger.Debug("Connection is ready") - queryLogger := c.QueryLoggerFactory.Make(logger) - - return &Connection{DB: conn, driver: ydbDriver, logger: queryLogger}, nil + return ydbConn, nil } func (*connectionManager) Release(_ context.Context, logger *zap.Logger, conn rdbms_utils.Connection) { diff --git a/app/server/datasource/rdbms/ydb/connection_native.go b/app/server/datasource/rdbms/ydb/connection_native.go new file mode 100644 index 00000000..48e784fa --- /dev/null +++ b/app/server/datasource/rdbms/ydb/connection_native.go @@ -0,0 +1,262 @@ +package ydb + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + ydb_sdk "github.com/ydb-platform/ydb-go-sdk/v3" + ydb_sdk_query "github.com/ydb-platform/ydb-go-sdk/v3/query" + "go.uber.org/zap" + + api_common "github.com/ydb-platform/fq-connector-go/api/common" + "github.com/ydb-platform/fq-connector-go/app/config" + "github.com/ydb-platform/fq-connector-go/app/server/conversion" + rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" + "github.com/ydb-platform/fq-connector-go/app/server/paging" + "github.com/ydb-platform/fq-connector-go/common" +) + +var _ rdbms_utils.Rows = (*rowsNative)(nil) + +type rowsNative struct { + ctx context.Context + err error + + streamResult ydb_sdk_query.Result + lastResultSet ydb_sdk_query.ResultSet + lastRow ydb_sdk_query.Row +} + +func (r *rowsNative) Next() bool { + var err error + + r.lastRow, err = r.lastResultSet.NextRow(r.ctx) + + if err != nil { + if errors.Is(err, io.EOF) { + r.err = nil + } else { + r.err = fmt.Errorf("next row: %w", err) + } + + return false + } + + return true +} + +func (r *rowsNative) NextResultSet() bool { + var err error + + r.lastResultSet, err = r.streamResult.NextResultSet(r.ctx) + if err != nil { + if errors.Is(err, io.EOF) { + r.err = nil + } else { + r.err = fmt.Errorf("next result set: %w", err) + } + + return false + } + + return true +} + +func (r *rowsNative) Scan(dest ...any) error { + if err := r.lastRow.Scan(dest...); err != nil { + return fmt.Errorf("rows scan: %w", err) + } + + return nil +} + +func (r *rowsNative) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) { + if r.lastResultSet == nil { + return nil, fmt.Errorf("last result set is not ready yet") + } + + columnTypes := r.lastResultSet.ColumnTypes() + typeNames := make([]string, 0, len(columnTypes)) + + for _, columnType := range columnTypes { + typeNames = append(typeNames, columnType.Yql()) + } + + transformer, err := transformerFromSQLTypes(typeNames, ydbTypes, cc) + if err != nil { + return nil, fmt.Errorf("transformer from sql types: %w", err) + } + + return transformer, nil +} + +func (r *rowsNative) Err() error { + return r.err +} + +func (r *rowsNative) Close() error { + if err := r.streamResult.Close(r.ctx); err != nil { + return fmt.Errorf("stream result close: %w", err) + } + + return nil +} + +var _ rdbms_utils.Connection = (*connectionNative)(nil) + +type connectionNative struct { + dsi *api_common.TDataSourceInstance + queryLogger common.QueryLogger + ctx context.Context + driver *ydb_sdk.Driver +} + +// nolint: gocyclo +func (c *connectionNative) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { + rowsChan := make(chan rdbms_utils.Rows, 1) + + finalErr := c.driver.Query().Do( + params.Ctx, + func(ctx context.Context, session ydb_sdk_query.Session) (err error) { + // modify query with args + queryRewritten, err := c.rewriteQuery(params) + if err != nil { + return fmt.Errorf("rewrite query: %w", err) + } + + // prepare parameter list + formatter := NewSQLFormatter(config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE) + paramsBuilder := ydb_sdk.ParamsBuilder() + for i, arg := range params.QueryArgs.Values() { + placeholder := formatter.GetPlaceholder(i) + + switch t := arg.(type) { + case int8: + paramsBuilder = paramsBuilder.Param(placeholder).Int8(t) + case int16: + paramsBuilder = paramsBuilder.Param(placeholder).Int16(t) + case int32: + paramsBuilder = paramsBuilder.Param(placeholder).Int32(t) + case int64: + paramsBuilder = paramsBuilder.Param(placeholder).Int64(t) + case uint8: + paramsBuilder = paramsBuilder.Param(placeholder).Uint8(t) + case uint16: + paramsBuilder = paramsBuilder.Param(placeholder).Uint16(t) + case uint32: + paramsBuilder = paramsBuilder.Param(placeholder).Uint32(t) + case uint64: + paramsBuilder = paramsBuilder.Param(placeholder).Uint64(t) + case float32: + paramsBuilder = paramsBuilder.Param(placeholder).Float(t) + case float64: + paramsBuilder = paramsBuilder.Param(placeholder).Double(t) + case string: + paramsBuilder = paramsBuilder.Param(placeholder).Text(t) + case []byte: + paramsBuilder = paramsBuilder.Param(placeholder).Bytes(t) + default: + return fmt.Errorf("unsupported type: %v (%T): %w", arg, arg, common.ErrUnimplementedPredicateType) + } + } + c.queryLogger.Dump(queryRewritten, params.QueryArgs.Values()...) + + // execute query + streamResult, err := session.Query( + ctx, + queryRewritten, + ydb_sdk_query.WithParameters(paramsBuilder.Build())) + if err != nil { + return fmt.Errorf("session query: %w", err) + } + + // obtain first result set because it's necessary + // to create type transformers + resultSet, err := streamResult.NextResultSet(ctx) + if err != nil { + if closeErr := streamResult.Close(ctx); closeErr != nil { + params.Logger.Error("close stream result", zap.Error(closeErr)) + } + + return fmt.Errorf("next result set: %w", err) + } + + rows := &rowsNative{ + ctx: c.ctx, + streamResult: streamResult, + lastResultSet: resultSet, + } + + select { + case rowsChan <- rows: + return nil + case <-ctx.Done(): + if closeErr := streamResult.Close(ctx); closeErr != nil { + params.Logger.Error("close stream result", zap.Error(closeErr)) + } + return ctx.Err() + } + }, + ydb_sdk_query.WithIdempotent(), + ) + + if finalErr != nil { + return nil, fmt.Errorf("query do: %w", finalErr) + } + + select { + case rows := <-rowsChan: + return rows, nil + case <-params.Ctx.Done(): + return nil, params.Ctx.Err() + } +} + +func (c *connectionNative) getDriver() *ydb_sdk.Driver { + return c.driver +} + +func (c *connectionNative) Close() error { + if err := c.driver.Close(c.ctx); err != nil { + return fmt.Errorf("driver close: %w", err) + } + + return nil +} + +func newConnectionNative( + ctx context.Context, + queryLogger common.QueryLogger, + dsi *api_common.TDataSourceInstance, + driver *ydb_sdk.Driver, +) ydbConnection { + return &connectionNative{ + ctx: ctx, + driver: driver, + queryLogger: queryLogger, + dsi: dsi, + } +} + +func (c *connectionNative) rewriteQuery(params *rdbms_utils.QueryParams) (string, error) { + var buf bytes.Buffer + + buf.WriteString(fmt.Sprintf("PRAGMA TablePathPrefix(\"%s\");", c.dsi.Database)) //nolint:revive + + for i, arg := range params.QueryArgs.GetAll() { + typeName, err := primitiveYqlTypeName(arg.YdbType.GetTypeId()) + if err != nil { + return "", fmt.Errorf("get YQL type name from value %v: %w", arg, err) + } + + buf.WriteString(fmt.Sprintf("DECLARE $p%d AS %s;", i, typeName)) //nolint:revive + } + + buf.WriteString(params.QueryText) //nolint:revive + + return buf.String(), nil +} diff --git a/app/server/datasource/rdbms/ydb/schema_provider.go b/app/server/datasource/rdbms/ydb/schema_provider.go index f3ffd215..ef71d108 100644 --- a/app/server/datasource/rdbms/ydb/schema_provider.go +++ b/app/server/datasource/rdbms/ydb/schema_provider.go @@ -5,7 +5,6 @@ import ( "fmt" "path" - "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" "go.uber.org/zap" @@ -27,29 +26,25 @@ func (f *schemaProvider) GetSchema( conn rdbms_utils.Connection, request *api_service_protos.TDescribeTableRequest, ) (*api_service_protos.TSchema, error) { - ydbConn := conn.(*Connection) - - db, err := ydb.Unwrap(ydbConn.DB) - if err != nil { - return nil, fmt.Errorf("unwrap connection: %w", err) - } + db := conn.(ydbConnection).getDriver() desc := options.Description{} prefix := path.Join(db.Name(), request.Table) - tableClient := db.Table() logger.Debug("obtaining table metadata", zap.String("prefix", prefix)) - err = tableClient.Do( + err := db.Table().Do( ctx, func(ctx context.Context, s table.Session) error { - desc, err = s.DescribeTable(ctx, prefix) - if err != nil { - return fmt.Errorf("describe table: %w", err) + var errInner error + desc, errInner = s.DescribeTable(ctx, prefix) + if errInner != nil { + return fmt.Errorf("describe table: %w", errInner) } return nil }, + table.WithIdempotent(), ) if err != nil { return nil, fmt.Errorf("get table description: %w", err) diff --git a/app/server/datasource/rdbms/ydb/sql_formatter.go b/app/server/datasource/rdbms/ydb/sql_formatter.go index f6d3c09d..8abe1d52 100644 --- a/app/server/datasource/rdbms/ydb/sql_formatter.go +++ b/app/server/datasource/rdbms/ydb/sql_formatter.go @@ -7,12 +7,14 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/config" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" ) var _ rdbms_utils.SQLFormatter = (*sqlFormatter)(nil) type sqlFormatter struct { + mode config.TYdbConfig_Mode } func (sqlFormatter) supportsType(typeID Ydb.Type_PrimitiveTypeId) bool { @@ -79,8 +81,15 @@ func (f sqlFormatter) SupportsPushdownExpression(expression *api_service_protos. } } -func (sqlFormatter) GetPlaceholder(_ int) string { - return "?" +func (f sqlFormatter) GetPlaceholder(id int) string { + switch f.mode { + case config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE: + return fmt.Sprintf("$p%d", id) + case config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES: + return "?" + default: + panic("unknown mode") + } } // TODO: add identifiers processing @@ -96,6 +105,8 @@ func (f sqlFormatter) FormatFrom(tableName string) string { return f.SanitiseIdentifier(tableName) } -func NewSQLFormatter() rdbms_utils.SQLFormatter { - return sqlFormatter{} +func NewSQLFormatter(mode config.TYdbConfig_Mode) rdbms_utils.SQLFormatter { + return sqlFormatter{ + mode: mode, + } } diff --git a/app/server/datasource/rdbms/ydb/sql_formatter_test.go b/app/server/datasource/rdbms/ydb/sql_formatter_test.go index 552cc62b..918c3302 100644 --- a/app/server/datasource/rdbms/ydb/sql_formatter_test.go +++ b/app/server/datasource/rdbms/ydb/sql_formatter_test.go @@ -1,6 +1,7 @@ package ydb import ( + "context" "errors" "testing" @@ -8,6 +9,7 @@ import ( ydb "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/config" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/common" ) @@ -23,7 +25,7 @@ func TestMakeReadSplitsQuery(t *testing.T) { } logger := common.NewTestLogger(t) - formatter := NewSQLFormatter() + formatter := NewSQLFormatter(config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES) tcs := []testCase{ { @@ -454,15 +456,20 @@ func TestMakeReadSplitsQuery(t *testing.T) { t.Run(tc.testName, func(t *testing.T) { readSplitsQuery, err := rdbms_utils.MakeReadSplitsQuery( - logger, formatter, tc.selectReq, api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL) + context.Background(), + logger, + formatter, + tc.selectReq, + api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL, + ) if tc.err != nil { require.True(t, errors.Is(err, tc.err)) return } require.NoError(t, err) - require.Equal(t, tc.outputQuery, readSplitsQuery.Query) - require.Equal(t, tc.outputArgs, readSplitsQuery.Args) + require.Equal(t, tc.outputQuery, readSplitsQuery.QueryText) + require.Equal(t, tc.outputArgs, readSplitsQuery.QueryArgs.Values()) require.Equal(t, tc.outputSelectWhat, readSplitsQuery.What) }) } diff --git a/app/server/datasource/rdbms/ydb/type_mapper.go b/app/server/datasource/rdbms/ydb/type_mapper.go index 1a77e304..795ec04a 100644 --- a/app/server/datasource/rdbms/ydb/type_mapper.go +++ b/app/server/datasource/rdbms/ydb/type_mapper.go @@ -44,6 +44,39 @@ const ( typeTimestamp = "Timestamp" ) +func primitiveYqlTypeName(typeId Ydb.Type_PrimitiveTypeId) (string, error) { + switch typeId { + case Ydb.Type_BOOL: + return typeBool, nil + case Ydb.Type_INT8: + return typeInt8, nil + case Ydb.Type_UINT8: + return typeUint8, nil + case Ydb.Type_INT16: + return typeInt16, nil + case Ydb.Type_UINT16: + return typeUint16, nil + case Ydb.Type_INT32: + return typeInt32, nil + case Ydb.Type_UINT32: + return typeUint32, nil + case Ydb.Type_INT64: + return typeInt64, nil + case Ydb.Type_UINT64: + return typeUint64, nil + case Ydb.Type_FLOAT: + return typeFloat, nil + case Ydb.Type_DOUBLE: + return typeDouble, nil + case Ydb.Type_STRING: + return typeString, nil + case Ydb.Type_UTF8: + return typeUtf8, nil + default: + return "", fmt.Errorf("unexpected primitive type id: %v", typeId) + } +} + func (typeMapper) SQLTypeToYDBColumn(columnName, typeName string, _rules *api_service_protos.TTypeMappingSettings) (*Ydb.Column, error) { var ( ydbType *Ydb.Type @@ -113,7 +146,7 @@ func makePrimitiveTypeFromString(typeName string) (*Ydb.Type, error) { } } -func appendToBuilderWithValuePtrConverter[ +func appendToBuilderSinglePtr[ IN common.ValueType, OUT common.ValueType, AB common.ArrowBuilder[OUT], @@ -121,14 +154,7 @@ func appendToBuilderWithValuePtrConverter[ conv conversion.ValuePtrConverter[IN, OUT], ) func(acceptor any, builder array.Builder) error { return func(acceptor any, builder array.Builder) error { - doublePtr := acceptor.(**IN) - - ptr := *doublePtr - if ptr == nil { - builder.AppendNull() - - return nil - } + ptr := acceptor.(*IN) out, err := conv.Convert(ptr) if err != nil { @@ -149,13 +175,37 @@ func appendToBuilderWithValuePtrConverter[ } } +func appendToBuilderDoublePtr[ + IN common.ValueType, + OUT common.ValueType, + AB common.ArrowBuilder[OUT], +]( + conv conversion.ValuePtrConverter[IN, OUT], +) func(acceptor any, builder array.Builder) error { + return func(acceptor any, builder array.Builder) error { + doublePtr := acceptor.(**IN) + + ptr := *doublePtr + if ptr == nil { + builder.AppendNull() + + return nil + } + + return appendToBuilderSinglePtr[IN, OUT, AB](conv)(ptr, builder) + } +} + func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) { acceptors := make([]any, 0, len(typeNames)) appenders := make([]func(acceptor any, builder array.Builder) error, 0, len(typeNames)) for i, typeName := range typeNames { + var optional bool + if matches := isOptional.FindStringSubmatch(typeName); len(matches) > 0 { typeName = matches[1] + optional = true } ydbTypeID, err := common.YdbTypeToYdbPrimitiveTypeID(ydbTypes[i]) @@ -163,7 +213,7 @@ func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type, cc conver return nil, fmt.Errorf("ydb type to ydb primitive type id: %w", err) } - acceptor, appender, err := makeAcceptorAndAppenderFromSQLType(typeName, ydbTypeID, cc) + acceptor, appender, err := makeAcceptorAppender(typeName, ydbTypeID, optional, cc) if err != nil { return nil, fmt.Errorf("make transformer: %w", err) } @@ -176,47 +226,45 @@ func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type, cc conver } //nolint:gocyclo -func makeAcceptorAndAppenderFromSQLType( +func makeAcceptorAppender( typeName string, ydbTypeID Ydb.Type_PrimitiveTypeId, + optional bool, cc conversion.Collection, ) (any, func(acceptor any, builder array.Builder) error, error) { switch typeName { case typeBool: - return new(*bool), appendToBuilderWithValuePtrConverter[bool, uint8, *array.Uint8Builder](cc.Bool()), nil + return makeAcceptorAppenderCheckOptional[bool, uint8, *array.Uint8Builder](optional, cc.Bool()) case typeInt8: - return new(*int8), appendToBuilderWithValuePtrConverter[int8, int8, *array.Int8Builder](cc.Int8()), nil + return makeAcceptorAppenderCheckOptional[int8, int8, *array.Int8Builder](optional, cc.Int8()) case typeInt16: - return new(*int16), appendToBuilderWithValuePtrConverter[int16, int16, *array.Int16Builder](cc.Int16()), nil + return makeAcceptorAppenderCheckOptional[int16, int16, *array.Int16Builder](optional, cc.Int16()) case typeInt32: - return new(*int32), appendToBuilderWithValuePtrConverter[int32, int32, *array.Int32Builder](cc.Int32()), nil + return makeAcceptorAppenderCheckOptional[int32, int32, *array.Int32Builder](optional, cc.Int32()) case typeInt64: - return new(*int64), appendToBuilderWithValuePtrConverter[int64, int64, *array.Int64Builder](cc.Int64()), nil + return makeAcceptorAppenderCheckOptional[int64, int64, *array.Int64Builder](optional, cc.Int64()) case typeUint8: - return new(*uint8), appendToBuilderWithValuePtrConverter[uint8, uint8, *array.Uint8Builder](cc.Uint8()), nil + return makeAcceptorAppenderCheckOptional[uint8, uint8, *array.Uint8Builder](optional, cc.Uint8()) case typeUint16: - return new(*uint16), appendToBuilderWithValuePtrConverter[uint16, uint16, *array.Uint16Builder](cc.Uint16()), nil + return makeAcceptorAppenderCheckOptional[uint16, uint16, *array.Uint16Builder](optional, cc.Uint16()) case typeUint32: - return new(*uint32), appendToBuilderWithValuePtrConverter[uint32, uint32, *array.Uint32Builder](cc.Uint32()), nil + return makeAcceptorAppenderCheckOptional[uint32, uint32, *array.Uint32Builder](optional, cc.Uint32()) case typeUint64: - return new(*uint64), appendToBuilderWithValuePtrConverter[uint64, uint64, *array.Uint64Builder](cc.Uint64()), nil + return makeAcceptorAppenderCheckOptional[uint64, uint64, *array.Uint64Builder](optional, cc.Uint64()) case typeFloat: - return new(*float32), appendToBuilderWithValuePtrConverter[float32, float32, *array.Float32Builder](cc.Float32()), nil + return makeAcceptorAppenderCheckOptional[float32, float32, *array.Float32Builder](optional, cc.Float32()) case typeDouble: - return new(*float64), appendToBuilderWithValuePtrConverter[float64, float64, *array.Float64Builder](cc.Float64()), nil + return makeAcceptorAppenderCheckOptional[float64, float64, *array.Float64Builder](optional, cc.Float64()) case typeString: - return new(*[]byte), appendToBuilderWithValuePtrConverter[[]byte, []byte, *array.BinaryBuilder](cc.Bytes()), nil - case typeUtf8: - return new(*string), appendToBuilderWithValuePtrConverter[string, string, *array.StringBuilder](cc.String()), nil - case typeJSON: - // Copy of UTF8 - return new(*string), appendToBuilderWithValuePtrConverter[string, string, *array.StringBuilder](cc.String()), nil + return makeAcceptorAppenderCheckOptional[[]byte, []byte, *array.BinaryBuilder](optional, cc.Bytes()) + case typeUtf8, typeJSON: + return makeAcceptorAppenderCheckOptional[string, string, *array.StringBuilder](optional, cc.String()) case typeDate: switch ydbTypeID { case Ydb.Type_DATE: - return new(*time.Time), appendToBuilderWithValuePtrConverter[time.Time, uint16, *array.Uint16Builder](cc.Date()), nil + return makeAcceptorAppenderCheckOptional[time.Time, uint16, *array.Uint16Builder](optional, cc.Date()) case Ydb.Type_UTF8: - return new(*time.Time), appendToBuilderWithValuePtrConverter[time.Time, string, *array.StringBuilder](cc.DateToString()), nil + return makeAcceptorAppenderCheckOptional[time.Time, string, *array.StringBuilder](optional, cc.DateToString()) default: return nil, nil, fmt.Errorf("unexpected ydb type id %v with sql type %s: %w", ydbTypeID, typeName, common.ErrDataTypeNotSupported) @@ -224,7 +272,7 @@ func makeAcceptorAndAppenderFromSQLType( case typeDatetime: switch ydbTypeID { case Ydb.Type_DATETIME: - return new(*time.Time), appendToBuilderWithValuePtrConverter[time.Time, uint32, *array.Uint32Builder](cc.Datetime()), nil + return makeAcceptorAppenderCheckOptional[time.Time, uint32, *array.Uint32Builder](optional, cc.Datetime()) default: return nil, nil, fmt.Errorf("unexpected ydb type id %v with sql type %s: %w", ydbTypeID, typeName, common.ErrDataTypeNotSupported) @@ -232,7 +280,7 @@ func makeAcceptorAndAppenderFromSQLType( case typeTimestamp: switch ydbTypeID { case Ydb.Type_TIMESTAMP: - return new(*time.Time), appendToBuilderWithValuePtrConverter[time.Time, uint64, *array.Uint64Builder](cc.Timestamp()), nil + return makeAcceptorAppenderCheckOptional[time.Time, uint64, *array.Uint64Builder](optional, cc.Timestamp()) default: return nil, nil, fmt.Errorf("unexpected ydb type id %v with sql type %s: %w", ydbTypeID, typeName, common.ErrDataTypeNotSupported) @@ -242,6 +290,18 @@ func makeAcceptorAndAppenderFromSQLType( } } +func makeAcceptorAppenderCheckOptional[ + IN common.ValueType, + OUT common.ValueType, + AB common.ArrowBuilder[OUT], +](optional bool, conv conversion.ValuePtrConverter[IN, OUT]) (any, func(acceptor any, builder array.Builder) error, error) { + if optional { + return new(*IN), appendToBuilderDoublePtr[IN, OUT, AB](conv), nil + } + + return new(IN), appendToBuilderSinglePtr[IN, OUT, AB](conv), nil +} + func NewTypeMapper() datasource.TypeMapper { return typeMapper{} } diff --git a/app/server/embedded_options.go b/app/server/embedded_options.go index f6e7f000..81fdb951 100644 --- a/app/server/embedded_options.go +++ b/app/server/embedded_options.go @@ -90,3 +90,15 @@ func WithConnectionTimeouts(open, ping string) EmbeddedOption { open: open, ping: ping, } } + +type withYdbConnectorMode struct { + mode config.TYdbConfig_Mode +} + +func (o *withYdbConnectorMode) apply(cfg *config.TServerConfig) { + cfg.Datasources.Ydb.Mode = o.mode +} + +func WithYdbConnectorMode(mode config.TYdbConfig_Mode) EmbeddedOption { + return &withYdbConnectorMode{mode: mode} +} diff --git a/app/server/service_connector.go b/app/server/service_connector.go index 68e941a5..1457dec0 100644 --- a/app/server/service_connector.go +++ b/app/server/service_connector.go @@ -66,7 +66,10 @@ func (s *serviceConnector) DescribeTable( return out, nil } -func (s *serviceConnector) ListSplits(request *api_service_protos.TListSplitsRequest, stream api_service.Connector_ListSplitsServer) error { +func (s *serviceConnector) ListSplits( + request *api_service_protos.TListSplitsRequest, + stream api_service.Connector_ListSplitsServer, +) error { logger := mustFromContext(stream.Context()) logger.Info("request handling started", zap.Int("total selects", len(request.Selects))) diff --git a/common/api_helpers.go b/common/api_helpers.go index 1a1dab50..b68dc33a 100644 --- a/common/api_helpers.go +++ b/common/api_helpers.go @@ -72,3 +72,13 @@ func ReadResponsesToArrowRecords(responses []*api_service_protos.TReadSplitsResp return out, nil } + +func ExtractErrorFromReadResponses(responses []*api_service_protos.TReadSplitsResponse) error { + for _, resp := range responses { + if !IsSuccess(resp.Error) { + return NewSTDErrorFromAPIError(resp.Error) + } + } + + return nil +} diff --git a/common/errors.go b/common/errors.go index fd441f94..a5b0f727 100644 --- a/common/errors.go +++ b/common/errors.go @@ -32,7 +32,6 @@ var ( ErrReadLimitExceeded = fmt.Errorf("read limit exceeded") ErrInvalidRequest = fmt.Errorf("invalid request") ErrValueOutOfTypeBounds = fmt.Errorf("value is out of possible range of values for the type") - ErrUnimplemented = fmt.Errorf("unimplemented") ErrUnimplementedTypedValue = fmt.Errorf("unimplemented typed value") ErrUnimplementedExpression = fmt.Errorf("unimplemented expression") ErrUnsupportedExpression = fmt.Errorf("expression is not supported") @@ -289,8 +288,6 @@ func newAPIErrorFromConnectorError(err error) *api_service_protos.TError { status = ydb_proto.StatusIds_UNSUPPORTED case errors.Is(err, ErrUnimplementedPredicateType): status = ydb_proto.StatusIds_UNSUPPORTED - case errors.Is(err, ErrUnimplemented): - status = ydb_proto.StatusIds_UNSUPPORTED case errors.Is(err, ErrUnimplementedArithmeticalExpression): status = ydb_proto.StatusIds_UNSUPPORTED case errors.Is(err, ErrEmptyTableName): diff --git a/go.mod b/go.mod index dd20ad6e..e5f627cd 100644 --- a/go.mod +++ b/go.mod @@ -31,14 +31,14 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 github.com/wI2L/jsondiff v0.4.0 - github.com/ydb-platform/ydb-go-genproto v0.0.0-20240316140903-4a47abca1cca - github.com/ydb-platform/ydb-go-sdk/v3 v3.67.2 + github.com/ydb-platform/ydb-go-genproto v0.0.0-20240920120314-0fed943b0136 + github.com/ydb-platform/ydb-go-sdk/v3 v3.84.1 go.uber.org/atomic v1.11.0 go.uber.org/zap v1.26.0 golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 golang.org/x/time v0.5.0 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 - google.golang.org/grpc v1.62.0 + google.golang.org/grpc v1.62.1 google.golang.org/protobuf v1.33.0 gopkg.in/yaml.v3 v3.0.1 sigs.k8s.io/yaml v1.3.0 diff --git a/go.sum b/go.sum index 39081019..eacb4f8c 100644 --- a/go.sum +++ b/go.sum @@ -264,10 +264,10 @@ github.com/wI2L/jsondiff v0.4.0/go.mod h1:nR/vyy1efuDeAtMwc3AF6nZf/2LD1ID8GTyyJ+ github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20240316140903-4a47abca1cca h1:PliQWLwi2gTSOk7QyYQ9GfjvvikmibLWmaplKHy+kfo= -github.com/ydb-platform/ydb-go-genproto v0.0.0-20240316140903-4a47abca1cca/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= -github.com/ydb-platform/ydb-go-sdk/v3 v3.67.2 h1:jQeLyAgHYVydylc2FGBPuKQBGlXuvOng2Ne6TmKHZ2o= -github.com/ydb-platform/ydb-go-sdk/v3 v3.67.2/go.mod h1:hGX4CijskNnUTRgLlqMvZdrBQc1ALZgAnKHytF5nmj4= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20240920120314-0fed943b0136 h1:MO32/Cba3XpNYWcoz3y13eHZG+RzDHmFPry3ren6BmE= +github.com/ydb-platform/ydb-go-genproto v0.0.0-20240920120314-0fed943b0136/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= +github.com/ydb-platform/ydb-go-sdk/v3 v3.84.1 h1:QkFhXiwoMRZTr7mxk77UeWXTCmkVXXESvncA/ug35S0= +github.com/ydb-platform/ydb-go-sdk/v3 v3.84.1/go.mod h1:BTLL5DJGTAe4sgr3sRum0OQVdNjG1cMjNwZN1qAq7eo= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -405,8 +405,8 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= -google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk= -google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= +google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= +google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/tests/infra/datasource/ydb/suite.go b/tests/infra/datasource/ydb/suite.go index 85a0910b..370319b1 100644 --- a/tests/infra/datasource/ydb/suite.go +++ b/tests/infra/datasource/ydb/suite.go @@ -2,6 +2,7 @@ package ydb import ( "github.com/apache/arrow/go/v13/arrow/array" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" api_common "github.com/ydb-platform/fq-connector-go/api/common" @@ -267,7 +268,7 @@ func (s *Suite) TestPushdownStringsString() { Payload: tests_utils.MakePredicateComparisonColumn( "col_03_string", api_service_protos.TPredicate_TComparison_EQ, - common.MakeTypedValue(common.MakePrimitiveType(Ydb.Type_STRING), "b"), + common.MakeTypedValue(common.MakePrimitiveType(Ydb.Type_STRING), []byte("b")), ), }), ) diff --git a/tests/suite/scenario.go b/tests/suite/scenario.go index 3147e7fc..847760d2 100644 --- a/tests/suite/scenario.go +++ b/tests/suite/scenario.go @@ -66,7 +66,8 @@ func TestMissingDataSource[ s.Require().NoError(err) // errors count incremented by one - describeTableStatusErr, err := common.DiffStatusSensors(snapshot1, snapshot2, "RATE", "DescribeTable", "status_total", "INTERNAL_ERROR") + describeTableStatusErr, err := common.DiffStatusSensors( + snapshot1, snapshot2, "RATE", "DescribeTable", "status_total", "INTERNAL_ERROR") s.Require().NoError(err) s.Require().Equal(float64(1), describeTableStatusErr) } diff --git a/tests/suite/suite.go b/tests/suite/suite.go index 7d3e0395..e4a09497 100644 --- a/tests/suite/suite.go +++ b/tests/suite/suite.go @@ -66,6 +66,7 @@ func (b *Base[_, _]) SetupSuite() { }, ), server.WithConnectionTimeouts("2s", "1s"), + server.WithYdbConnectorMode(config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES), ) b.Require().NoError(err) b.Connector.Start() @@ -155,7 +156,7 @@ func (b *Base[ID, IDBUILDER]) doValidateTable( b.Require().NotEmpty(table.Name) - ctx, cancel := context.WithTimeout(test_utils.NewContextWithTestName(), 10*time.Second) + ctx, cancel := context.WithTimeout(test_utils.NewContextWithTestName(), 60*time.Second) defer cancel() // describe table @@ -190,6 +191,7 @@ func (b *Base[ID, IDBUILDER]) doValidateTable( splits := common.ListSplitsResponsesToSplits(listSplitsResponses) readSplitsResponses, err := b.Connector.ClientBuffering().ReadSplits(ctx, splits) b.Require().NoError(err) + b.Require().NoError(common.ExtractErrorFromReadResponses(readSplitsResponses)) // either no blocks (empty table), either single block (tables are small) b.Require().Contains([]int{0, 1}, len(readSplitsResponses)) diff --git a/tools/docker_compose_update/internal.go b/tools/docker_compose_update/internal.go index dfc15aff..35f401aa 100644 --- a/tools/docker_compose_update/internal.go +++ b/tools/docker_compose_update/internal.go @@ -39,7 +39,7 @@ func getLatestVersion() (string, error) { link := "https://api.github.com/repos/ydb-platform/fq-connector-go/tags" - req, err := http.NewRequestWithContext(ctx, "GET", link, nil) + req, err := http.NewRequestWithContext(ctx, "GET", link, http.NoBody) if err != nil { return "", fmt.Errorf("http new request: %w", err) } @@ -81,7 +81,7 @@ func getChecksum(tag string) (string, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - req, err := http.NewRequestWithContext(ctx, "GET", link, nil) + req, err := http.NewRequestWithContext(ctx, "GET", link, http.NoBody) if err != nil { return "", fmt.Errorf("http new request: %w", err) }