Skip to content

Commit

Permalink
Logging: reading from multiple databases concurrently (#226)
Browse files Browse the repository at this point in the history
* Change interfaces (not operational)

* Refactored rdbms/utils

* Finally compiled

* Fix YDB integration tests

* File renaming

* Introduce SinkFactory

* More tight integration with Logging API

* Fix service name in generated Logging protocode

* Fix service name in generated Logging protocode

* Fix schema getting

* Managed to read data in parallel

* Fixed data source test

* Fix linter complainings

* Fix unit tests

* Validate port
  • Loading branch information
vitalyisaev2 authored Dec 27, 2024
1 parent 15f817b commit dc16b2c
Show file tree
Hide file tree
Showing 54 changed files with 933 additions and 547 deletions.
83 changes: 44 additions & 39 deletions api/logging/v1/log_group_service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions api/logging/v1/log_group_service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 34 additions & 6 deletions app/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,18 +384,30 @@ func validateLoggingConfig(c *config.TLoggingConfig) error {
return fmt.Errorf("validate `ydb`: %w", err)
}

if staticConfig := c.GetStatic(); staticConfig != nil {
if err := validateLoggingResolvingStaticConfig(staticConfig); err != nil {
return fmt.Errorf("validate `static`: %w", err)
}
} else {
return fmt.Errorf("missing `static` section")
if c.GetStatic() == nil && c.GetDynamic() == nil {
return fmt.Errorf("you should set either `static` or `dynamic` section")
}

if c.GetStatic() != nil && c.GetDynamic() != nil {
return fmt.Errorf("you should set either `static` or `dynamic` section, not both of them")
}

if err := validateLoggingResolvingStaticConfig(c.GetStatic()); err != nil {
return fmt.Errorf("validate `static`: %w", err)
}

if err := validateLoggingResolvingDynamicConfig(c.GetDynamic()); err != nil {
return fmt.Errorf("validate `dynamic`: %w", err)
}

return nil
}

func validateLoggingResolvingStaticConfig(c *config.TLoggingConfig_TStaticResolving) error {
if c == nil {
return nil
}

if len(c.Databases) == 0 {
// it's kind of OK to have empty list of databases
return nil
Expand Down Expand Up @@ -429,6 +441,22 @@ func validateLoggingResolvingStaticConfig(c *config.TLoggingConfig_TStaticResolv
return nil
}

func validateLoggingResolvingDynamicConfig(c *config.TLoggingConfig_TDynamicResolving) error {
if c == nil {
return nil
}

if c.LoggingEndpoint.Host == "" {
return fmt.Errorf("missing `logging_endpoint.host`")
}

if c.LoggingEndpoint.Port == 0 {
return fmt.Errorf("missing `logging_endpoint.port`")
}

return nil
}

func validateExponentialBackoff(c *config.TExponentialBackoffConfig) error {
if c == nil {
return fmt.Errorf("required section is missing")
Expand Down
18 changes: 8 additions & 10 deletions app/server/data_source_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func (dsc *DataSourceCollection) DoReadSplit(
}
}

func (dsc *DataSourceCollection) Close() error {
return dsc.rdbms.Close()
}

func readSplit[T paging.Acceptor](
logger *zap.Logger,
stream api_service.Connector_ReadSplitsServer,
Expand All @@ -101,34 +105,28 @@ func readSplit[T paging.Acceptor](
return fmt.Errorf("new columnar buffer factory: %w", err)
}

trafficTracker := paging.NewTrafficTracker[T](cfg.Paging)

sink, err := paging.NewSink(
sinkFactory := paging.NewSinkFactory[T](
stream.Context(),
logger,
trafficTracker,
cfg.Paging,
columnarBufferFactory,
readLimiterFactory.MakeReadLimiter(logger),
int(cfg.Paging.PrefetchQueueCapacity),
)
if err != nil {
return fmt.Errorf("new sink: %w", err)
}

streamer := streaming.NewStreamer(
logger,
stream,
request,
split,
sink,
sinkFactory,
dataSource,
)

if err := streamer.Run(); err != nil {
return fmt.Errorf("run paging streamer: %w", err)
}

readStats := trafficTracker.DumpStats(true)
readStats := sinkFactory.FinalStats()

logger.Debug(
"split reading finished",
Expand Down
5 changes: 3 additions & 2 deletions app/server/datasource/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Factory[T paging.Acceptor] interface {
logger *zap.Logger,
dataSourceType api_common.EGenericDataSourceKind,
) (DataSource[T], error)
Close() error
}

// DataSource is an abstraction over external data storage that is available for data and metadata extraction.
Expand All @@ -37,8 +38,8 @@ type DataSource[T paging.Acceptor] interface {
logger *zap.Logger,
request *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
sink paging.Sink[T],
)
sinkFactory paging.SinkFactory[T],
) error
}

type TypeMapper interface {
Expand Down
6 changes: 3 additions & 3 deletions app/server/datasource/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (m *DataSourceMock[T]) ReadSplit(
_ *zap.Logger,
_ *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
pagingWriter paging.Sink[T],
) {
m.Called(split, pagingWriter)
sinkFactory paging.SinkFactory[T],
) error {
return m.Called(split, sinkFactory).Error(0)
}
11 changes: 9 additions & 2 deletions app/server/datasource/rdbms/clickhouse/connection_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (

type connectionHTTP struct {
*sql.DB
logger common.QueryLogger
logger common.QueryLogger
databaseName string
tableName string
}

var _ rdbms_utils.Rows = (*rows)(nil)
Expand Down Expand Up @@ -70,11 +72,16 @@ func (c *connectionHTTP) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Row
return &rows{Rows: out}, nil
}

func (c *connectionHTTP) From() (databaseName, tableName string) {
return c.databaseName, c.tableName
}

func makeConnectionHTTP(
ctx context.Context,
logger *zap.Logger,
cfg *config.TClickHouseConfig,
dsi *api_common.TGenericDataSourceInstance,
tableName string,
queryLogger common.QueryLogger,
) (rdbms_utils.Connection, error) {
opts := &clickhouse.Options{
Expand Down Expand Up @@ -122,5 +129,5 @@ func makeConnectionHTTP(
conn.SetMaxOpenConns(maxOpenConns)
conn.SetConnMaxLifetime(connMaxLifetime)

return &connectionHTTP{DB: conn, logger: queryLogger}, nil
return &connectionHTTP{DB: conn, logger: queryLogger, databaseName: dsi.Database, tableName: tableName}, nil
}
35 changes: 24 additions & 11 deletions app/server/datasource/rdbms/clickhouse/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,39 @@ type connectionManager struct {
}

func (c *connectionManager) Make(
params *rdbms_utils.ConnectionParamsMakeParams,
) (rdbms_utils.Connection, error) {
dsi, ctx, logger := params.DataSourceInstance, params.Ctx, params.Logger

if dsi.GetCredentials().GetBasic() == nil {
params *rdbms_utils.ConnectionManagerMakeParams,
) ([]rdbms_utils.Connection, error) {
if params.DataSourceInstance.GetCredentials().GetBasic() == nil {
return nil, fmt.Errorf("currently only basic auth is supported")
}

switch dsi.Protocol {
var (
conn rdbms_utils.Connection
err error
)

switch params.DataSourceInstance.Protocol {
case api_common.EGenericProtocol_NATIVE:
return makeConnectionNative(ctx, logger, c.cfg, dsi, c.QueryLoggerFactory.Make(logger))
conn, err = makeConnectionNative(
params.Ctx, params.Logger, c.cfg, params.DataSourceInstance, params.TableName, c.QueryLoggerFactory.Make(params.Logger))
case api_common.EGenericProtocol_HTTP:
return makeConnectionHTTP(ctx, logger, c.cfg, dsi, c.QueryLoggerFactory.Make(logger))
conn, err = makeConnectionHTTP(
params.Ctx, params.Logger, c.cfg, params.DataSourceInstance, params.TableName, c.QueryLoggerFactory.Make(params.Logger))
default:
return nil, fmt.Errorf("can not run ClickHouse connection with protocol '%v'", dsi.Protocol)
return nil, fmt.Errorf("can not run ClickHouse connection with protocol '%v'", params.DataSourceInstance.Protocol)
}

if err != nil {
return nil, fmt.Errorf("make connection: %w", err)
}

return []rdbms_utils.Connection{conn}, nil
}

func (*connectionManager) Release(_ context.Context, logger *zap.Logger, conn rdbms_utils.Connection) {
common.LogCloserError(logger, conn, "close clickhouse connection")
func (*connectionManager) Release(_ context.Context, logger *zap.Logger, cs []rdbms_utils.Connection) {
for _, conn := range cs {
common.LogCloserError(logger, conn, "close clickhouse connection")
}
}

func NewConnectionManager(
Expand Down
11 changes: 9 additions & 2 deletions app/server/datasource/rdbms/clickhouse/connection_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ var _ rdbms_utils.Connection = (*connectionNative)(nil)

type connectionNative struct {
driver.Conn
logger common.QueryLogger
logger common.QueryLogger
databaseName string
tableName string
}

var _ rdbms_utils.Rows = (*rowsNative)(nil)
Expand Down Expand Up @@ -72,11 +74,16 @@ func (c *connectionNative) Query(params *rdbms_utils.QueryParams) (rdbms_utils.R
return &rowsNative{Rows: out}, nil
}

func (c *connectionNative) From() (databaseName, tableName string) {
return c.databaseName, c.tableName
}

func makeConnectionNative(
ctx context.Context,
logger *zap.Logger,
cfg *config.TClickHouseConfig,
dsi *api_common.TGenericDataSourceInstance,
tableName string,
queryLogger common.QueryLogger,
) (rdbms_utils.Connection, error) {
opts := &clickhouse.Options{
Expand Down Expand Up @@ -117,5 +124,5 @@ func makeConnectionNative(
return nil, fmt.Errorf("conn ping: %w", err)
}

return &connectionNative{Conn: conn, logger: queryLogger}, nil
return &connectionNative{Conn: conn, logger: queryLogger, databaseName: dsi.Database, tableName: tableName}, nil
}
4 changes: 2 additions & 2 deletions app/server/datasource/rdbms/clickhouse/sql_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string {
return sanitizedIdent
}

func (f sqlFormatter) FormatFrom(params *rdbms_utils.SQLFormatterFormatFromParams) (string, error) {
return f.SanitiseIdentifier(params.TableName), nil
func (f sqlFormatter) FormatFrom(_, tableName string) string {
return f.SanitiseIdentifier(tableName)
}

func NewSQLFormatter() rdbms_utils.SQLFormatter {
Expand Down
Loading

0 comments on commit dc16b2c

Please sign in to comment.