From c2f4c5943ce89e8ec237cad5a88e76f7a839ac61 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Mon, 14 Oct 2024 23:12:15 +0300 Subject: [PATCH] YDB: support pushdown for optional types in Query Service mode (#204) * YDB: test both connector modes * YDB: test optional pushdown * YDB: validate optional --- .../rdbms/ydb/connection_manager.go | 2 + .../datasource/rdbms/ydb/connection_native.go | 45 ++++++++++------ tests/infra/datasource/ydb/suite.go | 54 +++++++++++++++++-- tests/main_test.go | 18 ++++++- tests/suite/suite.go | 52 +++++++++++++++--- 5 files changed, 144 insertions(+), 27 deletions(-) diff --git a/app/server/datasource/rdbms/ydb/connection_manager.go b/app/server/datasource/rdbms/ydb/connection_manager.go index 88cc5665..d017bfe1 100644 --- a/app/server/datasource/rdbms/ydb/connection_manager.go +++ b/app/server/datasource/rdbms/ydb/connection_manager.go @@ -84,8 +84,10 @@ func (c *connectionManager) Make( case config.TYdbConfig_MODE_UNSPECIFIED: fallthrough case config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE: + logger.Debug("YDB Connector will use Native SDK over Query Service") ydbConn = newConnectionNative(ctx, c.QueryLoggerFactory.Make(logger), dsi, ydbDriver) case config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES: + logger.Debug("YDB Connector will use database/sql SDK with scan queries over Table Service") ydbConn, err = newConnectionDatabaseSQL(ctx, logger, c.QueryLoggerFactory.Make(logger), c.cfg, dsi, ydbDriver) default: return nil, fmt.Errorf("unknown mode: %v", c.cfg.Mode) diff --git a/app/server/datasource/rdbms/ydb/connection_native.go b/app/server/datasource/rdbms/ydb/connection_native.go index 48e784fa..493a8b4e 100644 --- a/app/server/datasource/rdbms/ydb/connection_native.go +++ b/app/server/datasource/rdbms/ydb/connection_native.go @@ -228,27 +228,28 @@ func (c *connectionNative) Close() error { return nil } -func newConnectionNative( - ctx context.Context, - queryLogger common.QueryLogger, - dsi *api_common.TDataSourceInstance, - driver *ydb_sdk.Driver, -) ydbConnection { - return &connectionNative{ - ctx: ctx, - driver: driver, - queryLogger: queryLogger, - dsi: dsi, - } -} - func (c *connectionNative) rewriteQuery(params *rdbms_utils.QueryParams) (string, error) { var buf bytes.Buffer buf.WriteString(fmt.Sprintf("PRAGMA TablePathPrefix(\"%s\");", c.dsi.Database)) //nolint:revive for i, arg := range params.QueryArgs.GetAll() { - typeName, err := primitiveYqlTypeName(arg.YdbType.GetTypeId()) + var primitiveTypeID Ydb.Type_PrimitiveTypeId + + if arg.YdbType.GetOptionalType() != nil { + internalType := arg.YdbType.GetOptionalType().GetItem() + + switch t := internalType.GetType().(type) { + case *Ydb.Type_TypeId: + primitiveTypeID = t.TypeId + default: + return "", fmt.Errorf("optional type contains no primitive type: %v", arg.YdbType) + } + } else { + primitiveTypeID = arg.YdbType.GetTypeId() + } + + typeName, err := primitiveYqlTypeName(primitiveTypeID) if err != nil { return "", fmt.Errorf("get YQL type name from value %v: %w", arg, err) } @@ -260,3 +261,17 @@ func (c *connectionNative) rewriteQuery(params *rdbms_utils.QueryParams) (string return buf.String(), nil } + +func newConnectionNative( + ctx context.Context, + queryLogger common.QueryLogger, + dsi *api_common.TDataSourceInstance, + driver *ydb_sdk.Driver, +) ydbConnection { + return &connectionNative{ + ctx: ctx, + driver: driver, + queryLogger: queryLogger, + dsi: dsi, + } +} diff --git a/tests/infra/datasource/ydb/suite.go b/tests/infra/datasource/ydb/suite.go index 370319b1..0eb6ef55 100644 --- a/tests/infra/datasource/ydb/suite.go +++ b/tests/infra/datasource/ydb/suite.go @@ -7,6 +7,7 @@ import ( api_common "github.com/ydb-platform/fq-connector-go/api/common" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/config" "github.com/ydb-platform/fq-connector-go/common" "github.com/ydb-platform/fq-connector-go/tests/infra/datasource" "github.com/ydb-platform/fq-connector-go/tests/suite" @@ -15,7 +16,8 @@ import ( type Suite struct { *suite.Base[int32, *array.Int32Builder] - dataSource *datasource.DataSource + dataSource *datasource.DataSource + connectorMode config.TYdbConfig_Mode } func (s *Suite) TestSelect() { @@ -85,6 +87,10 @@ func (s *Suite) TestPushdownComparisonEQ() { } func (s *Suite) TestPushdownComparisonEQNull() { + if s.connectorMode == config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE { + s.T().Skip("Skipping test in QUERY_SERVICE_NATIVE mode") + } + s.ValidateTable( s.dataSource, tables["pushdown_comparison_EQ_NULL"], @@ -260,6 +266,20 @@ func (s *Suite) TestPushdownStringsUtf8() { ) } +func (s *Suite) TestPushdownStringsUtf8Optional() { + s.ValidateTable( + s.dataSource, + tables["pushdown_strings_utf8"], + suite.WithPredicate(&api_service_protos.TPredicate{ + Payload: tests_utils.MakePredicateComparisonColumn( + "col_02_utf8", + api_service_protos.TPredicate_TComparison_EQ, + common.MakeTypedValue(common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_UTF8)), "a"), + ), + }), + ) +} + func (s *Suite) TestPushdownStringsString() { s.ValidateTable( s.dataSource, @@ -274,6 +294,20 @@ func (s *Suite) TestPushdownStringsString() { ) } +func (s *Suite) TestPushdownStringsStringOptional() { + s.ValidateTable( + s.dataSource, + tables["pushdown_strings_string"], + suite.WithPredicate(&api_service_protos.TPredicate{ + Payload: tests_utils.MakePredicateComparisonColumn( + "col_03_string", + api_service_protos.TPredicate_TComparison_EQ, + common.MakeTypedValue(common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_STRING)), []byte("b")), + ), + }), + ) +} + func (s *Suite) TestLargeTable() { // For tables larger than 1000 rows, scan queries must be used, // otherwise output will be truncated. @@ -300,6 +334,10 @@ func (s *Suite) TestPositiveStats() { } func (s *Suite) TestMissingDataSource() { + if s.connectorMode == config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE { + s.T().Skip("Skipping test in QUERY_SERVICE_NATIVE mode") + } + dsi := &api_common.TDataSourceInstance{ Kind: api_common.EDataSourceKind_YDB, Endpoint: &api_common.TEndpoint{Host: "www.google.com", Port: 2136}, @@ -320,12 +358,20 @@ func (s *Suite) TestMissingDataSource() { } func (s *Suite) TestInvalidLogin() { + if s.connectorMode == config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE { + s.T().Skip("Skipping test in QUERY_SERVICE_NATIVE mode") + } + for _, dsi := range s.dataSource.Instances { suite.TestInvalidLogin(s.Base, dsi, tables["simple"]) } } func (s *Suite) TestInvalidPassword() { + if s.connectorMode == config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE { + s.T().Skip("Skipping test in QUERY_SERVICE_NATIVE mode") + } + for _, dsi := range s.dataSource.Instances { suite.TestInvalidPassword(s.Base, dsi, tables["simple"]) } @@ -333,13 +379,15 @@ func (s *Suite) TestInvalidPassword() { func NewSuite( baseSuite *suite.Base[int32, *array.Int32Builder], + connectorMode config.TYdbConfig_Mode, ) *Suite { ds, err := deriveDataSourceFromDockerCompose(baseSuite.EndpointDeterminer) baseSuite.Require().NoError(err) result := &Suite{ - Base: baseSuite, - dataSource: ds, + Base: baseSuite, + dataSource: ds, + connectorMode: connectorMode, } return result diff --git a/tests/main_test.go b/tests/main_test.go index 4d0d4480..b0d5053b 100644 --- a/tests/main_test.go +++ b/tests/main_test.go @@ -1,12 +1,15 @@ package tests import ( + "fmt" "log" "testing" "github.com/apache/arrow/go/v13/arrow/array" testify_suite "github.com/stretchr/testify/suite" + "github.com/ydb-platform/fq-connector-go/app/config" + "github.com/ydb-platform/fq-connector-go/app/server" "github.com/ydb-platform/fq-connector-go/tests/infra/datasource/clickhouse" "github.com/ydb-platform/fq-connector-go/tests/infra/datasource/greenplum" "github.com/ydb-platform/fq-connector-go/tests/infra/datasource/ms_sql_server" @@ -40,7 +43,20 @@ func TestPostgreSQL(t *testing.T) { } func TestYDB(t *testing.T) { - testify_suite.Run(t, ydb.NewSuite(suite.NewBase[int32, *array.Int32Builder](t, state, "YDB"))) + modes := []config.TYdbConfig_Mode{ + config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES, + config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE, + } + + for _, mode := range modes { + suiteName := fmt.Sprintf("YDB_%v", config.TYdbConfig_Mode_name[int32(mode)]) + option := suite.WithEmbeddedOptions(server.WithYdbConnectorMode(mode)) + + testify_suite.Run( + t, + ydb.NewSuite(suite.NewBase[int32, *array.Int32Builder](t, state, suiteName, option), mode), + ) + } } func TestGreenplum(t *testing.T) { diff --git a/tests/suite/suite.go b/tests/suite/suite.go index e4a09497..308747d4 100644 --- a/tests/suite/suite.go +++ b/tests/suite/suite.go @@ -23,11 +23,16 @@ type Base[ID test_utils.TableIDTypes, IDBUILDER test_utils.ArrowIDBuilder[ID]] s testify_suite.Suite *State Connector common.TestingServer - name string + cfg *baseConfig +} + +type baseConfig struct { + name string // suite name + embeddedOptions []server.EmbeddedOption // additional launching options for Connector service } func (b *Base[_, _]) BeforeTest(_, testName string) { - fmt.Printf("\n>>>>>>>>>> TEST STARTED: %s/%s <<<<<<<<<<\n\n", b.name, testName) + fmt.Printf("\n>>>>>>>>>> TEST STARTED: %s/%s <<<<<<<<<<\n\n", b.cfg.name, testName) } func (b *Base[_, _]) TearDownTest() { @@ -38,14 +43,14 @@ func (b *Base[_, _]) TearDownTest() { } func (b *Base[_, _]) BeforeSuite(_ string) { - fmt.Printf("\n>>>>>>>>>> SUITE STARTED: %s <<<<<<<<<<\n", b.name) + fmt.Printf("\n>>>>>>>>>> SUITE STARTED: %s <<<<<<<<<<\n", b.cfg.name) } func (b *Base[_, _]) SetupSuite() { // We want to run a distinct instance of Connector for every suite var err error - b.Connector, err = server.NewEmbedded( + options := []server.EmbeddedOption{ server.WithLoggerConfig( &config.TLoggerConfig{ LogLevel: config.ELogLevel_DEBUG, @@ -66,7 +71,12 @@ func (b *Base[_, _]) SetupSuite() { }, ), server.WithConnectionTimeouts("2s", "1s"), - server.WithYdbConnectorMode(config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES), + } + + options = append(options, b.cfg.embeddedOptions...) + + b.Connector, err = server.NewEmbedded( + options..., ) b.Require().NoError(err) b.Connector.Start() @@ -75,7 +85,7 @@ func (b *Base[_, _]) SetupSuite() { func (b *Base[_, _]) TearDownSuite() { b.Connector.Stop() - fmt.Printf("\n>>>>>>>>>> Suite stopped: %s <<<<<<<<<<\n", b.name) + fmt.Printf("\n>>>>>>>>>> Suite stopped: %s <<<<<<<<<<\n", b.cfg.name) } type validateTableOptions struct { @@ -202,13 +212,39 @@ func (b *Base[ID, IDBUILDER]) doValidateTable( table.MatchRecords(b.T(), records, schema) } +type BaseOption interface { + apply(cfg *baseConfig) +} + +type embeddedOption struct { + options []server.EmbeddedOption +} + +func (o *embeddedOption) apply(cfg *baseConfig) { + cfg.embeddedOptions = append(cfg.embeddedOptions, o.options...) +} + +func WithEmbeddedOptions(options ...server.EmbeddedOption) BaseOption { + return &embeddedOption{ + options: options, + } +} + func NewBase[ ID test_utils.TableIDTypes, IDBUILDER test_utils.ArrowIDBuilder[ID], -](t *testing.T, state *State, name string) *Base[ID, IDBUILDER] { +](t *testing.T, state *State, name string, suiteOptions ...BaseOption) *Base[ID, IDBUILDER] { + cfg := &baseConfig{ + name: name, + } + + for _, option := range suiteOptions { + option.apply(cfg) + } + b := &Base[ID, IDBUILDER]{ State: state, - name: name, + cfg: cfg, } b.SetT(t)