Skip to content

Commit

Permalink
Fix linter complainings
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalyisaev2 committed Dec 26, 2024
1 parent b8dbf9b commit 8c58ea8
Show file tree
Hide file tree
Showing 26 changed files with 45 additions and 50 deletions.
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/clickhouse/connection_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *connectionHTTP) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Row
return &rows{Rows: out}, nil
}

func (c *connectionHTTP) From() (string, string) {
func (c *connectionHTTP) From() (databaseName, tableName string) {
return c.databaseName, c.tableName
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (c *connectionNative) Query(params *rdbms_utils.QueryParams) (rdbms_utils.R
return &rowsNative{Rows: out}, nil
}

func (c *connectionNative) From() (string, string) {
func (c *connectionNative) From() (databaseName, tableName string) {
return c.databaseName, c.tableName
}

Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/clickhouse/sql_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string {
return sanitizedIdent
}

func (f sqlFormatter) FormatFrom(databaseName, tableName string) string {
func (f sqlFormatter) FormatFrom(_, tableName string) string {
return f.SanitiseIdentifier(tableName)
}

Expand Down
7 changes: 3 additions & 4 deletions app/server/datasource/rdbms/clickhouse/sql_formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func TestMakeSQLFormatterQuery(t *testing.T) {
err error
}

const tableName = "tab"
logger := common.NewTestLogger(t)
formatter := NewSQLFormatter()

Expand Down Expand Up @@ -458,14 +457,14 @@ func TestMakeSQLFormatterQuery(t *testing.T) {
tc.selectReq,
api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL,
"",
tableName,
tc.selectReq.From.Table,
)
if tc.err != nil {
require.True(t, errors.Is(err, tc.err))
require.True(t, errors.Is(err, tc.err), err, tc.err)
return
}

require.NoError(t, err)
require.NoError(t, err, err)
require.Equal(t, tc.outputQuery, readSplitsQuery.QueryText)
require.Equal(t, tc.outputArgs, readSplitsQuery.QueryArgs.Values())
require.Equal(t, tc.outputSelectWhat, readSplitsQuery.What)
Expand Down
6 changes: 3 additions & 3 deletions app/server/datasource/rdbms/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,18 @@ func (ds *dataSourceImpl) ReadSplit(

// Read data from every connection in a distinct goroutine.
// TODO: check if it's OK to override context
errgroup, ctx := errgroup.WithContext(ctx)
group, ctx := errgroup.WithContext(ctx)

for i, conn := range cs {
conn := conn
sink := sinks[i]

errgroup.Go(func() error {
group.Go(func() error {
return ds.doReadSplitSingleConn(ctx, logger, request, split, sink, conn)
})
}

return errgroup.Wait()
return group.Wait()
}

func (ds *dataSourceImpl) doReadSplitSingleConn(
Expand Down
1 change: 1 addition & 0 deletions app/server/datasource/rdbms/datasource_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func NewDataSourceFactory(
}

var err error

dsf.loggingResolver, err = logging.NewResolver(cfg.Logging)
if err != nil {
return nil, fmt.Errorf("logging resolver: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion app/server/datasource/rdbms/logging/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"fmt"

"go.uber.org/zap"

api_common "github.com/ydb-platform/fq-connector-go/api/common"
"github.com/ydb-platform/fq-connector-go/app/config"
"go.uber.org/zap"
)

type resolveParams struct {
Expand Down
7 changes: 4 additions & 3 deletions app/server/datasource/rdbms/logging/resolver_dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"crypto/tls"
"fmt"

api_logging "github.com/ydb-platform/fq-connector-go/api/logging/v1"
"github.com/ydb-platform/fq-connector-go/app/config"
"github.com/ydb-platform/fq-connector-go/common"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"

api_logging "github.com/ydb-platform/fq-connector-go/api/logging/v1"
"github.com/ydb-platform/fq-connector-go/app/config"
"github.com/ydb-platform/fq-connector-go/common"
)

type dynamicResolver struct {
Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/logging/resolver_static.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (r *staticResolver) resolve(request *resolveParams) (*resolveResponse, erro
}, nil
}

func (r *staticResolver) Close() error { return nil }
func (staticResolver) Close() error { return nil }

func newResolverStatic(cfg *config.TLoggingConfig_TStaticResolving) Resolver {
return &staticResolver{
Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/ms_sql_server/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (c *Connection) Close() error {
return c.db.Close()
}

func (c *Connection) From() (string, string) {
func (c *Connection) From() (databaseName, tableName string) {
return c.databaseName, c.tableName
}

Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/ms_sql_server/sql_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string {
return sanitizedIdent
}

func (f sqlFormatter) FormatFrom(databaseName, tableName string) string {
func (f sqlFormatter) FormatFrom(_, tableName string) string {
return f.SanitiseIdentifier(tableName)
}

Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/mysql/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,6 @@ func (c *connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, e
return r, nil
}

func (c *connection) From() (string, string) {
func (c *connection) From() (databaseName, tableName string) {
return c.databaseName, c.tableName
}
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/mysql/sql_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string {
return fmt.Sprintf("`%s`", strings.Replace(ident, "`", "``", -1))
}

func (f sqlFormatter) FormatFrom(databaseName, tableName string) string {
func (f sqlFormatter) FormatFrom(_, tableName string) string {
return f.SanitiseIdentifier(tableName)
}

Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/oracle/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ func (c *connection) Query(queryParams *rdbms_utils.QueryParams) (rdbms_utils.Ro
return rows, nil
}

func (c *connection) From() (string, string) {
func (c *connection) From() (databaseName, tableName string) {
return c.databaseName, c.tableName
}
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/oracle/sql_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string {
return sanitizedIdent
}

func (f sqlFormatter) FormatFrom(databaseName, tableName string) string {
func (f sqlFormatter) FormatFrom(_, tableName string) string {
return f.SanitiseIdentifier(tableName)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, e
return rows{Rows: out}, nil
}

func (c *connection) From() (string, string) {
func (c *connection) From() (databaseName, tableName string) {
return c.databaseName, c.tableName
}

Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/postgresql/sql_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string {
return sanitizedIdent
}

func (f sqlFormatter) FormatFrom(databaseName, tableName string) string {
func (f sqlFormatter) FormatFrom(_, tableName string) string {
return f.SanitiseIdentifier(tableName)
}

Expand Down
4 changes: 1 addition & 3 deletions app/server/datasource/rdbms/postgresql/sql_formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ func TestMakeReadSplitsQuery(t *testing.T) {
err error
}

const tableName = "tab"

logger := common.NewTestLogger(t)
formatter := NewSQLFormatter()

Expand Down Expand Up @@ -460,7 +458,7 @@ func TestMakeReadSplitsQuery(t *testing.T) {
tc.selectReq,
api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL,
"",
tableName,
tc.selectReq.From.Table,
)
if tc.err != nil {
require.True(t, errors.Is(err, tc.err))
Expand Down
5 changes: 3 additions & 2 deletions app/server/datasource/rdbms/utils/sql_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ func (m *ConnectionMock) Close() error {
return m.Called().Error(0)
}

func (m *ConnectionMock) From() (string, string) {
return m.Called().String(0), m.Called().String(1)
func (m *ConnectionMock) From() (databaseName, tableName string) {
args := m.Called()
return args.String(0), args.String(1)
}

type ConnectionManagerMock struct {
Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/ydb/connection_database_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (c *connectionDatabaseSQL) getDriver() *ydb_sdk.Driver {
return c.driver
}

func (c *connectionDatabaseSQL) From() (string, string) {
func (c *connectionDatabaseSQL) From() (databaseName, tableName string) {
return c.dataabaseName, c.tableName
}

Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/ydb/connection_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (c *connectionNative) getDriver() *ydb_sdk.Driver {
return c.driver
}

func (c *connectionNative) From() (string, string) {
func (c *connectionNative) From() (datbaseName, tableName string) {
return c.dsi.Database, c.tableName
}

Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/ydb/sql_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string {
return fmt.Sprintf("`%s`", ident)
}

func (f sqlFormatter) FormatFrom(databaseName, tableName string) string {
func (f sqlFormatter) FormatFrom(_, tableName string) string {
// Trim leading slash, otherwise TablePathPrefix won't work.
// See https://ydb.tech/docs/ru/yql/reference/syntax/pragma#table-path-prefix
tableName = strings.TrimPrefix(tableName, "/")
Expand Down
4 changes: 1 addition & 3 deletions app/server/datasource/rdbms/ydb/sql_formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ func TestMakeReadSplitsQuery(t *testing.T) {
err error
}

const tableName = "tab"

logger := common.NewTestLogger(t)
formatter := NewSQLFormatter(config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES)

Expand Down Expand Up @@ -464,7 +462,7 @@ func TestMakeReadSplitsQuery(t *testing.T) {
tc.selectReq,
api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL,
"",
tableName,
tc.selectReq.From.Table,
)
if tc.err != nil {
require.True(t, errors.Is(err, tc.err))
Expand Down
6 changes: 4 additions & 2 deletions app/server/paging/sink_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"fmt"

"go.uber.org/zap"

api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/config"
"go.uber.org/zap"
)

type sinkFactoryState int8
Expand Down Expand Up @@ -91,9 +92,9 @@ func (f *sinkFactoryImpl[T]) ResultQueue() <-chan *ReadResult[T] {
// FinalStats returns the overall statistics collected during the request processing.
func (f *sinkFactoryImpl[T]) FinalStats() *api_service_protos.TReadSplitsResponse_TStats {
overallStats := &api_service_protos.TReadSplitsResponse_TStats{}

for _, tracker := range f.trafficTrackers {
partialStats := tracker.DumpStats(true)

overallStats.Rows += partialStats.Rows
overallStats.Bytes += partialStats.Bytes
}
Expand All @@ -120,6 +121,7 @@ func (f *sinkFactoryImpl[T]) sinkTerminationHandler(terminateChan <-chan struct{
f.logger.Info("all sinks terminated")
close(f.resultQueue)
f.state = sinkFactoryFinished

return
}

Expand Down
2 changes: 1 addition & 1 deletion app/server/streaming/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *Streamer[T]) writeDataToStream() error {
return fmt.Errorf("send buffer to stream: %w", err)
}
case err := <-s.errorChan:
// an error occured during reading
// an error occurred during reading
if err != nil {
return fmt.Errorf("error: %w", err)
}
Expand Down
20 changes: 7 additions & 13 deletions app/server/streaming/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,11 @@ func (tc testCaseStreaming) execute(t *testing.T) {
stream.On("Context").Return(ctx)

connection := &rdbms_utils.ConnectionMock{}
connection.On("From").Return("", "example_1").Twice()

connectionManager := &rdbms_utils.ConnectionManagerMock{}
connectionManager.On("Make", split.Select.DataSourceInstance).Return(connection, nil).Once()
connectionManager.On("Release", connection).Return().Once()
connectionManager.On("Make", split.Select.DataSourceInstance).Return([]rdbms_utils.Connection{connection}, nil).Once()
connectionManager.On("Release", []rdbms_utils.Connection{connection}).Return().Once()

rows := &rdbms_utils.RowsMock{
PredefinedData: tc.src,
Expand Down Expand Up @@ -252,20 +253,13 @@ func (tc testCaseStreaming) execute(t *testing.T) {
require.NoError(t, err)

pagingCfg := &config.TPagingConfig{RowsPerPage: uint64(tc.rowsPerPage)}
trafficTracker := paging.NewTrafficTracker[any](pagingCfg)
readLimiterFactory := paging.NewReadLimiterFactory(nil)
sink, err := paging.NewSink(
ctx,
logger,
trafficTracker,
columnarBufferFactory,
readLimiterFactory.MakeReadLimiter(logger),
tc.bufferQueueCapacity,
)
require.NoError(t, err)
readLimiter := readLimiterFactory.MakeReadLimiter(logger)

sinkFactory := paging.NewSinkFactory(ctx, logger, pagingCfg, columnarBufferFactory, readLimiter)

request := &api_service_protos.TReadSplitsRequest{}
streamer := NewStreamer(logger, stream, request, split, sink, dataSource)
streamer := NewStreamer(logger, stream, request, split, sinkFactory, dataSource)

err = streamer.Run()

Expand Down

0 comments on commit 8c58ea8

Please sign in to comment.