From 8c58ea81f7a8c98c0cfd135cd2b4e0f16d42face Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Thu, 26 Dec 2024 19:29:00 +0300 Subject: [PATCH] Fix linter complainings --- .../rdbms/clickhouse/connection_http.go | 2 +- .../rdbms/clickhouse/connection_native.go | 2 +- .../rdbms/clickhouse/sql_formatter.go | 2 +- .../rdbms/clickhouse/sql_formatter_test.go | 7 +++---- app/server/datasource/rdbms/datasource.go | 6 +++--- .../datasource/rdbms/datasource_factory.go | 1 + .../datasource/rdbms/logging/resolver.go | 3 ++- .../rdbms/logging/resolver_dynamic.go | 7 ++++--- .../rdbms/logging/resolver_static.go | 2 +- .../rdbms/ms_sql_server/connection.go | 2 +- .../rdbms/ms_sql_server/sql_formatter.go | 2 +- .../datasource/rdbms/mysql/connection.go | 2 +- .../datasource/rdbms/mysql/sql_formatter.go | 2 +- .../datasource/rdbms/oracle/connection.go | 2 +- .../datasource/rdbms/oracle/sql_formatter.go | 2 +- .../rdbms/postgresql/connection_manager.go | 2 +- .../rdbms/postgresql/sql_formatter.go | 2 +- .../rdbms/postgresql/sql_formatter_test.go | 4 +--- app/server/datasource/rdbms/utils/sql_mock.go | 5 +++-- .../rdbms/ydb/connection_database_sql.go | 2 +- .../datasource/rdbms/ydb/connection_native.go | 2 +- .../datasource/rdbms/ydb/sql_formatter.go | 2 +- .../rdbms/ydb/sql_formatter_test.go | 4 +--- app/server/paging/sink_factory.go | 6 ++++-- app/server/streaming/streamer.go | 2 +- app/server/streaming/streamer_test.go | 20 +++++++------------ 26 files changed, 45 insertions(+), 50 deletions(-) diff --git a/app/server/datasource/rdbms/clickhouse/connection_http.go b/app/server/datasource/rdbms/clickhouse/connection_http.go index 82906850..2e199516 100644 --- a/app/server/datasource/rdbms/clickhouse/connection_http.go +++ b/app/server/datasource/rdbms/clickhouse/connection_http.go @@ -72,7 +72,7 @@ func (c *connectionHTTP) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Row return &rows{Rows: out}, nil } -func (c *connectionHTTP) From() (string, string) { +func (c *connectionHTTP) From() (databaseName, tableName string) { return c.databaseName, c.tableName } diff --git a/app/server/datasource/rdbms/clickhouse/connection_native.go b/app/server/datasource/rdbms/clickhouse/connection_native.go index d725c383..eff25eb4 100644 --- a/app/server/datasource/rdbms/clickhouse/connection_native.go +++ b/app/server/datasource/rdbms/clickhouse/connection_native.go @@ -74,7 +74,7 @@ func (c *connectionNative) Query(params *rdbms_utils.QueryParams) (rdbms_utils.R return &rowsNative{Rows: out}, nil } -func (c *connectionNative) From() (string, string) { +func (c *connectionNative) From() (databaseName, tableName string) { return c.databaseName, c.tableName } diff --git a/app/server/datasource/rdbms/clickhouse/sql_formatter.go b/app/server/datasource/rdbms/clickhouse/sql_formatter.go index dd2ab73d..e413a3ae 100644 --- a/app/server/datasource/rdbms/clickhouse/sql_formatter.go +++ b/app/server/datasource/rdbms/clickhouse/sql_formatter.go @@ -83,7 +83,7 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string { return sanitizedIdent } -func (f sqlFormatter) FormatFrom(databaseName, tableName string) string { +func (f sqlFormatter) FormatFrom(_, tableName string) string { return f.SanitiseIdentifier(tableName) } diff --git a/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go b/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go index 45a4506b..67df1b7e 100644 --- a/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go +++ b/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go @@ -23,7 +23,6 @@ func TestMakeSQLFormatterQuery(t *testing.T) { err error } - const tableName = "tab" logger := common.NewTestLogger(t) formatter := NewSQLFormatter() @@ -458,14 +457,14 @@ func TestMakeSQLFormatterQuery(t *testing.T) { tc.selectReq, api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL, "", - tableName, + tc.selectReq.From.Table, ) if tc.err != nil { - require.True(t, errors.Is(err, tc.err)) + require.True(t, errors.Is(err, tc.err), err, tc.err) return } - require.NoError(t, err) + require.NoError(t, err, err) require.Equal(t, tc.outputQuery, readSplitsQuery.QueryText) require.Equal(t, tc.outputArgs, readSplitsQuery.QueryArgs.Values()) require.Equal(t, tc.outputSelectWhat, readSplitsQuery.What) diff --git a/app/server/datasource/rdbms/datasource.go b/app/server/datasource/rdbms/datasource.go index be053258..d2eb987b 100644 --- a/app/server/datasource/rdbms/datasource.go +++ b/app/server/datasource/rdbms/datasource.go @@ -127,18 +127,18 @@ func (ds *dataSourceImpl) ReadSplit( // Read data from every connection in a distinct goroutine. // TODO: check if it's OK to override context - errgroup, ctx := errgroup.WithContext(ctx) + group, ctx := errgroup.WithContext(ctx) for i, conn := range cs { conn := conn sink := sinks[i] - errgroup.Go(func() error { + group.Go(func() error { return ds.doReadSplitSingleConn(ctx, logger, request, split, sink, conn) }) } - return errgroup.Wait() + return group.Wait() } func (ds *dataSourceImpl) doReadSplitSingleConn( diff --git a/app/server/datasource/rdbms/datasource_factory.go b/app/server/datasource/rdbms/datasource_factory.go index 22ee18df..65d174ec 100644 --- a/app/server/datasource/rdbms/datasource_factory.go +++ b/app/server/datasource/rdbms/datasource_factory.go @@ -186,6 +186,7 @@ func NewDataSourceFactory( } var err error + dsf.loggingResolver, err = logging.NewResolver(cfg.Logging) if err != nil { return nil, fmt.Errorf("logging resolver: %w", err) diff --git a/app/server/datasource/rdbms/logging/resolver.go b/app/server/datasource/rdbms/logging/resolver.go index 39600a55..9c89ed6b 100644 --- a/app/server/datasource/rdbms/logging/resolver.go +++ b/app/server/datasource/rdbms/logging/resolver.go @@ -4,9 +4,10 @@ import ( "context" "fmt" + "go.uber.org/zap" + api_common "github.com/ydb-platform/fq-connector-go/api/common" "github.com/ydb-platform/fq-connector-go/app/config" - "go.uber.org/zap" ) type resolveParams struct { diff --git a/app/server/datasource/rdbms/logging/resolver_dynamic.go b/app/server/datasource/rdbms/logging/resolver_dynamic.go index f15011b6..184fba9b 100644 --- a/app/server/datasource/rdbms/logging/resolver_dynamic.go +++ b/app/server/datasource/rdbms/logging/resolver_dynamic.go @@ -4,12 +4,13 @@ import ( "crypto/tls" "fmt" - api_logging "github.com/ydb-platform/fq-connector-go/api/logging/v1" - "github.com/ydb-platform/fq-connector-go/app/config" - "github.com/ydb-platform/fq-connector-go/common" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" + + api_logging "github.com/ydb-platform/fq-connector-go/api/logging/v1" + "github.com/ydb-platform/fq-connector-go/app/config" + "github.com/ydb-platform/fq-connector-go/common" ) type dynamicResolver struct { diff --git a/app/server/datasource/rdbms/logging/resolver_static.go b/app/server/datasource/rdbms/logging/resolver_static.go index 77302cda..9e3705db 100644 --- a/app/server/datasource/rdbms/logging/resolver_static.go +++ b/app/server/datasource/rdbms/logging/resolver_static.go @@ -48,7 +48,7 @@ func (r *staticResolver) resolve(request *resolveParams) (*resolveResponse, erro }, nil } -func (r *staticResolver) Close() error { return nil } +func (staticResolver) Close() error { return nil } func newResolverStatic(cfg *config.TLoggingConfig_TStaticResolving) Resolver { return &staticResolver{ diff --git a/app/server/datasource/rdbms/ms_sql_server/connection.go b/app/server/datasource/rdbms/ms_sql_server/connection.go index 5a7a86bf..866304dc 100644 --- a/app/server/datasource/rdbms/ms_sql_server/connection.go +++ b/app/server/datasource/rdbms/ms_sql_server/connection.go @@ -22,7 +22,7 @@ func (c *Connection) Close() error { return c.db.Close() } -func (c *Connection) From() (string, string) { +func (c *Connection) From() (databaseName, tableName string) { return c.databaseName, c.tableName } diff --git a/app/server/datasource/rdbms/ms_sql_server/sql_formatter.go b/app/server/datasource/rdbms/ms_sql_server/sql_formatter.go index ea29d78b..4a528264 100644 --- a/app/server/datasource/rdbms/ms_sql_server/sql_formatter.go +++ b/app/server/datasource/rdbms/ms_sql_server/sql_formatter.go @@ -82,7 +82,7 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string { return sanitizedIdent } -func (f sqlFormatter) FormatFrom(databaseName, tableName string) string { +func (f sqlFormatter) FormatFrom(_, tableName string) string { return f.SanitiseIdentifier(tableName) } diff --git a/app/server/datasource/rdbms/mysql/connection.go b/app/server/datasource/rdbms/mysql/connection.go index 8b0d60f6..5fa7808f 100644 --- a/app/server/datasource/rdbms/mysql/connection.go +++ b/app/server/datasource/rdbms/mysql/connection.go @@ -92,6 +92,6 @@ func (c *connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, e return r, nil } -func (c *connection) From() (string, string) { +func (c *connection) From() (databaseName, tableName string) { return c.databaseName, c.tableName } diff --git a/app/server/datasource/rdbms/mysql/sql_formatter.go b/app/server/datasource/rdbms/mysql/sql_formatter.go index 94de7cbb..e5be24c7 100644 --- a/app/server/datasource/rdbms/mysql/sql_formatter.go +++ b/app/server/datasource/rdbms/mysql/sql_formatter.go @@ -78,7 +78,7 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string { return fmt.Sprintf("`%s`", strings.Replace(ident, "`", "``", -1)) } -func (f sqlFormatter) FormatFrom(databaseName, tableName string) string { +func (f sqlFormatter) FormatFrom(_, tableName string) string { return f.SanitiseIdentifier(tableName) } diff --git a/app/server/datasource/rdbms/oracle/connection.go b/app/server/datasource/rdbms/oracle/connection.go index 78d054b1..e9e43fa1 100644 --- a/app/server/datasource/rdbms/oracle/connection.go +++ b/app/server/datasource/rdbms/oracle/connection.go @@ -47,6 +47,6 @@ func (c *connection) Query(queryParams *rdbms_utils.QueryParams) (rdbms_utils.Ro return rows, nil } -func (c *connection) From() (string, string) { +func (c *connection) From() (databaseName, tableName string) { return c.databaseName, c.tableName } diff --git a/app/server/datasource/rdbms/oracle/sql_formatter.go b/app/server/datasource/rdbms/oracle/sql_formatter.go index 19450b24..0ba72b10 100644 --- a/app/server/datasource/rdbms/oracle/sql_formatter.go +++ b/app/server/datasource/rdbms/oracle/sql_formatter.go @@ -83,7 +83,7 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string { return sanitizedIdent } -func (f sqlFormatter) FormatFrom(databaseName, tableName string) string { +func (f sqlFormatter) FormatFrom(_, tableName string) string { return f.SanitiseIdentifier(tableName) } diff --git a/app/server/datasource/rdbms/postgresql/connection_manager.go b/app/server/datasource/rdbms/postgresql/connection_manager.go index 90163334..f682f0a7 100644 --- a/app/server/datasource/rdbms/postgresql/connection_manager.go +++ b/app/server/datasource/rdbms/postgresql/connection_manager.go @@ -66,7 +66,7 @@ func (c *connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, e return rows{Rows: out}, nil } -func (c *connection) From() (string, string) { +func (c *connection) From() (databaseName, tableName string) { return c.databaseName, c.tableName } diff --git a/app/server/datasource/rdbms/postgresql/sql_formatter.go b/app/server/datasource/rdbms/postgresql/sql_formatter.go index d001faaa..6991ae8f 100644 --- a/app/server/datasource/rdbms/postgresql/sql_formatter.go +++ b/app/server/datasource/rdbms/postgresql/sql_formatter.go @@ -86,7 +86,7 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string { return sanitizedIdent } -func (f sqlFormatter) FormatFrom(databaseName, tableName string) string { +func (f sqlFormatter) FormatFrom(_, tableName string) string { return f.SanitiseIdentifier(tableName) } diff --git a/app/server/datasource/rdbms/postgresql/sql_formatter_test.go b/app/server/datasource/rdbms/postgresql/sql_formatter_test.go index acca4e42..4c76b956 100644 --- a/app/server/datasource/rdbms/postgresql/sql_formatter_test.go +++ b/app/server/datasource/rdbms/postgresql/sql_formatter_test.go @@ -23,8 +23,6 @@ func TestMakeReadSplitsQuery(t *testing.T) { err error } - const tableName = "tab" - logger := common.NewTestLogger(t) formatter := NewSQLFormatter() @@ -460,7 +458,7 @@ func TestMakeReadSplitsQuery(t *testing.T) { tc.selectReq, api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL, "", - tableName, + tc.selectReq.From.Table, ) if tc.err != nil { require.True(t, errors.Is(err, tc.err)) diff --git a/app/server/datasource/rdbms/utils/sql_mock.go b/app/server/datasource/rdbms/utils/sql_mock.go index d7be0d44..5378e611 100644 --- a/app/server/datasource/rdbms/utils/sql_mock.go +++ b/app/server/datasource/rdbms/utils/sql_mock.go @@ -30,8 +30,9 @@ func (m *ConnectionMock) Close() error { return m.Called().Error(0) } -func (m *ConnectionMock) From() (string, string) { - return m.Called().String(0), m.Called().String(1) +func (m *ConnectionMock) From() (databaseName, tableName string) { + args := m.Called() + return args.String(0), args.String(1) } type ConnectionManagerMock struct { diff --git a/app/server/datasource/rdbms/ydb/connection_database_sql.go b/app/server/datasource/rdbms/ydb/connection_database_sql.go index 1f14c817..43752beb 100644 --- a/app/server/datasource/rdbms/ydb/connection_database_sql.go +++ b/app/server/datasource/rdbms/ydb/connection_database_sql.go @@ -79,7 +79,7 @@ func (c *connectionDatabaseSQL) getDriver() *ydb_sdk.Driver { return c.driver } -func (c *connectionDatabaseSQL) From() (string, string) { +func (c *connectionDatabaseSQL) From() (databaseName, tableName string) { return c.dataabaseName, c.tableName } diff --git a/app/server/datasource/rdbms/ydb/connection_native.go b/app/server/datasource/rdbms/ydb/connection_native.go index c9123bdc..d7909653 100644 --- a/app/server/datasource/rdbms/ydb/connection_native.go +++ b/app/server/datasource/rdbms/ydb/connection_native.go @@ -245,7 +245,7 @@ func (c *connectionNative) getDriver() *ydb_sdk.Driver { return c.driver } -func (c *connectionNative) From() (string, string) { +func (c *connectionNative) From() (datbaseName, tableName string) { return c.dsi.Database, c.tableName } diff --git a/app/server/datasource/rdbms/ydb/sql_formatter.go b/app/server/datasource/rdbms/ydb/sql_formatter.go index e8594228..2e63d050 100644 --- a/app/server/datasource/rdbms/ydb/sql_formatter.go +++ b/app/server/datasource/rdbms/ydb/sql_formatter.go @@ -96,7 +96,7 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string { return fmt.Sprintf("`%s`", ident) } -func (f sqlFormatter) FormatFrom(databaseName, tableName string) string { +func (f sqlFormatter) FormatFrom(_, tableName string) string { // Trim leading slash, otherwise TablePathPrefix won't work. // See https://ydb.tech/docs/ru/yql/reference/syntax/pragma#table-path-prefix tableName = strings.TrimPrefix(tableName, "/") diff --git a/app/server/datasource/rdbms/ydb/sql_formatter_test.go b/app/server/datasource/rdbms/ydb/sql_formatter_test.go index 05caf7c2..f3661cf7 100644 --- a/app/server/datasource/rdbms/ydb/sql_formatter_test.go +++ b/app/server/datasource/rdbms/ydb/sql_formatter_test.go @@ -24,8 +24,6 @@ func TestMakeReadSplitsQuery(t *testing.T) { err error } - const tableName = "tab" - logger := common.NewTestLogger(t) formatter := NewSQLFormatter(config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES) @@ -464,7 +462,7 @@ func TestMakeReadSplitsQuery(t *testing.T) { tc.selectReq, api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL, "", - tableName, + tc.selectReq.From.Table, ) if tc.err != nil { require.True(t, errors.Is(err, tc.err)) diff --git a/app/server/paging/sink_factory.go b/app/server/paging/sink_factory.go index 82e104f5..7cf5ff11 100644 --- a/app/server/paging/sink_factory.go +++ b/app/server/paging/sink_factory.go @@ -4,9 +4,10 @@ import ( "context" "fmt" + "go.uber.org/zap" + api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" "github.com/ydb-platform/fq-connector-go/app/config" - "go.uber.org/zap" ) type sinkFactoryState int8 @@ -91,9 +92,9 @@ func (f *sinkFactoryImpl[T]) ResultQueue() <-chan *ReadResult[T] { // FinalStats returns the overall statistics collected during the request processing. func (f *sinkFactoryImpl[T]) FinalStats() *api_service_protos.TReadSplitsResponse_TStats { overallStats := &api_service_protos.TReadSplitsResponse_TStats{} + for _, tracker := range f.trafficTrackers { partialStats := tracker.DumpStats(true) - overallStats.Rows += partialStats.Rows overallStats.Bytes += partialStats.Bytes } @@ -120,6 +121,7 @@ func (f *sinkFactoryImpl[T]) sinkTerminationHandler(terminateChan <-chan struct{ f.logger.Info("all sinks terminated") close(f.resultQueue) f.state = sinkFactoryFinished + return } diff --git a/app/server/streaming/streamer.go b/app/server/streaming/streamer.go index 2d1c7228..fe44c100 100644 --- a/app/server/streaming/streamer.go +++ b/app/server/streaming/streamer.go @@ -47,7 +47,7 @@ func (s *Streamer[T]) writeDataToStream() error { return fmt.Errorf("send buffer to stream: %w", err) } case err := <-s.errorChan: - // an error occured during reading + // an error occurred during reading if err != nil { return fmt.Errorf("error: %w", err) } diff --git a/app/server/streaming/streamer_test.go b/app/server/streaming/streamer_test.go index f6e41804..c7db58ae 100644 --- a/app/server/streaming/streamer_test.go +++ b/app/server/streaming/streamer_test.go @@ -155,10 +155,11 @@ func (tc testCaseStreaming) execute(t *testing.T) { stream.On("Context").Return(ctx) connection := &rdbms_utils.ConnectionMock{} + connection.On("From").Return("", "example_1").Twice() connectionManager := &rdbms_utils.ConnectionManagerMock{} - connectionManager.On("Make", split.Select.DataSourceInstance).Return(connection, nil).Once() - connectionManager.On("Release", connection).Return().Once() + connectionManager.On("Make", split.Select.DataSourceInstance).Return([]rdbms_utils.Connection{connection}, nil).Once() + connectionManager.On("Release", []rdbms_utils.Connection{connection}).Return().Once() rows := &rdbms_utils.RowsMock{ PredefinedData: tc.src, @@ -252,20 +253,13 @@ func (tc testCaseStreaming) execute(t *testing.T) { require.NoError(t, err) pagingCfg := &config.TPagingConfig{RowsPerPage: uint64(tc.rowsPerPage)} - trafficTracker := paging.NewTrafficTracker[any](pagingCfg) readLimiterFactory := paging.NewReadLimiterFactory(nil) - sink, err := paging.NewSink( - ctx, - logger, - trafficTracker, - columnarBufferFactory, - readLimiterFactory.MakeReadLimiter(logger), - tc.bufferQueueCapacity, - ) - require.NoError(t, err) + readLimiter := readLimiterFactory.MakeReadLimiter(logger) + + sinkFactory := paging.NewSinkFactory(ctx, logger, pagingCfg, columnarBufferFactory, readLimiter) request := &api_service_protos.TReadSplitsRequest{} - streamer := NewStreamer(logger, stream, request, split, sink, dataSource) + streamer := NewStreamer(logger, stream, request, split, sinkFactory, dataSource) err = streamer.Run()