From dc16b2caa814c200239cc9ddf7279d5e27c2e954 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Fri, 27 Dec 2024 11:07:13 +0300 Subject: [PATCH] Logging: reading from multiple databases concurrently (#226) * Change interfaces (not operational) * Refactored rdbms/utils * Finally compiled * Fix YDB integration tests * File renaming * Introduce SinkFactory * More tight integration with Logging API * Fix service name in generated Logging protocode * Fix service name in generated Logging protocode * Fix schema getting * Managed to read data in parallel * Fixed data source test * Fix linter complainings * Fix unit tests * Validate port --- api/logging/v1/log_group_service.pb.go | 83 +++++----- api/logging/v1/log_group_service_grpc.pb.go | 4 +- app/server/config/config.go | 40 ++++- app/server/data_source_collection.go | 18 +-- app/server/datasource/interface.go | 5 +- app/server/datasource/mock.go | 6 +- .../rdbms/clickhouse/connection_http.go | 11 +- .../rdbms/clickhouse/connection_manager.go | 35 ++-- .../rdbms/clickhouse/connection_native.go | 11 +- .../rdbms/clickhouse/sql_formatter.go | 4 +- .../rdbms/clickhouse/sql_formatter_test.go | 9 +- .../rdbms/{data_source.go => datasource.go} | 97 +++++++---- ...ource_factory.go => datasource_factory.go} | 21 ++- ...data_source_test.go => datasource_test.go} | 34 ++-- .../rdbms/logging/connection_manager.go | 93 +++++++++-- .../datasource/rdbms/logging/prefix_getter.go | 45 ------ .../datasource/rdbms/logging/resolver.go | 66 ++------ .../rdbms/logging/resolver_dynamic.go | 81 ++++++++++ .../rdbms/logging/resolver_static.go | 57 +++++++ .../datasource/rdbms/logging/sql_formatter.go | 43 ----- .../rdbms/ms_sql_server/connection.go | 14 +- .../rdbms/ms_sql_server/connection_manager.go | 12 +- .../rdbms/ms_sql_server/sql_formatter.go | 4 +- .../datasource/rdbms/mysql/connection.go | 22 ++- .../rdbms/mysql/connection_manager.go | 12 +- .../datasource/rdbms/mysql/sql_formatter.go | 4 +- .../datasource/rdbms/oracle/connection.go | 18 ++- .../rdbms/oracle/connection_manager.go | 12 +- .../datasource/rdbms/oracle/sql_formatter.go | 4 +- .../rdbms/postgresql/connection_manager.go | 34 ++-- .../rdbms/postgresql/sql_formatter.go | 4 +- .../rdbms/postgresql/sql_formatter_test.go | 5 +- .../datasource/rdbms/utils/query_builder.go | 7 +- .../datasource/rdbms/utils/select_helpers.go | 20 +-- app/server/datasource/rdbms/utils/sql.go | 31 ++-- app/server/datasource/rdbms/utils/sql_mock.go | 15 +- .../rdbms/ydb/connection_database_sql.go | 13 +- .../rdbms/ydb/connection_manager.go | 24 +-- .../datasource/rdbms/ydb/connection_native.go | 7 + .../datasource/rdbms/ydb/prefix_getter.go | 34 ---- .../datasource/rdbms/ydb/schema_provider.go | 22 ++- .../datasource/rdbms/ydb/sql_formatter.go | 6 +- .../rdbms/ydb/sql_formatter_test.go | 2 + app/server/datasource/s3/data_source.go | 19 ++- app/server/paging/interface.go | 17 +- app/server/paging/mock.go | 20 +++ app/server/paging/sink.go | 67 ++------ app/server/paging/sink_factory.go | 152 ++++++++++++++++++ app/server/paging/sink_string.go | 10 +- app/server/service_connector.go | 1 + app/server/streaming/streamer.go | 57 ++++--- app/server/streaming/streamer_test.go | 20 +-- common/connection.go | 4 +- common/endpoint.go | 24 +++ 54 files changed, 933 insertions(+), 547 deletions(-) rename app/server/datasource/rdbms/{data_source.go => datasource.go} (74%) rename app/server/datasource/rdbms/{data_source_factory.go => datasource_factory.go} (94%) rename app/server/datasource/rdbms/{data_source_test.go => datasource_test.go} (82%) delete mode 100644 app/server/datasource/rdbms/logging/prefix_getter.go create mode 100644 app/server/datasource/rdbms/logging/resolver_dynamic.go create mode 100644 app/server/datasource/rdbms/logging/resolver_static.go delete mode 100644 app/server/datasource/rdbms/logging/sql_formatter.go delete mode 100644 app/server/datasource/rdbms/ydb/prefix_getter.go create mode 100644 app/server/paging/sink_factory.go diff --git a/api/logging/v1/log_group_service.pb.go b/api/logging/v1/log_group_service.pb.go index 7ee4ab1b..e75f5d23 100644 --- a/api/logging/v1/log_group_service.pb.go +++ b/api/logging/v1/log_group_service.pb.go @@ -197,39 +197,44 @@ var File_log_group_service_proto protoreflect.FileDescriptor var file_log_group_service_proto_rawDesc = []byte{ 0x0a, 0x17, 0x6c, 0x6f, 0x67, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x73, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x6c, 0x6f, 0x67, 0x67, 0x69, - 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x22, 0x57, 0x0a, 0x19, 0x47, 0x65, 0x74, 0x52, 0x65, 0x61, 0x64, - 0x69, 0x6e, 0x67, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x66, 0x6f, 0x6c, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x66, 0x6f, 0x6c, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, - 0x1d, 0x0a, 0x0a, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x6f, - 0x0a, 0x1a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, 0x67, 0x45, 0x6e, 0x64, 0x70, - 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x20, 0x0a, 0x0c, - 0x6c, 0x6f, 0x67, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0a, 0x6c, 0x6f, 0x67, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x49, 0x64, 0x12, 0x2f, - 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, - 0x2e, 0x6c, 0x6f, 0x67, 0x67, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x72, 0x6f, 0x75, - 0x70, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x22, - 0x66, 0x0a, 0x0b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x1d, - 0x0a, 0x0a, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x09, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, - 0x07, 0x64, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, - 0x64, 0x62, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x62, 0x5f, 0x65, 0x6e, 0x64, - 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x64, 0x62, 0x45, - 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x32, 0x76, 0x0a, 0x0f, 0x4c, 0x6f, 0x67, 0x47, 0x72, - 0x6f, 0x75, 0x70, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x63, 0x0a, 0x12, 0x47, 0x65, - 0x74, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, 0x67, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, - 0x12, 0x25, 0x2e, 0x6c, 0x6f, 0x67, 0x67, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, - 0x74, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, 0x67, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6c, 0x6f, 0x67, 0x67, 0x69, 0x6e, - 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, 0x67, 0x45, - 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, - 0x3f, 0x42, 0x05, 0x50, 0x43, 0x4c, 0x47, 0x53, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x79, 0x64, 0x62, 0x2d, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, - 0x6d, 0x2f, 0x66, 0x71, 0x2d, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2d, 0x67, - 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6c, 0x6f, 0x67, 0x67, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1c, 0x79, 0x61, 0x6e, 0x64, 0x65, + 0x78, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x70, 0x72, 0x69, 0x76, 0x2e, 0x6c, 0x6f, 0x67, + 0x67, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x22, 0x57, 0x0a, 0x19, 0x47, 0x65, 0x74, 0x52, 0x65, + 0x61, 0x64, 0x69, 0x6e, 0x67, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x66, 0x6f, 0x6c, 0x64, 0x65, 0x72, 0x5f, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x66, 0x6f, 0x6c, 0x64, 0x65, 0x72, 0x49, + 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x4e, 0x61, 0x6d, 0x65, + 0x22, 0x81, 0x01, 0x0a, 0x1a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, 0x67, 0x45, + 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x20, 0x0a, 0x0c, 0x6c, 0x6f, 0x67, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6c, 0x6f, 0x67, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x49, + 0x64, 0x12, 0x41, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x29, 0x2e, 0x79, 0x61, 0x6e, 0x64, 0x65, 0x78, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, + 0x2e, 0x70, 0x72, 0x69, 0x76, 0x2e, 0x6c, 0x6f, 0x67, 0x67, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, + 0x2e, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x52, 0x06, 0x74, 0x61, + 0x62, 0x6c, 0x65, 0x73, 0x22, 0x66, 0x0a, 0x0b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x54, 0x61, 0x62, + 0x6c, 0x65, 0x73, 0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x61, + 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x64, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x06, 0x64, 0x62, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x64, + 0x62, 0x5f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0a, 0x64, 0x62, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x32, 0x9b, 0x01, 0x0a, + 0x0f, 0x4c, 0x6f, 0x67, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x12, 0x87, 0x01, 0x0a, 0x12, 0x47, 0x65, 0x74, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, 0x67, 0x45, + 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x37, 0x2e, 0x79, 0x61, 0x6e, 0x64, 0x65, 0x78, + 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x70, 0x72, 0x69, 0x76, 0x2e, 0x6c, 0x6f, 0x67, 0x67, + 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, + 0x67, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x38, 0x2e, 0x79, 0x61, 0x6e, 0x64, 0x65, 0x78, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, + 0x70, 0x72, 0x69, 0x76, 0x2e, 0x6c, 0x6f, 0x67, 0x67, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, + 0x47, 0x65, 0x74, 0x52, 0x65, 0x61, 0x64, 0x69, 0x6e, 0x67, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, + 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x3f, 0x42, 0x05, 0x50, 0x43, + 0x4c, 0x47, 0x53, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x79, 0x64, 0x62, 0x2d, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x66, 0x71, 0x2d, + 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2d, 0x67, 0x6f, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x6c, 0x6f, 0x67, 0x67, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( @@ -246,14 +251,14 @@ func file_log_group_service_proto_rawDescGZIP() []byte { var file_log_group_service_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_log_group_service_proto_goTypes = []interface{}{ - (*GetReadingEndpointRequest)(nil), // 0: logging.v1.GetReadingEndpointRequest - (*GetReadingEndpointResponse)(nil), // 1: logging.v1.GetReadingEndpointResponse - (*GroupTables)(nil), // 2: logging.v1.GroupTables + (*GetReadingEndpointRequest)(nil), // 0: yandex.cloud.priv.logging.v1.GetReadingEndpointRequest + (*GetReadingEndpointResponse)(nil), // 1: yandex.cloud.priv.logging.v1.GetReadingEndpointResponse + (*GroupTables)(nil), // 2: yandex.cloud.priv.logging.v1.GroupTables } var file_log_group_service_proto_depIdxs = []int32{ - 2, // 0: logging.v1.GetReadingEndpointResponse.tables:type_name -> logging.v1.GroupTables - 0, // 1: logging.v1.LogGroupService.GetReadingEndpoint:input_type -> logging.v1.GetReadingEndpointRequest - 1, // 2: logging.v1.LogGroupService.GetReadingEndpoint:output_type -> logging.v1.GetReadingEndpointResponse + 2, // 0: yandex.cloud.priv.logging.v1.GetReadingEndpointResponse.tables:type_name -> yandex.cloud.priv.logging.v1.GroupTables + 0, // 1: yandex.cloud.priv.logging.v1.LogGroupService.GetReadingEndpoint:input_type -> yandex.cloud.priv.logging.v1.GetReadingEndpointRequest + 1, // 2: yandex.cloud.priv.logging.v1.LogGroupService.GetReadingEndpoint:output_type -> yandex.cloud.priv.logging.v1.GetReadingEndpointResponse 2, // [2:3] is the sub-list for method output_type 1, // [1:2] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name diff --git a/api/logging/v1/log_group_service_grpc.pb.go b/api/logging/v1/log_group_service_grpc.pb.go index 28624410..4b4d2ec6 100644 --- a/api/logging/v1/log_group_service_grpc.pb.go +++ b/api/logging/v1/log_group_service_grpc.pb.go @@ -19,7 +19,7 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - LogGroupService_GetReadingEndpoint_FullMethodName = "/logging.v1.LogGroupService/GetReadingEndpoint" + LogGroupService_GetReadingEndpoint_FullMethodName = "/yandex.cloud.priv.logging.v1.LogGroupService/GetReadingEndpoint" ) // LogGroupServiceClient is the client API for LogGroupService service. @@ -96,7 +96,7 @@ func _LogGroupService_GetReadingEndpoint_Handler(srv interface{}, ctx context.Co // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) var LogGroupService_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "logging.v1.LogGroupService", + ServiceName: "yandex.cloud.priv.logging.v1.LogGroupService", HandlerType: (*LogGroupServiceServer)(nil), Methods: []grpc.MethodDesc{ { diff --git a/app/server/config/config.go b/app/server/config/config.go index 4104e686..9350bb03 100644 --- a/app/server/config/config.go +++ b/app/server/config/config.go @@ -384,18 +384,30 @@ func validateLoggingConfig(c *config.TLoggingConfig) error { return fmt.Errorf("validate `ydb`: %w", err) } - if staticConfig := c.GetStatic(); staticConfig != nil { - if err := validateLoggingResolvingStaticConfig(staticConfig); err != nil { - return fmt.Errorf("validate `static`: %w", err) - } - } else { - return fmt.Errorf("missing `static` section") + if c.GetStatic() == nil && c.GetDynamic() == nil { + return fmt.Errorf("you should set either `static` or `dynamic` section") + } + + if c.GetStatic() != nil && c.GetDynamic() != nil { + return fmt.Errorf("you should set either `static` or `dynamic` section, not both of them") + } + + if err := validateLoggingResolvingStaticConfig(c.GetStatic()); err != nil { + return fmt.Errorf("validate `static`: %w", err) + } + + if err := validateLoggingResolvingDynamicConfig(c.GetDynamic()); err != nil { + return fmt.Errorf("validate `dynamic`: %w", err) } return nil } func validateLoggingResolvingStaticConfig(c *config.TLoggingConfig_TStaticResolving) error { + if c == nil { + return nil + } + if len(c.Databases) == 0 { // it's kind of OK to have empty list of databases return nil @@ -429,6 +441,22 @@ func validateLoggingResolvingStaticConfig(c *config.TLoggingConfig_TStaticResolv return nil } +func validateLoggingResolvingDynamicConfig(c *config.TLoggingConfig_TDynamicResolving) error { + if c == nil { + return nil + } + + if c.LoggingEndpoint.Host == "" { + return fmt.Errorf("missing `logging_endpoint.host`") + } + + if c.LoggingEndpoint.Port == 0 { + return fmt.Errorf("missing `logging_endpoint.port`") + } + + return nil +} + func validateExponentialBackoff(c *config.TExponentialBackoffConfig) error { if c == nil { return fmt.Errorf("required section is missing") diff --git a/app/server/data_source_collection.go b/app/server/data_source_collection.go index 269aba0c..4e6025d7 100644 --- a/app/server/data_source_collection.go +++ b/app/server/data_source_collection.go @@ -80,6 +80,10 @@ func (dsc *DataSourceCollection) DoReadSplit( } } +func (dsc *DataSourceCollection) Close() error { + return dsc.rdbms.Close() +} + func readSplit[T paging.Acceptor]( logger *zap.Logger, stream api_service.Connector_ReadSplitsServer, @@ -101,26 +105,20 @@ func readSplit[T paging.Acceptor]( return fmt.Errorf("new columnar buffer factory: %w", err) } - trafficTracker := paging.NewTrafficTracker[T](cfg.Paging) - - sink, err := paging.NewSink( + sinkFactory := paging.NewSinkFactory[T]( stream.Context(), logger, - trafficTracker, + cfg.Paging, columnarBufferFactory, readLimiterFactory.MakeReadLimiter(logger), - int(cfg.Paging.PrefetchQueueCapacity), ) - if err != nil { - return fmt.Errorf("new sink: %w", err) - } streamer := streaming.NewStreamer( logger, stream, request, split, - sink, + sinkFactory, dataSource, ) @@ -128,7 +126,7 @@ func readSplit[T paging.Acceptor]( return fmt.Errorf("run paging streamer: %w", err) } - readStats := trafficTracker.DumpStats(true) + readStats := sinkFactory.FinalStats() logger.Debug( "split reading finished", diff --git a/app/server/datasource/interface.go b/app/server/datasource/interface.go index 62cc5681..283d191e 100644 --- a/app/server/datasource/interface.go +++ b/app/server/datasource/interface.go @@ -17,6 +17,7 @@ type Factory[T paging.Acceptor] interface { logger *zap.Logger, dataSourceType api_common.EGenericDataSourceKind, ) (DataSource[T], error) + Close() error } // DataSource is an abstraction over external data storage that is available for data and metadata extraction. @@ -37,8 +38,8 @@ type DataSource[T paging.Acceptor] interface { logger *zap.Logger, request *api_service_protos.TReadSplitsRequest, split *api_service_protos.TSplit, - sink paging.Sink[T], - ) + sinkFactory paging.SinkFactory[T], + ) error } type TypeMapper interface { diff --git a/app/server/datasource/mock.go b/app/server/datasource/mock.go index d470d02a..96af8d70 100644 --- a/app/server/datasource/mock.go +++ b/app/server/datasource/mock.go @@ -30,7 +30,7 @@ func (m *DataSourceMock[T]) ReadSplit( _ *zap.Logger, _ *api_service_protos.TReadSplitsRequest, split *api_service_protos.TSplit, - pagingWriter paging.Sink[T], -) { - m.Called(split, pagingWriter) + sinkFactory paging.SinkFactory[T], +) error { + return m.Called(split, sinkFactory).Error(0) } diff --git a/app/server/datasource/rdbms/clickhouse/connection_http.go b/app/server/datasource/rdbms/clickhouse/connection_http.go index 7f2ac468..2e199516 100644 --- a/app/server/datasource/rdbms/clickhouse/connection_http.go +++ b/app/server/datasource/rdbms/clickhouse/connection_http.go @@ -21,7 +21,9 @@ import ( type connectionHTTP struct { *sql.DB - logger common.QueryLogger + logger common.QueryLogger + databaseName string + tableName string } var _ rdbms_utils.Rows = (*rows)(nil) @@ -70,11 +72,16 @@ func (c *connectionHTTP) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Row return &rows{Rows: out}, nil } +func (c *connectionHTTP) From() (databaseName, tableName string) { + return c.databaseName, c.tableName +} + func makeConnectionHTTP( ctx context.Context, logger *zap.Logger, cfg *config.TClickHouseConfig, dsi *api_common.TGenericDataSourceInstance, + tableName string, queryLogger common.QueryLogger, ) (rdbms_utils.Connection, error) { opts := &clickhouse.Options{ @@ -122,5 +129,5 @@ func makeConnectionHTTP( conn.SetMaxOpenConns(maxOpenConns) conn.SetConnMaxLifetime(connMaxLifetime) - return &connectionHTTP{DB: conn, logger: queryLogger}, nil + return &connectionHTTP{DB: conn, logger: queryLogger, databaseName: dsi.Database, tableName: tableName}, nil } diff --git a/app/server/datasource/rdbms/clickhouse/connection_manager.go b/app/server/datasource/rdbms/clickhouse/connection_manager.go index dc789ccb..f9f57913 100644 --- a/app/server/datasource/rdbms/clickhouse/connection_manager.go +++ b/app/server/datasource/rdbms/clickhouse/connection_manager.go @@ -20,26 +20,39 @@ type connectionManager struct { } func (c *connectionManager) Make( - params *rdbms_utils.ConnectionParamsMakeParams, -) (rdbms_utils.Connection, error) { - dsi, ctx, logger := params.DataSourceInstance, params.Ctx, params.Logger - - if dsi.GetCredentials().GetBasic() == nil { + params *rdbms_utils.ConnectionManagerMakeParams, +) ([]rdbms_utils.Connection, error) { + if params.DataSourceInstance.GetCredentials().GetBasic() == nil { return nil, fmt.Errorf("currently only basic auth is supported") } - switch dsi.Protocol { + var ( + conn rdbms_utils.Connection + err error + ) + + switch params.DataSourceInstance.Protocol { case api_common.EGenericProtocol_NATIVE: - return makeConnectionNative(ctx, logger, c.cfg, dsi, c.QueryLoggerFactory.Make(logger)) + conn, err = makeConnectionNative( + params.Ctx, params.Logger, c.cfg, params.DataSourceInstance, params.TableName, c.QueryLoggerFactory.Make(params.Logger)) case api_common.EGenericProtocol_HTTP: - return makeConnectionHTTP(ctx, logger, c.cfg, dsi, c.QueryLoggerFactory.Make(logger)) + conn, err = makeConnectionHTTP( + params.Ctx, params.Logger, c.cfg, params.DataSourceInstance, params.TableName, c.QueryLoggerFactory.Make(params.Logger)) default: - return nil, fmt.Errorf("can not run ClickHouse connection with protocol '%v'", dsi.Protocol) + return nil, fmt.Errorf("can not run ClickHouse connection with protocol '%v'", params.DataSourceInstance.Protocol) + } + + if err != nil { + return nil, fmt.Errorf("make connection: %w", err) } + + return []rdbms_utils.Connection{conn}, nil } -func (*connectionManager) Release(_ context.Context, logger *zap.Logger, conn rdbms_utils.Connection) { - common.LogCloserError(logger, conn, "close clickhouse connection") +func (*connectionManager) Release(_ context.Context, logger *zap.Logger, cs []rdbms_utils.Connection) { + for _, conn := range cs { + common.LogCloserError(logger, conn, "close clickhouse connection") + } } func NewConnectionManager( diff --git a/app/server/datasource/rdbms/clickhouse/connection_native.go b/app/server/datasource/rdbms/clickhouse/connection_native.go index 0dddc3b5..eff25eb4 100644 --- a/app/server/datasource/rdbms/clickhouse/connection_native.go +++ b/app/server/datasource/rdbms/clickhouse/connection_native.go @@ -22,7 +22,9 @@ var _ rdbms_utils.Connection = (*connectionNative)(nil) type connectionNative struct { driver.Conn - logger common.QueryLogger + logger common.QueryLogger + databaseName string + tableName string } var _ rdbms_utils.Rows = (*rowsNative)(nil) @@ -72,11 +74,16 @@ func (c *connectionNative) Query(params *rdbms_utils.QueryParams) (rdbms_utils.R return &rowsNative{Rows: out}, nil } +func (c *connectionNative) From() (databaseName, tableName string) { + return c.databaseName, c.tableName +} + func makeConnectionNative( ctx context.Context, logger *zap.Logger, cfg *config.TClickHouseConfig, dsi *api_common.TGenericDataSourceInstance, + tableName string, queryLogger common.QueryLogger, ) (rdbms_utils.Connection, error) { opts := &clickhouse.Options{ @@ -117,5 +124,5 @@ func makeConnectionNative( return nil, fmt.Errorf("conn ping: %w", err) } - return &connectionNative{Conn: conn, logger: queryLogger}, nil + return &connectionNative{Conn: conn, logger: queryLogger, databaseName: dsi.Database, tableName: tableName}, nil } diff --git a/app/server/datasource/rdbms/clickhouse/sql_formatter.go b/app/server/datasource/rdbms/clickhouse/sql_formatter.go index 7febaf93..e413a3ae 100644 --- a/app/server/datasource/rdbms/clickhouse/sql_formatter.go +++ b/app/server/datasource/rdbms/clickhouse/sql_formatter.go @@ -83,8 +83,8 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string { return sanitizedIdent } -func (f sqlFormatter) FormatFrom(params *rdbms_utils.SQLFormatterFormatFromParams) (string, error) { - return f.SanitiseIdentifier(params.TableName), nil +func (f sqlFormatter) FormatFrom(_, tableName string) string { + return f.SanitiseIdentifier(tableName) } func NewSQLFormatter() rdbms_utils.SQLFormatter { diff --git a/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go b/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go index a07caced..67df1b7e 100644 --- a/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go +++ b/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go @@ -455,13 +455,16 @@ func TestMakeSQLFormatterQuery(t *testing.T) { context.Background(), logger, formatter, tc.selectReq, - api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL) + api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL, + "", + tc.selectReq.From.Table, + ) if tc.err != nil { - require.True(t, errors.Is(err, tc.err)) + require.True(t, errors.Is(err, tc.err), err, tc.err) return } - require.NoError(t, err) + require.NoError(t, err, err) require.Equal(t, tc.outputQuery, readSplitsQuery.QueryText) require.Equal(t, tc.outputArgs, readSplitsQuery.QueryArgs.Values()) require.Equal(t, tc.outputSelectWhat, readSplitsQuery.What) diff --git a/app/server/datasource/rdbms/data_source.go b/app/server/datasource/rdbms/datasource.go similarity index 74% rename from app/server/datasource/rdbms/data_source.go rename to app/server/datasource/rdbms/datasource.go index 508f88f4..d2eb987b 100644 --- a/app/server/datasource/rdbms/data_source.go +++ b/app/server/datasource/rdbms/datasource.go @@ -5,6 +5,7 @@ import ( "fmt" "go.uber.org/zap" + "golang.org/x/sync/errgroup" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" "github.com/ydb-platform/fq-connector-go/app/server/conversion" @@ -40,20 +41,21 @@ func (ds *dataSourceImpl) DescribeTable( logger *zap.Logger, request *api_service_protos.TDescribeTableRequest, ) (*api_service_protos.TDescribeTableResponse, error) { - var conn rdbms_utils.Connection + var cs []rdbms_utils.Connection err := ds.retrierSet.MakeConnection.Run(ctx, logger, func() error { var makeConnErr error - params := &rdbms_utils.ConnectionParamsMakeParams{ + params := &rdbms_utils.ConnectionManagerMakeParams{ Ctx: ctx, Logger: logger, DataSourceInstance: request.DataSourceInstance, TableName: request.Table, + MaxConnections: 1, // single connection is enough to get metadata } - conn, makeConnErr = ds.connectionManager.Make(params) + cs, makeConnErr = ds.connectionManager.Make(params) if makeConnErr != nil { return fmt.Errorf("make connection: %w", makeConnErr) } @@ -66,7 +68,10 @@ func (ds *dataSourceImpl) DescribeTable( return nil, fmt.Errorf("retry: %w", err) } - defer ds.connectionManager.Release(ctx, logger, conn) + defer ds.connectionManager.Release(ctx, logger, cs) + + // We asked for a single connection + conn := cs[0] schema, err := ds.schemaProvider.GetSchema(ctx, logger, conn, request) if err != nil { @@ -76,34 +81,30 @@ func (ds *dataSourceImpl) DescribeTable( return &api_service_protos.TDescribeTableResponse{Schema: schema}, nil } -func (ds *dataSourceImpl) doReadSplit( +func (ds *dataSourceImpl) ReadSplit( ctx context.Context, logger *zap.Logger, request *api_service_protos.TReadSplitsRequest, split *api_service_protos.TSplit, - sink paging.Sink[any], + sinkFactory paging.SinkFactory[any], ) error { - readSplitsQuery, err := rdbms_utils.MakeReadSplitsQuery(ctx, logger, ds.sqlFormatter, split.Select, request.Filtering) - if err != nil { - return fmt.Errorf("make read split query: %w", err) - } + // Make connection(s) to the data source. + var cs []rdbms_utils.Connection - var conn rdbms_utils.Connection - - err = ds.retrierSet.MakeConnection.Run( + err := ds.retrierSet.MakeConnection.Run( ctx, logger, func() error { var makeConnErr error - params := &rdbms_utils.ConnectionParamsMakeParams{ + params := &rdbms_utils.ConnectionManagerMakeParams{ Ctx: ctx, Logger: logger, DataSourceInstance: split.Select.DataSourceInstance, TableName: split.Select.From.Table, } - conn, makeConnErr = ds.connectionManager.Make(params) + cs, makeConnErr = ds.connectionManager.Make(params) if makeConnErr != nil { return fmt.Errorf("make connection: %w", makeConnErr) } @@ -116,7 +117,53 @@ func (ds *dataSourceImpl) doReadSplit( return fmt.Errorf("make connection: %w", err) } - defer ds.connectionManager.Release(ctx, logger, conn) + defer ds.connectionManager.Release(ctx, logger, cs) + + // Prepare sinks that will accept the data from the connections. + sinks, err := sinkFactory.MakeSinks(len(cs)) + if err != nil { + return fmt.Errorf("make sinks: %w", err) + } + + // Read data from every connection in a distinct goroutine. + // TODO: check if it's OK to override context + group, ctx := errgroup.WithContext(ctx) + + for i, conn := range cs { + conn := conn + sink := sinks[i] + + group.Go(func() error { + return ds.doReadSplitSingleConn(ctx, logger, request, split, sink, conn) + }) + } + + return group.Wait() +} + +func (ds *dataSourceImpl) doReadSplitSingleConn( + ctx context.Context, + logger *zap.Logger, + request *api_service_protos.TReadSplitsRequest, + split *api_service_protos.TSplit, + sink paging.Sink[any], + conn rdbms_utils.Connection, +) error { + databaseName, tableName := conn.From() + + readSplitsQuery, err := rdbms_utils.MakeReadSplitsQuery( + ctx, + logger, + ds.sqlFormatter, + split.Select, + request.Filtering, + databaseName, + tableName, + ) + + if err != nil { + return fmt.Errorf("make read split query: %w", err) + } var rows rdbms_utils.Rows @@ -166,22 +213,10 @@ func (ds *dataSourceImpl) doReadSplit( return fmt.Errorf("rows error: %w", err) } - return nil -} - -func (ds *dataSourceImpl) ReadSplit( - ctx context.Context, - logger *zap.Logger, - request *api_service_protos.TReadSplitsRequest, - split *api_service_protos.TSplit, - sink paging.Sink[any], -) { - err := ds.doReadSplit(ctx, logger, request, split, sink) - if err != nil { - sink.AddError(err) - } - + // Notify parent that there will be no more data from this connection. sink.Finish() + + return nil } func NewDataSource( diff --git a/app/server/datasource/rdbms/data_source_factory.go b/app/server/datasource/rdbms/datasource_factory.go similarity index 94% rename from app/server/datasource/rdbms/data_source_factory.go rename to app/server/datasource/rdbms/datasource_factory.go index d4ab573f..65d174ec 100644 --- a/app/server/datasource/rdbms/data_source_factory.go +++ b/app/server/datasource/rdbms/datasource_factory.go @@ -34,6 +34,7 @@ type dataSourceFactory struct { oracle Preset logging Preset converterCollection conversion.Collection + loggingResolver logging.Resolver } func (dsf *dataSourceFactory) Make( @@ -62,6 +63,14 @@ func (dsf *dataSourceFactory) Make( } } +func (dsf *dataSourceFactory) Close() error { + if err := dsf.loggingResolver.Close(); err != nil { + return fmt.Errorf("close logging resolver: %w", err) + } + + return nil +} + func NewDataSourceFactory( cfg *config.TDatasourcesConfig, qlf common.QueryLoggerFactory, @@ -120,7 +129,7 @@ func NewDataSourceFactory( SQLFormatter: ydb.NewSQLFormatter(cfg.Ydb.Mode), ConnectionManager: ydb.NewConnectionManager(cfg.Ydb, connManagerBase), TypeMapper: ydbTypeMapper, - SchemaProvider: ydb.NewSchemaProvider(ydbTypeMapper, ydb.NewPrefixGetter()), + SchemaProvider: ydb.NewSchemaProvider(ydbTypeMapper), RetrierSet: &retry.RetrierSet{ MakeConnection: retry.NewRetrierFromConfig(cfg.Ydb.ExponentialBackoff, retry.ErrorCheckerMakeConnectionCommon), Query: retry.NewRetrierFromConfig(cfg.Ydb.ExponentialBackoff, ydb.ErrorCheckerQuery), @@ -176,16 +185,18 @@ func NewDataSourceFactory( converterCollection: converterCollection, } - loggingResolver, err := logging.NewResolver(cfg.Logging) + var err error + + dsf.loggingResolver, err = logging.NewResolver(cfg.Logging) if err != nil { return nil, fmt.Errorf("logging resolver: %w", err) } dsf.logging = Preset{ - SQLFormatter: logging.NewSQLFormatter(loggingResolver, cfg.Ydb.Mode), - ConnectionManager: logging.NewConnectionManager(cfg.Logging, connManagerBase, loggingResolver), + SQLFormatter: ydb.NewSQLFormatter(cfg.Ydb.Mode), + ConnectionManager: logging.NewConnectionManager(cfg.Logging, connManagerBase, dsf.loggingResolver), TypeMapper: ydbTypeMapper, - SchemaProvider: ydb.NewSchemaProvider(ydbTypeMapper, logging.NewPrefixGetter(loggingResolver)), + SchemaProvider: ydb.NewSchemaProvider(ydbTypeMapper), RetrierSet: &retry.RetrierSet{ MakeConnection: retry.NewRetrierFromConfig(cfg.Ydb.ExponentialBackoff, retry.ErrorCheckerMakeConnectionCommon), Query: retry.NewRetrierFromConfig(cfg.Ydb.ExponentialBackoff, ydb.ErrorCheckerQuery), diff --git a/app/server/datasource/rdbms/data_source_test.go b/app/server/datasource/rdbms/datasource_test.go similarity index 82% rename from app/server/datasource/rdbms/data_source_test.go rename to app/server/datasource/rdbms/datasource_test.go index 5e1efd79..8a7be510 100644 --- a/app/server/datasource/rdbms/data_source_test.go +++ b/app/server/datasource/rdbms/datasource_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" api_common "github.com/ydb-platform/fq-connector-go/api/common" @@ -67,8 +68,10 @@ func TestReadSplit(t *testing.T) { } connection := &rdbms_utils.ConnectionMock{} - connectionManager.On("Make", split.Select.DataSourceInstance).Return(connection, nil).Once() - connectionManager.On("Release", connection).Return().Once() + connection.On("From").Return("", "example_1").Once() + + connectionManager.On("Make", split.Select.DataSourceInstance).Return([]rdbms_utils.Connection{connection}, nil).Once() + connectionManager.On("Release", []rdbms_utils.Connection{connection}).Return().Once() rows := &rdbms_utils.RowsMock{ PredefinedData: [][]any{ @@ -99,10 +102,15 @@ func TestReadSplit(t *testing.T) { sink.On("AddRow", transformer).Return(nil).Times(2) sink.On("Finish").Return().Once() + sinkFactory := &paging.SinkFactoryMock{} + sinkFactory.On("MakeSinks", 1).Return([]paging.Sink[any]{sink}, nil).Once() + dataSource := NewDataSource(logger, preset, converterCollection) - dataSource.ReadSplit(ctx, logger, readSplitsRequest, split, sink) - mock.AssertExpectationsForObjects(t, connectionManager, connection, rows, sink) + err := dataSource.ReadSplit(ctx, logger, readSplitsRequest, split, sinkFactory) + require.NoError(t, err) + + mock.AssertExpectationsForObjects(t, connectionManager, connection, rows, sink, sinkFactory) }) t.Run("scan error", func(t *testing.T) { @@ -116,8 +124,10 @@ func TestReadSplit(t *testing.T) { } connection := &rdbms_utils.ConnectionMock{} - connectionManager.On("Make", split.Select.DataSourceInstance).Return(connection, nil).Once() - connectionManager.On("Release", connection).Return().Once() + connection.On("From").Return("", "example_1").Once() + + connectionManager.On("Make", split.Select.DataSourceInstance).Return([]rdbms_utils.Connection{connection}, nil).Once() + connectionManager.On("Release", []rdbms_utils.Connection{connection}).Return().Once() rows := &rdbms_utils.RowsMock{ PredefinedData: [][]any{ @@ -149,15 +159,15 @@ func TestReadSplit(t *testing.T) { sink := &paging.SinkMock{} sink.On("AddRow", transformer).Return(nil).Once() - sink.On("AddError", mock.MatchedBy(func(err error) bool { - return errors.Is(err, scanErr) - })).Return().Once() - sink.On("Finish").Return().Once() + + sinkFactory := &paging.SinkFactoryMock{} + sinkFactory.On("MakeSinks", 1).Return([]paging.Sink[any]{sink}, nil).Once() datasource := NewDataSource(logger, preset, converterCollection) - datasource.ReadSplit(ctx, logger, readSplitsRequest, split, sink) + err := datasource.ReadSplit(ctx, logger, readSplitsRequest, split, sinkFactory) + require.True(t, errors.Is(err, scanErr)) - mock.AssertExpectationsForObjects(t, connectionManager, connection, rows, sink) + mock.AssertExpectationsForObjects(t, connectionManager, connection, rows, sink, sinkFactory) }) } diff --git a/app/server/datasource/rdbms/logging/connection_manager.go b/app/server/datasource/rdbms/logging/connection_manager.go index b998cf13..8862ed03 100644 --- a/app/server/datasource/rdbms/logging/connection_manager.go +++ b/app/server/datasource/rdbms/logging/connection_manager.go @@ -3,6 +3,9 @@ package logging import ( "context" "fmt" + "sync" + + "golang.org/x/sync/errgroup" api_common "github.com/ydb-platform/fq-connector-go/api/common" "github.com/ydb-platform/fq-connector-go/app/config" @@ -20,12 +23,16 @@ type connectionManager struct { } func (cm *connectionManager) Make( - params *rdbms_utils.ConnectionParamsMakeParams, -) (rdbms_utils.Connection, error) { - // turn log group name into YDB endpoint + params *rdbms_utils.ConnectionManagerMakeParams, +) ([]rdbms_utils.Connection, error) { + // Turn log group name into physical YDB endpoints + // via static config or Logging API call. request := &resolveParams{ + ctx: params.Ctx, + logger: params.Logger, folderId: params.DataSourceInstance.GetLoggingOptions().GetFolderId(), logGroupName: params.TableName, + iamToken: params.DataSourceInstance.GetCredentials().GetToken().GetValue(), } response, err := cm.resolver.resolve(request) @@ -33,37 +40,91 @@ func (cm *connectionManager) Make( return nil, fmt.Errorf("resolve YDB endpoint: %w", err) } - params.Logger.Debug("Resolved log group into YDB endpoint", response.ToZapFields()...) + // Determine how much connections we need to create + // taking into account optional limit. + totalConnections := len(response.sources) + if params.MaxConnections > 0 && params.MaxConnections < totalConnections { + totalConnections = params.MaxConnections + } + + var ( + group errgroup.Group + cs = make([]rdbms_utils.Connection, 0, totalConnections) + mutex sync.Mutex + ) + + for i, src := range response.sources { + // If connection limit is set, create only requested number of connections. + if i >= totalConnections { + break + } + + src := src + + group.Go(func() error { + conn, err := cm.makeConnectionFromYDBSource(params, src) + if err != nil { + return fmt.Errorf("make connection from YDB source: %w", err) + } + + mutex.Lock() + cs = append(cs, conn) + mutex.Unlock() + + return nil + }) + } + + if err := group.Wait(); err != nil { + for _, conn := range cs { + if conn != nil { + common.LogCloserError(params.Logger, conn, "close connection") + } + } + + return nil, fmt.Errorf("group wait: %w", err) + } + + return cs, nil +} + +func (cm *connectionManager) makeConnectionFromYDBSource( + params *rdbms_utils.ConnectionManagerMakeParams, + src *ydbSource, +) (rdbms_utils.Connection, error) { + params.Logger.Debug("resolved log group into YDB endpoint", src.ToZapFields()...) // prepare new data source instance describing the underlying YDB database ydbDataSourceInstance := &api_common.TGenericDataSourceInstance{ Kind: api_common.EGenericDataSourceKind_YDB, - Endpoint: response.endpoint, - Database: response.databaseName, - Credentials: params.DataSourceInstance.GetCredentials(), + Endpoint: src.endpoint, + Database: src.databaseName, + Credentials: nil, // YDB connector should use static SA credentials provided in config UseTls: true, } // reannotate logger with new data source instance ydbLogger := common.AnnotateLoggerWithDataSourceInstance(params.Logger, ydbDataSourceInstance) - ydbParams := &rdbms_utils.ConnectionParamsMakeParams{ + conn, err := cm.ydbConnectionManager.Make(&rdbms_utils.ConnectionManagerMakeParams{ Ctx: params.Ctx, Logger: ydbLogger, - DataSourceInstance: ydbDataSourceInstance, - } - - // build YDB connection - conn, err := cm.ydbConnectionManager.Make(ydbParams) + DataSourceInstance: ydbDataSourceInstance, // use resolved YDB database + TableName: src.tableName, // use resolved YDB table + }) if err != nil { return nil, fmt.Errorf("make YDB connection: %w", err) } - return conn, nil + if len(conn) != 1 { + return nil, fmt.Errorf("invalid number of YDB connections: %d", len(conn)) + } + + return conn[0], nil } -func (cm *connectionManager) Release(ctx context.Context, logger *zap.Logger, conn rdbms_utils.Connection) { - cm.ydbConnectionManager.Release(ctx, logger, conn) +func (cm *connectionManager) Release(ctx context.Context, logger *zap.Logger, cs []rdbms_utils.Connection) { + cm.ydbConnectionManager.Release(ctx, logger, cs) } func NewConnectionManager( diff --git a/app/server/datasource/rdbms/logging/prefix_getter.go b/app/server/datasource/rdbms/logging/prefix_getter.go deleted file mode 100644 index b00984ea..00000000 --- a/app/server/datasource/rdbms/logging/prefix_getter.go +++ /dev/null @@ -1,45 +0,0 @@ -package logging - -import ( - "context" - "fmt" - "path" - - "go.uber.org/zap" - - api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" - rdbms_ydb "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/ydb" - - "github.com/ydb-platform/ydb-go-sdk/v3" -) - -var _ rdbms_ydb.PrefixGetter = (*prefixGetter)(nil) - -type prefixGetter struct { - resolver Resolver -} - -func (p *prefixGetter) GetPrefix( - ctx context.Context, - logger *zap.Logger, - _ *ydb.Driver, - request *api_service_protos.TDescribeTableRequest, -) (string, error) { - params := &resolveParams{ - ctx: ctx, - logger: logger, - folderId: request.DataSourceInstance.GetLoggingOptions().GetFolderId(), - logGroupName: request.Table, - } - - response, err := p.resolver.resolve(params) - if err != nil { - return "", fmt.Errorf("resolve log group: %w", err) - } - - return path.Join(response.databaseName, response.tableName), nil -} - -func NewPrefixGetter(resolver Resolver) rdbms_ydb.PrefixGetter { - return &prefixGetter{resolver: resolver} -} diff --git a/app/server/datasource/rdbms/logging/resolver.go b/app/server/datasource/rdbms/logging/resolver.go index 7dfb3f0a..9c89ed6b 100644 --- a/app/server/datasource/rdbms/logging/resolver.go +++ b/app/server/datasource/rdbms/logging/resolver.go @@ -3,7 +3,6 @@ package logging import ( "context" "fmt" - "math/rand" "go.uber.org/zap" @@ -16,15 +15,16 @@ type resolveParams struct { logger *zap.Logger folderId string logGroupName string + iamToken string // optional, used for authorization into external APIs } -type resolveResponse struct { +type ydbSource struct { endpoint *api_common.TGenericEndpoint databaseName string tableName string } -func (r *resolveResponse) ToZapFields() []zap.Field { +func (r *ydbSource) ToZapFields() []zap.Field { return []zap.Field{ zap.String("host", r.endpoint.Host), zap.Uint32("port", r.endpoint.Port), @@ -33,58 +33,22 @@ func (r *resolveResponse) ToZapFields() []zap.Field { } } -type Resolver interface { - resolve(request *resolveParams) (*resolveResponse, error) -} - -type staticResolver struct { - cfg *config.TLoggingConfig_TStaticResolving -} - -func (r *staticResolver) resolve( - request *resolveParams, -) (*resolveResponse, error) { - if len(r.cfg.Databases) == 0 { - return nil, fmt.Errorf("no YDB endpoints provided") - } - - // get random YDB endpoint from provided list - ix := rand.Intn(len(r.cfg.Databases)) - - endpoint := r.cfg.Databases[ix].Endpoint - databaseName := r.cfg.Databases[ix].Name - - // pick a preconfigured folder - folder, exists := r.cfg.Folders[request.folderId] - if !exists { - return nil, fmt.Errorf("folder_id '%s' is missing", request.folderId) - } - - // resolve log group name into log group id - logGroupId, exists := folder.LogGroups[request.logGroupName] - if !exists { - return nil, fmt.Errorf("log group '%s' is missing", request.logGroupName) - } - - tableName := fmt.Sprintf("logs/origin/yc.logs.cloud/%s/%s", request.folderId, logGroupId) - - return &resolveResponse{ - endpoint: endpoint, - tableName: tableName, - databaseName: databaseName, - }, nil +type resolveResponse struct { + sources []*ydbSource } -func newStaticResolver(cfg *config.TLoggingConfig_TStaticResolving) Resolver { - return &staticResolver{ - cfg: cfg, - } +type Resolver interface { + resolve(request *resolveParams) (*resolveResponse, error) + Close() error } func NewResolver(cfg *config.TLoggingConfig) (Resolver, error) { - if cfg.GetStatic() != nil { - return newStaticResolver(cfg.GetStatic()), nil + switch cfg.GetResolving().(type) { + case *config.TLoggingConfig_Static: + return newResolverStatic(cfg.GetStatic()), nil + case *config.TLoggingConfig_Dynamic: + return newResolverDynamic(cfg) + default: + return nil, fmt.Errorf("unsupported resolver type: %T", cfg.GetResolving()) } - - return nil, fmt.Errorf("unsupported resolver type: %T", cfg.GetResolving()) } diff --git a/app/server/datasource/rdbms/logging/resolver_dynamic.go b/app/server/datasource/rdbms/logging/resolver_dynamic.go new file mode 100644 index 00000000..184fba9b --- /dev/null +++ b/app/server/datasource/rdbms/logging/resolver_dynamic.go @@ -0,0 +1,81 @@ +package logging + +import ( + "crypto/tls" + "fmt" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/metadata" + + api_logging "github.com/ydb-platform/fq-connector-go/api/logging/v1" + "github.com/ydb-platform/fq-connector-go/app/config" + "github.com/ydb-platform/fq-connector-go/common" +) + +type dynamicResolver struct { + client api_logging.LogGroupServiceClient + conn *grpc.ClientConn +} + +func (r *dynamicResolver) resolve( + request *resolveParams, +) (*resolveResponse, error) { + if request.iamToken == "" { + return nil, fmt.Errorf("IAM token is missing") + } + + md := metadata.Pairs("authorization", fmt.Sprintf("Bearer %s", request.iamToken)) + ctx := metadata.NewOutgoingContext(request.ctx, md) + + response, err := r.client.GetReadingEndpoint(ctx, &api_logging.GetReadingEndpointRequest{ + FolderId: request.folderId, + GroupName: request.logGroupName, + }) + + if err != nil { + return nil, fmt.Errorf("get reading endpoint: %w", err) + } + + var sources []*ydbSource + + for _, table := range response.GetTables() { + endpoint, err := common.StringToEndpoint(table.GetDbEndpoint()) + if err != nil { + return nil, fmt.Errorf("string '%s' to endpoint: %w", table.GetDbEndpoint(), err) + } + + sources = append(sources, &ydbSource{ + endpoint: endpoint, + databaseName: table.DbName, + tableName: table.TableName, + }) + } + + return &resolveResponse{ + sources: sources, + }, nil +} + +func (r *dynamicResolver) Close() error { + return r.conn.Close() +} + +func newResolverDynamic(cfg *config.TLoggingConfig) (Resolver, error) { + endpoint := common.EndpointToString(cfg.GetDynamic().LoggingEndpoint) + + // TODO: configure this in future + tlsCfg := &tls.Config{ + InsecureSkipVerify: true, + } + + grpcConn, err := grpc.Dial(endpoint, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))) + if err != nil { + return nil, fmt.Errorf("GRPC dial: %w", err) + } + + return &dynamicResolver{ + client: api_logging.NewLogGroupServiceClient(grpcConn), + conn: grpcConn, + }, nil +} diff --git a/app/server/datasource/rdbms/logging/resolver_static.go b/app/server/datasource/rdbms/logging/resolver_static.go new file mode 100644 index 00000000..9e3705db --- /dev/null +++ b/app/server/datasource/rdbms/logging/resolver_static.go @@ -0,0 +1,57 @@ +package logging + +import ( + "fmt" + "math/rand" + + "github.com/ydb-platform/fq-connector-go/app/config" +) + +type staticResolver struct { + cfg *config.TLoggingConfig_TStaticResolving +} + +func (r *staticResolver) resolve(request *resolveParams) (*resolveResponse, error) { + if len(r.cfg.Databases) == 0 { + return nil, fmt.Errorf("no YDB endpoints provided") + } + + // get random YDB endpoint from provided list + ix := rand.Intn(len(r.cfg.Databases)) + + endpoint := r.cfg.Databases[ix].Endpoint + databaseName := r.cfg.Databases[ix].Name + + // pick a preconfigured folder + folder, exists := r.cfg.Folders[request.folderId] + if !exists { + return nil, fmt.Errorf("folder_id '%s' is missing", request.folderId) + } + + // resolve log group name into log group id + logGroupId, exists := folder.LogGroups[request.logGroupName] + if !exists { + return nil, fmt.Errorf("log group '%s' is missing", request.logGroupName) + } + + // FIXME: hardcoded cloud name is a mistake + tableName := fmt.Sprintf("logs/origin/yc.logs.cloud/%s/%s", request.folderId, logGroupId) + + return &resolveResponse{ + sources: []*ydbSource{ + { + endpoint: endpoint, + tableName: tableName, + databaseName: databaseName, + }, + }, + }, nil +} + +func (staticResolver) Close() error { return nil } + +func newResolverStatic(cfg *config.TLoggingConfig_TStaticResolving) Resolver { + return &staticResolver{ + cfg: cfg, + } +} diff --git a/app/server/datasource/rdbms/logging/sql_formatter.go b/app/server/datasource/rdbms/logging/sql_formatter.go deleted file mode 100644 index 8c08f72d..00000000 --- a/app/server/datasource/rdbms/logging/sql_formatter.go +++ /dev/null @@ -1,43 +0,0 @@ -package logging - -import ( - "fmt" - - "github.com/ydb-platform/fq-connector-go/app/config" - rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" - "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/ydb" -) - -var _ rdbms_utils.SQLFormatter = (*sqlFormatter)(nil) - -type sqlFormatter struct { - rdbms_utils.SQLFormatter - resolver Resolver -} - -func (f sqlFormatter) FormatFrom(params *rdbms_utils.SQLFormatterFormatFromParams) (string, error) { - request := &resolveParams{ - ctx: params.Ctx, - logger: params.Logger, - folderId: params.DataSourceInstance.GetLoggingOptions().FolderId, - logGroupName: params.TableName, - } - - response, err := f.resolver.resolve(request) - if err != nil { - return "", fmt.Errorf("resolve YDB table: %w", err) - } - - return f.SanitiseIdentifier(response.tableName), nil -} - -func NewSQLFormatter(resolver Resolver, mode config.TYdbConfig_Mode) rdbms_utils.SQLFormatter { - ydbFormatter := ydb.NewSQLFormatter(mode) - - formatter := &sqlFormatter{ - SQLFormatter: ydbFormatter, - resolver: resolver, - } - - return formatter -} diff --git a/app/server/datasource/rdbms/ms_sql_server/connection.go b/app/server/datasource/rdbms/ms_sql_server/connection.go index 56e772a7..866304dc 100644 --- a/app/server/datasource/rdbms/ms_sql_server/connection.go +++ b/app/server/datasource/rdbms/ms_sql_server/connection.go @@ -12,15 +12,21 @@ import ( var _ rdbms_utils.Connection = (*Connection)(nil) type Connection struct { - db *sql.DB - logger common.QueryLogger + db *sql.DB + logger common.QueryLogger + databaseName string + tableName string } -func (c Connection) Close() error { +func (c *Connection) Close() error { return c.db.Close() } -func (c Connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { +func (c *Connection) From() (databaseName, tableName string) { + return c.databaseName, c.tableName +} + +func (c *Connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { c.logger.Dump(params.QueryText, params.QueryArgs.Values()...) out, err := c.db.QueryContext(params.Ctx, params.QueryText, params.QueryArgs.Values()...) diff --git a/app/server/datasource/rdbms/ms_sql_server/connection_manager.go b/app/server/datasource/rdbms/ms_sql_server/connection_manager.go index 3c5fc2a5..f7b88162 100644 --- a/app/server/datasource/rdbms/ms_sql_server/connection_manager.go +++ b/app/server/datasource/rdbms/ms_sql_server/connection_manager.go @@ -22,8 +22,8 @@ type connectionManager struct { } func (c *connectionManager) Make( - params *rdbms_utils.ConnectionParamsMakeParams, -) (rdbms_utils.Connection, error) { + params *rdbms_utils.ConnectionManagerMakeParams, +) ([]rdbms_utils.Connection, error) { dsi, ctx, logger := params.DataSourceInstance, params.Ctx, params.Logger if dsi.Protocol != api_common.EGenericProtocol_NATIVE { @@ -63,11 +63,13 @@ func (c *connectionManager) Make( queryLogger := c.QueryLoggerFactory.Make(logger) - return &Connection{db, queryLogger}, nil + return []rdbms_utils.Connection{&Connection{db, queryLogger, params.DataSourceInstance.Database, params.TableName}}, nil } -func (*connectionManager) Release(_ context.Context, logger *zap.Logger, conn rdbms_utils.Connection) { - common.LogCloserError(logger, conn, "close connection") +func (*connectionManager) Release(_ context.Context, logger *zap.Logger, cs []rdbms_utils.Connection) { + for _, conn := range cs { + common.LogCloserError(logger, conn, "close connection") + } } func NewConnectionManager( diff --git a/app/server/datasource/rdbms/ms_sql_server/sql_formatter.go b/app/server/datasource/rdbms/ms_sql_server/sql_formatter.go index 8aaca108..4a528264 100644 --- a/app/server/datasource/rdbms/ms_sql_server/sql_formatter.go +++ b/app/server/datasource/rdbms/ms_sql_server/sql_formatter.go @@ -82,8 +82,8 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string { return sanitizedIdent } -func (f sqlFormatter) FormatFrom(params *rdbms_utils.SQLFormatterFormatFromParams) (string, error) { - return f.SanitiseIdentifier(params.TableName), nil +func (f sqlFormatter) FormatFrom(_, tableName string) string { + return f.SanitiseIdentifier(tableName) } func NewSQLFormatter() rdbms_utils.SQLFormatter { diff --git a/app/server/datasource/rdbms/mysql/connection.go b/app/server/datasource/rdbms/mysql/connection.go index 99d1a4f1..5fa7808f 100644 --- a/app/server/datasource/rdbms/mysql/connection.go +++ b/app/server/datasource/rdbms/mysql/connection.go @@ -12,19 +12,21 @@ import ( "github.com/ydb-platform/fq-connector-go/common" ) -var _ rdbms_utils.Connection = (*Connection)(nil) - -type Connection struct { - logger common.QueryLogger - conn *client.Conn - cfg *config.TMySQLConfig +var _ rdbms_utils.Connection = (*connection)(nil) + +type connection struct { + logger common.QueryLogger + conn *client.Conn + cfg *config.TMySQLConfig + databaseName string + tableName string } -func (c *Connection) Close() error { +func (c *connection) Close() error { return c.conn.Close() } -func (c *Connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { +func (c *connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { c.logger.Dump(params.QueryText, params.QueryArgs.Values()...) results := make(chan rowData, c.cfg.ResultChanCapacity) @@ -89,3 +91,7 @@ func (c *Connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, e return r, nil } + +func (c *connection) From() (databaseName, tableName string) { + return c.databaseName, c.tableName +} diff --git a/app/server/datasource/rdbms/mysql/connection_manager.go b/app/server/datasource/rdbms/mysql/connection_manager.go index 094b07f4..0730dc05 100644 --- a/app/server/datasource/rdbms/mysql/connection_manager.go +++ b/app/server/datasource/rdbms/mysql/connection_manager.go @@ -25,8 +25,8 @@ type connectionManager struct { } func (c *connectionManager) Make( - params *rdbms_utils.ConnectionParamsMakeParams, -) (rdbms_utils.Connection, error) { + params *rdbms_utils.ConnectionManagerMakeParams, +) ([]rdbms_utils.Connection, error) { dsi, ctx, logger := params.DataSourceInstance, params.Ctx, params.Logger optionFuncs := make([]func(c *client.Conn), 0) @@ -82,11 +82,13 @@ func (c *connectionManager) Make( return nil, fmt.Errorf("set time zone: %w", err) } - return &Connection{queryLogger, conn, c.cfg}, nil + return []rdbms_utils.Connection{&connection{queryLogger, conn, c.cfg, dsi.Database, params.TableName}}, nil } -func (*connectionManager) Release(_ context.Context, logger *zap.Logger, conn rdbms_utils.Connection) { - common.LogCloserError(logger, conn, "close mysql connection") +func (*connectionManager) Release(_ context.Context, logger *zap.Logger, cs []rdbms_utils.Connection) { + for _, cs := range cs { + common.LogCloserError(logger, cs, "close connection") + } } func NewConnectionManager(cfg *config.TMySQLConfig, base rdbms_utils.ConnectionManagerBase) rdbms_utils.ConnectionManager { diff --git a/app/server/datasource/rdbms/mysql/sql_formatter.go b/app/server/datasource/rdbms/mysql/sql_formatter.go index 5e93f5d9..e5be24c7 100644 --- a/app/server/datasource/rdbms/mysql/sql_formatter.go +++ b/app/server/datasource/rdbms/mysql/sql_formatter.go @@ -78,8 +78,8 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string { return fmt.Sprintf("`%s`", strings.Replace(ident, "`", "``", -1)) } -func (f sqlFormatter) FormatFrom(params *rdbms_utils.SQLFormatterFormatFromParams) (string, error) { - return f.SanitiseIdentifier(params.TableName), nil +func (f sqlFormatter) FormatFrom(_, tableName string) string { + return f.SanitiseIdentifier(tableName) } func NewSQLFormatter() rdbms_utils.SQLFormatter { diff --git a/app/server/datasource/rdbms/oracle/connection.go b/app/server/datasource/rdbms/oracle/connection.go index f6f6dda1..e9e43fa1 100644 --- a/app/server/datasource/rdbms/oracle/connection.go +++ b/app/server/datasource/rdbms/oracle/connection.go @@ -10,18 +10,20 @@ import ( "github.com/ydb-platform/fq-connector-go/common" ) -var _ rdbms_utils.Connection = (*Connection)(nil) +var _ rdbms_utils.Connection = (*connection)(nil) -type Connection struct { - conn *go_ora.Connection - logger common.QueryLogger +type connection struct { + conn *go_ora.Connection + logger common.QueryLogger + databaseName string + tableName string } -func (c Connection) Close() error { +func (c *connection) Close() error { return c.conn.Close() } -func (c Connection) Query(queryParams *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { +func (c *connection) Query(queryParams *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { c.logger.Dump(queryParams.QueryText, queryParams.QueryArgs.Values()...) valueArgs := make([]driver.NamedValue, queryParams.QueryArgs.Count()) @@ -44,3 +46,7 @@ func (c Connection) Query(queryParams *rdbms_utils.QueryParams) (rdbms_utils.Row return rows, nil } + +func (c *connection) From() (databaseName, tableName string) { + return c.databaseName, c.tableName +} diff --git a/app/server/datasource/rdbms/oracle/connection_manager.go b/app/server/datasource/rdbms/oracle/connection_manager.go index 5ae03a3f..5d7e6fc2 100644 --- a/app/server/datasource/rdbms/oracle/connection_manager.go +++ b/app/server/datasource/rdbms/oracle/connection_manager.go @@ -22,8 +22,8 @@ type connectionManager struct { } func (c *connectionManager) Make( - params *rdbms_utils.ConnectionParamsMakeParams, -) (rdbms_utils.Connection, error) { + params *rdbms_utils.ConnectionManagerMakeParams, +) ([]rdbms_utils.Connection, error) { dsi, ctx, logger := params.DataSourceInstance, params.Ctx, params.Logger if dsi.Protocol != api_common.EGenericProtocol_NATIVE { return nil, fmt.Errorf("can not create Oracle connection with protocol '%v'", dsi.Protocol) @@ -74,11 +74,13 @@ func (c *connectionManager) Make( queryLogger := c.QueryLoggerFactory.Make(logger) - return &Connection{conn, queryLogger}, nil + return []rdbms_utils.Connection{&connection{conn, queryLogger, params.DataSourceInstance.Database, params.TableName}}, nil } -func (*connectionManager) Release(_ context.Context, logger *zap.Logger, conn rdbms_utils.Connection) { - common.LogCloserError(logger, conn, "close Oracle connection") +func (*connectionManager) Release(_ context.Context, logger *zap.Logger, conn []rdbms_utils.Connection) { + for _, c := range conn { + common.LogCloserError(logger, c, "close Oracle connection") + } } func NewConnectionManager( diff --git a/app/server/datasource/rdbms/oracle/sql_formatter.go b/app/server/datasource/rdbms/oracle/sql_formatter.go index 9de075a3..0ba72b10 100644 --- a/app/server/datasource/rdbms/oracle/sql_formatter.go +++ b/app/server/datasource/rdbms/oracle/sql_formatter.go @@ -83,8 +83,8 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string { return sanitizedIdent } -func (f sqlFormatter) FormatFrom(params *rdbms_utils.SQLFormatterFormatFromParams) (string, error) { - return f.SanitiseIdentifier(params.TableName), nil +func (f sqlFormatter) FormatFrom(_, tableName string) string { + return f.SanitiseIdentifier(tableName) } func NewSQLFormatter() rdbms_utils.SQLFormatter { diff --git a/app/server/datasource/rdbms/postgresql/connection_manager.go b/app/server/datasource/rdbms/postgresql/connection_manager.go index cab9deee..f682f0a7 100644 --- a/app/server/datasource/rdbms/postgresql/connection_manager.go +++ b/app/server/datasource/rdbms/postgresql/connection_manager.go @@ -17,7 +17,7 @@ import ( "github.com/ydb-platform/fq-connector-go/common" ) -var _ rdbms_utils.Connection = (*Connection)(nil) +var _ rdbms_utils.Connection = (*connection)(nil) type rows struct { pgx.Rows @@ -44,16 +44,18 @@ func (r rows) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (p return transformerFromOIDs(oids, ydbTypes, cc) } -type Connection struct { +type connection struct { *pgx.Conn - logger common.QueryLogger + logger common.QueryLogger + databaseName string + tableName string } -func (c Connection) Close() error { +func (c *connection) Close() error { return c.Conn.Close(context.TODO()) } -func (c Connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { +func (c *connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { c.logger.Dump(params.QueryText, params.QueryArgs.Values()...) out, err := c.Conn.Query(params.Ctx, params.QueryText, params.QueryArgs.Values()...) @@ -64,6 +66,10 @@ func (c Connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, er return rows{Rows: out}, nil } +func (c *connection) From() (databaseName, tableName string) { + return c.databaseName, c.tableName +} + var _ rdbms_utils.ConnectionManager = (*connectionManager)(nil) type connectionManager struct { @@ -73,8 +79,8 @@ type connectionManager struct { } func (c *connectionManager) Make( - params *rdbms_utils.ConnectionParamsMakeParams, -) (rdbms_utils.Connection, error) { + params *rdbms_utils.ConnectionManagerMakeParams, +) ([]rdbms_utils.Connection, error) { dsi, ctx, logger := params.DataSourceInstance, params.Ctx, params.Logger if dsi.GetCredentials().GetBasic() == nil { return nil, fmt.Errorf("currently only basic auth is supported") @@ -128,15 +134,17 @@ func (c *connectionManager) Make( queryLogger := c.QueryLoggerFactory.Make(logger) - return &Connection{conn, queryLogger}, nil + return []rdbms_utils.Connection{&connection{conn, queryLogger, dsi.Database, params.TableName}}, nil } -func (*connectionManager) Release(ctx context.Context, logger *zap.Logger, conn rdbms_utils.Connection) { - if err := conn.(*Connection).Conn.DeallocateAll(ctx); err != nil { - logger.Error("deallocate prepared statements", zap.Error(err)) - } +func (*connectionManager) Release(ctx context.Context, logger *zap.Logger, cs []rdbms_utils.Connection) { + for _, conn := range cs { + if err := conn.(*connection).Conn.DeallocateAll(ctx); err != nil { + logger.Error("deallocate prepared statements", zap.Error(err)) + } - common.LogCloserError(logger, conn, "close connection") + common.LogCloserError(logger, conn, "close connection") + } } type ConnectionManagerConfig interface { diff --git a/app/server/datasource/rdbms/postgresql/sql_formatter.go b/app/server/datasource/rdbms/postgresql/sql_formatter.go index 85ed1ab7..6991ae8f 100644 --- a/app/server/datasource/rdbms/postgresql/sql_formatter.go +++ b/app/server/datasource/rdbms/postgresql/sql_formatter.go @@ -86,8 +86,8 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string { return sanitizedIdent } -func (f sqlFormatter) FormatFrom(params *rdbms_utils.SQLFormatterFormatFromParams) (string, error) { - return f.SanitiseIdentifier(params.TableName), nil +func (f sqlFormatter) FormatFrom(_, tableName string) string { + return f.SanitiseIdentifier(tableName) } func NewSQLFormatter() rdbms_utils.SQLFormatter { diff --git a/app/server/datasource/rdbms/postgresql/sql_formatter_test.go b/app/server/datasource/rdbms/postgresql/sql_formatter_test.go index 6386ff3d..4c76b956 100644 --- a/app/server/datasource/rdbms/postgresql/sql_formatter_test.go +++ b/app/server/datasource/rdbms/postgresql/sql_formatter_test.go @@ -456,7 +456,10 @@ func TestMakeReadSplitsQuery(t *testing.T) { logger, formatter, tc.selectReq, - api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL) + api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL, + "", + tc.selectReq.From.Table, + ) if tc.err != nil { require.True(t, errors.Is(err, tc.err)) return diff --git a/app/server/datasource/rdbms/utils/query_builder.go b/app/server/datasource/rdbms/utils/query_builder.go index a2f61c36..e222ed89 100644 --- a/app/server/datasource/rdbms/utils/query_builder.go +++ b/app/server/datasource/rdbms/utils/query_builder.go @@ -21,14 +21,13 @@ func MakeReadSplitsQuery( formatter SQLFormatter, slct *api_service_protos.TSelect, filtering api_service_protos.TReadSplitsRequest_EFiltering, + databaseName, tableName string, ) (*ReadSplitsQuery, error) { selectPart, newSelectWhat, err := formatSelectHead( - ctx, - logger, formatter, slct.GetWhat(), - slct.GetFrom().GetTable(), - slct.DataSourceInstance, + databaseName, + tableName, true, ) diff --git a/app/server/datasource/rdbms/utils/select_helpers.go b/app/server/datasource/rdbms/utils/select_helpers.go index ea6a2568..c669c40a 100644 --- a/app/server/datasource/rdbms/utils/select_helpers.go +++ b/app/server/datasource/rdbms/utils/select_helpers.go @@ -1,14 +1,11 @@ package utils import ( - "context" "fmt" "strings" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" - "go.uber.org/zap" - api_common "github.com/ydb-platform/fq-connector-go/api/common" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" "github.com/ydb-platform/fq-connector-go/common" ) @@ -44,12 +41,9 @@ func makeTSelectTWhatForEmptyColumnsRequest() *api_service_protos.TSelect_TWhat } func formatSelectHead( - ctx context.Context, - logger *zap.Logger, formatter SQLFormatter, selectWhat *api_service_protos.TSelect_TWhat, - tableName string, - dsi *api_common.TGenericDataSourceInstance, + databaseName, tableName string, fakeZeroOnEmptyColumnsSet bool, ) (string, *api_service_protos.TSelect_TWhat, error) { // SELECT $columns FROM $from @@ -94,17 +88,7 @@ func formatSelectHead( sb.WriteString(" FROM ") - params := &SQLFormatterFormatFromParams{ - Ctx: ctx, - Logger: logger, - TableName: tableName, - DataSourceInstance: dsi, - } - - from, err := formatter.FormatFrom(params) - if err != nil { - return "", nil, fmt.Errorf("format FROM: %w", err) - } + from := formatter.FormatFrom(databaseName, tableName) sb.WriteString(from) diff --git a/app/server/datasource/rdbms/utils/sql.go b/app/server/datasource/rdbms/utils/sql.go index 39a0b0d6..4e0ec231 100644 --- a/app/server/datasource/rdbms/utils/sql.go +++ b/app/server/datasource/rdbms/utils/sql.go @@ -21,7 +21,14 @@ type QueryParams struct { } type Connection interface { + // Query runs a query on a specific connection. Query(params *QueryParams) (Rows, error) + // For the most of the data sources the database name / table name pair + // is strictly defined by the user input. + // However, in certain kinds of data sources it's necessary + // to override database / table names specified by the user request. + From() (database, table string) + // Close terminates network connections. Close() error } @@ -34,29 +41,28 @@ type Rows interface { MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) } -type ConnectionParamsMakeParams struct { +type ConnectionManagerMakeParams struct { Ctx context.Context // mandatory Logger *zap.Logger // mandatory DataSourceInstance *api_common.TGenericDataSourceInstance // mandatory - TableName string // optional + TableName string // mandatory + + // MaxConnections is the maximum number of connections to make. + // Even if there are a plenty of physical instances of a data source, + // only requested number of connections will be made. + // Zero value means no limit. + MaxConnections int // optional } type ConnectionManager interface { - Make(params *ConnectionParamsMakeParams) (Connection, error) - Release(ctx context.Context, logger *zap.Logger, connection Connection) + Make(params *ConnectionManagerMakeParams) ([]Connection, error) + Release(ctx context.Context, logger *zap.Logger, cs []Connection) } type ConnectionManagerBase struct { QueryLoggerFactory common.QueryLoggerFactory } -type SQLFormatterFormatFromParams struct { - Ctx context.Context - Logger *zap.Logger - TableName string - DataSourceInstance *api_common.TGenericDataSourceInstance -} - type SQLFormatter interface { // Get placeholder for n'th argument (starting from 0) for prepared statement GetPlaceholder(n int) string @@ -69,8 +75,7 @@ type SQLFormatter interface { // FormatFrom builds a substring containing the literals // that must be placed after FROM (`SELECT ... FROM `). - // For some datasources this call may involve queries to external APIs. - FormatFrom(params *SQLFormatterFormatFromParams) (string, error) + FormatFrom(databaseName, tableName string) string } type SchemaProvider interface { diff --git a/app/server/datasource/rdbms/utils/sql_mock.go b/app/server/datasource/rdbms/utils/sql_mock.go index 29cd32f3..5378e611 100644 --- a/app/server/datasource/rdbms/utils/sql_mock.go +++ b/app/server/datasource/rdbms/utils/sql_mock.go @@ -30,20 +30,25 @@ func (m *ConnectionMock) Close() error { return m.Called().Error(0) } +func (m *ConnectionMock) From() (databaseName, tableName string) { + args := m.Called() + return args.String(0), args.String(1) +} + type ConnectionManagerMock struct { mock.Mock } func (m *ConnectionManagerMock) Make( - params *ConnectionParamsMakeParams, -) (Connection, error) { + params *ConnectionManagerMakeParams, +) ([]Connection, error) { args := m.Called(params.DataSourceInstance) - return args.Get(0).(Connection), args.Error(1) + return args.Get(0).([]Connection), args.Error(1) } -func (m *ConnectionManagerMock) Release(_ context.Context, _ *zap.Logger, conn Connection) { - m.Called(conn) +func (m *ConnectionManagerMock) Release(_ context.Context, _ *zap.Logger, cs []Connection) { + m.Called(cs) } var _ Rows = (*RowsMock)(nil) diff --git a/app/server/datasource/rdbms/ydb/connection_database_sql.go b/app/server/datasource/rdbms/ydb/connection_database_sql.go index eb875a38..43752beb 100644 --- a/app/server/datasource/rdbms/ydb/connection_database_sql.go +++ b/app/server/datasource/rdbms/ydb/connection_database_sql.go @@ -45,8 +45,10 @@ var _ rdbms_utils.Connection = (*connectionDatabaseSQL)(nil) type connectionDatabaseSQL struct { *sql.DB - driver *ydb_sdk.Driver - logger common.QueryLogger + driver *ydb_sdk.Driver + logger common.QueryLogger + dataabaseName string + tableName string } func (c *connectionDatabaseSQL) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) { @@ -77,6 +79,10 @@ func (c *connectionDatabaseSQL) getDriver() *ydb_sdk.Driver { return c.driver } +func (c *connectionDatabaseSQL) From() (databaseName, tableName string) { + return c.dataabaseName, c.tableName +} + func (c *connectionDatabaseSQL) Close() error { err1 := c.DB.Close() @@ -98,6 +104,7 @@ func newConnectionDatabaseSQL( queryLogger common.QueryLogger, cfg *config.TYdbConfig, dsi *api_common.TGenericDataSourceInstance, + tableName string, ydbDriver *ydb_sdk.Driver, ) (ydbConnection, error) { ydbConn, err := ydb_sdk.Connector( @@ -123,5 +130,5 @@ func newConnectionDatabaseSQL( return nil, fmt.Errorf("conn ping: %w", err) } - return &connectionDatabaseSQL{DB: conn, driver: ydbDriver, logger: queryLogger}, nil + return &connectionDatabaseSQL{DB: conn, driver: ydbDriver, logger: queryLogger, dataabaseName: dsi.Database, tableName: tableName}, nil } diff --git a/app/server/datasource/rdbms/ydb/connection_manager.go b/app/server/datasource/rdbms/ydb/connection_manager.go index 0c4bbc18..45e082da 100644 --- a/app/server/datasource/rdbms/ydb/connection_manager.go +++ b/app/server/datasource/rdbms/ydb/connection_manager.go @@ -6,9 +6,9 @@ import ( "strings" ydb_sdk "github.com/ydb-platform/ydb-go-sdk/v3" - "github.com/ydb-platform/ydb-go-sdk/v3/balancers" + ydb_balancers "github.com/ydb-platform/ydb-go-sdk/v3/balancers" ydb_sdk_config "github.com/ydb-platform/ydb-go-sdk/v3/config" - "github.com/ydb-platform/ydb-go-sdk/v3/sugar" + ydb_sugar "github.com/ydb-platform/ydb-go-sdk/v3/sugar" yc "github.com/ydb-platform/ydb-go-yc" "go.uber.org/zap" "google.golang.org/grpc" @@ -32,11 +32,11 @@ type connectionManager struct { } func (c *connectionManager) Make( - params *rdbms_utils.ConnectionParamsMakeParams, -) (rdbms_utils.Connection, error) { + params *rdbms_utils.ConnectionManagerMakeParams, +) ([]rdbms_utils.Connection, error) { dsi, ctx, logger := params.DataSourceInstance, params.Ctx, params.Logger endpoint := common.EndpointToString(dsi.Endpoint) - dsn := sugar.DSN(endpoint, dsi.Database, sugar.WithSecure(dsi.UseTls)) + dsn := ydb_sugar.DSN(endpoint, dsi.Database, ydb_sugar.WithSecure(dsi.UseTls)) var cred ydb_sdk.Option @@ -72,7 +72,7 @@ func (c *connectionManager) Make( ydbOptions := []ydb_sdk.Option{ cred, ydb_sdk.WithDialTimeout(common.MustDurationFromString(c.cfg.OpenConnectionTimeout)), - ydb_sdk.WithBalancer(balancers.SingleConn()), // see YQ-3089 + ydb_sdk.WithBalancer(ydb_balancers.SingleConn()), // see YQ-3089 ydb_sdk.With(ydb_sdk_config.WithGrpcOptions(grpc.WithDisableServiceConfig())), } @@ -99,10 +99,10 @@ func (c *connectionManager) Make( fallthrough case config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE: logger.Debug("connector will use Native SDK over Query Service") - ydbConn = newConnectionNative(ctx, c.QueryLoggerFactory.Make(logger), dsi, ydbDriver) + ydbConn = newConnectionNative(ctx, c.QueryLoggerFactory.Make(logger), dsi, params.TableName, ydbDriver) case config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES: logger.Debug("connector will use database/sql SDK with scan queries over Table Service") - ydbConn, err = newConnectionDatabaseSQL(ctx, logger, c.QueryLoggerFactory.Make(logger), c.cfg, dsi, ydbDriver) + ydbConn, err = newConnectionDatabaseSQL(ctx, logger, c.QueryLoggerFactory.Make(logger), c.cfg, dsi, params.TableName, ydbDriver) default: return nil, fmt.Errorf("unknown mode: %v", c.cfg.Mode) } @@ -113,11 +113,13 @@ func (c *connectionManager) Make( logger.Debug("connection is ready") - return ydbConn, nil + return []rdbms_utils.Connection{ydbConn}, nil } -func (*connectionManager) Release(_ context.Context, logger *zap.Logger, conn rdbms_utils.Connection) { - common.LogCloserError(logger, conn, "close YDB connection") +func (*connectionManager) Release(_ context.Context, logger *zap.Logger, cs []rdbms_utils.Connection) { + for _, conn := range cs { + common.LogCloserError(logger, conn, "close YDB connection") + } } func NewConnectionManager( diff --git a/app/server/datasource/rdbms/ydb/connection_native.go b/app/server/datasource/rdbms/ydb/connection_native.go index b49a65fd..d7909653 100644 --- a/app/server/datasource/rdbms/ydb/connection_native.go +++ b/app/server/datasource/rdbms/ydb/connection_native.go @@ -113,6 +113,7 @@ type connectionNative struct { queryLogger common.QueryLogger ctx context.Context driver *ydb_sdk.Driver + tableName string } // nolint: gocyclo @@ -244,6 +245,10 @@ func (c *connectionNative) getDriver() *ydb_sdk.Driver { return c.driver } +func (c *connectionNative) From() (datbaseName, tableName string) { + return c.dsi.Database, c.tableName +} + func (c *connectionNative) Close() error { if err := c.driver.Close(c.ctx); err != nil { return fmt.Errorf("driver close: %w", err) @@ -294,6 +299,7 @@ func newConnectionNative( ctx context.Context, queryLogger common.QueryLogger, dsi *api_common.TGenericDataSourceInstance, + tableName string, driver *ydb_sdk.Driver, ) ydbConnection { return &connectionNative{ @@ -301,5 +307,6 @@ func newConnectionNative( driver: driver, queryLogger: queryLogger, dsi: dsi, + tableName: tableName, } } diff --git a/app/server/datasource/rdbms/ydb/prefix_getter.go b/app/server/datasource/rdbms/ydb/prefix_getter.go deleted file mode 100644 index 0ecde7c6..00000000 --- a/app/server/datasource/rdbms/ydb/prefix_getter.go +++ /dev/null @@ -1,34 +0,0 @@ -package ydb - -import ( - "context" - "path" - - "github.com/ydb-platform/ydb-go-sdk/v3" - "go.uber.org/zap" - - api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" -) - -type PrefixGetter interface { - GetPrefix( - ctx context.Context, - logger *zap.Logger, - driver *ydb.Driver, - request *api_service_protos.TDescribeTableRequest) (string, error) -} - -type prefixGetterImpl struct{} - -func (prefixGetterImpl) GetPrefix( - _ context.Context, - _ *zap.Logger, - db *ydb.Driver, - request *api_service_protos.TDescribeTableRequest, -) (string, error) { - return path.Join(db.Name(), request.Table), nil -} - -func NewPrefixGetter() PrefixGetter { - return prefixGetterImpl{} -} diff --git a/app/server/datasource/rdbms/ydb/schema_provider.go b/app/server/datasource/rdbms/ydb/schema_provider.go index bf45426b..0a5fec9e 100644 --- a/app/server/datasource/rdbms/ydb/schema_provider.go +++ b/app/server/datasource/rdbms/ydb/schema_provider.go @@ -3,6 +3,7 @@ package ydb import ( "context" "fmt" + "path" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" @@ -14,8 +15,7 @@ import ( ) type schemaProvider struct { - typeMapper datasource.TypeMapper - prefixGetter PrefixGetter + typeMapper datasource.TypeMapper } var _ rdbms_utils.SchemaProvider = (*schemaProvider)(nil) @@ -26,17 +26,17 @@ func (f *schemaProvider) GetSchema( conn rdbms_utils.Connection, request *api_service_protos.TDescribeTableRequest, ) (*api_service_protos.TSchema, error) { - db := conn.(ydbConnection).getDriver() - desc := options.Description{} + databaseName, tableName := conn.From() - prefix, err := f.prefixGetter.GetPrefix(ctx, logger, db, request) - if err != nil { - return nil, fmt.Errorf("get prefix: %w", err) - } + var ( + driver = conn.(ydbConnection).getDriver() + prefix = path.Join(databaseName, tableName) + desc options.Description + ) logger.Debug("obtaining table metadata", zap.String("prefix", prefix)) - err = db.Table().Do( + err := driver.Table().Do( ctx, func(ctx context.Context, s table.Session) error { var errInner error @@ -71,10 +71,8 @@ func (f *schemaProvider) GetSchema( func NewSchemaProvider( typeMapper datasource.TypeMapper, - preffixGetter PrefixGetter, ) rdbms_utils.SchemaProvider { return &schemaProvider{ - typeMapper: typeMapper, - prefixGetter: preffixGetter, + typeMapper: typeMapper, } } diff --git a/app/server/datasource/rdbms/ydb/sql_formatter.go b/app/server/datasource/rdbms/ydb/sql_formatter.go index efec2fcd..2e63d050 100644 --- a/app/server/datasource/rdbms/ydb/sql_formatter.go +++ b/app/server/datasource/rdbms/ydb/sql_formatter.go @@ -96,12 +96,12 @@ func (sqlFormatter) SanitiseIdentifier(ident string) string { return fmt.Sprintf("`%s`", ident) } -func (f sqlFormatter) FormatFrom(params *rdbms_utils.SQLFormatterFormatFromParams) (string, error) { +func (f sqlFormatter) FormatFrom(_, tableName string) string { // Trim leading slash, otherwise TablePathPrefix won't work. // See https://ydb.tech/docs/ru/yql/reference/syntax/pragma#table-path-prefix - tableName := strings.TrimPrefix(params.TableName, "/") + tableName = strings.TrimPrefix(tableName, "/") - return f.SanitiseIdentifier(tableName), nil + return f.SanitiseIdentifier(tableName) } func NewSQLFormatter(mode config.TYdbConfig_Mode) rdbms_utils.SQLFormatter { diff --git a/app/server/datasource/rdbms/ydb/sql_formatter_test.go b/app/server/datasource/rdbms/ydb/sql_formatter_test.go index 918c3302..f3661cf7 100644 --- a/app/server/datasource/rdbms/ydb/sql_formatter_test.go +++ b/app/server/datasource/rdbms/ydb/sql_formatter_test.go @@ -461,6 +461,8 @@ func TestMakeReadSplitsQuery(t *testing.T) { formatter, tc.selectReq, api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL, + "", + tc.selectReq.From.Table, ) if tc.err != nil { require.True(t, errors.Is(err, tc.err)) diff --git a/app/server/datasource/s3/data_source.go b/app/server/datasource/s3/data_source.go index 9e5dd662..5650717d 100644 --- a/app/server/datasource/s3/data_source.go +++ b/app/server/datasource/s3/data_source.go @@ -38,19 +38,15 @@ func (ds *dataSource) ReadSplit( logger *zap.Logger, _ *api_service_protos.TReadSplitsRequest, split *api_service_protos.TSplit, - sink paging.Sink[string]) { - if err := ds.doReadSplit(ctx, logger, split, sink); err != nil { - sink.AddError(err) - } - - sink.Finish() + sinkFactory paging.SinkFactory[string]) error { + return ds.doReadSplit(ctx, logger, split, sinkFactory) } func (*dataSource) doReadSplit( ctx context.Context, _ *zap.Logger, split *api_service_protos.TSplit, - sink paging.Sink[string]) error { + sinkFactory paging.SinkFactory[string]) error { conn := makeConnection() var ( @@ -80,10 +76,19 @@ func (*dataSource) doReadSplit( csvReader := csv.NewReader(response.Body) + sinks, err := sinkFactory.MakeSinks(1) + if err != nil { + return fmt.Errorf("make sinks: %w", err) + } + + sink := sinks[0] + if err := transformCSV(split.Select.What, split.Select.PredefinedSchema, csvReader, sink); err != nil { return fmt.Errorf("transform csv: %w", err) } + sink.Finish() + return nil } diff --git a/app/server/paging/interface.go b/app/server/paging/interface.go index 2ad375df..8f25626e 100644 --- a/app/server/paging/interface.go +++ b/app/server/paging/interface.go @@ -31,14 +31,21 @@ type ReadResult[T Acceptor] struct { IsTerminalMessage bool } -// Sink is a destination for a data stream that is read out of an external data source. +// Sink is a destination for a data stream that is read out of an external data source connection. type Sink[T Acceptor] interface { // AddRow saves the row obtained from a stream incoming from an external data source. AddRow(rowTransformer RowTransformer[T]) error - // AddError propagates an error occurred during the reading from the external data source. - AddError(err error) - // Finish reports the successful completion of reading the data stream. + + // Finish reports the successful completion of data stream reading. Finish() - // ResultQueue returns a channel with results +} + +// SinkFactory should be instantiated once for each ReadSplits request. +// It owns some structures that are shared across multiple Sink instances. +type SinkFactory[T Acceptor] interface { + MakeSinks(totalSinks int) ([]Sink[T], error) + // ResultQueue returns a channel with columnar buffers generated by all sinks ResultQueue() <-chan *ReadResult[T] + // FinalStats returns the overall statistics collected during the request processing. + FinalStats() *api_service_protos.TReadSplitsResponse_TStats } diff --git a/app/server/paging/mock.go b/app/server/paging/mock.go index add86cab..e0f77cd4 100644 --- a/app/server/paging/mock.go +++ b/app/server/paging/mock.go @@ -30,6 +30,26 @@ func (m *SinkMock) ResultQueue() <-chan *ReadResult[any] { return m.Called().Get(0).(chan *ReadResult[any]) } +var _ SinkFactory[any] = (*SinkFactoryMock)(nil) + +type SinkFactoryMock struct { + mock.Mock +} + +func (m *SinkFactoryMock) MakeSinks(totalSinks int) ([]Sink[any], error) { + args := m.Called(totalSinks) + + return args.Get(0).([]Sink[any]), args.Error(1) +} + +func (m *SinkFactoryMock) ResultQueue() <-chan *ReadResult[any] { + return m.Called().Get(0).(chan *ReadResult[any]) +} + +func (m *SinkFactoryMock) FinalStats() *api_service_protos.TReadSplitsResponse_TStats { + return m.Called().Get(0).(*api_service_protos.TReadSplitsResponse_TStats) +} + var _ ColumnarBuffer[any] = (*ColumnarBufferMock)(nil) type ColumnarBufferMock struct { diff --git a/app/server/paging/sink.go b/app/server/paging/sink.go index a63606ae..f8a34237 100644 --- a/app/server/paging/sink.go +++ b/app/server/paging/sink.go @@ -14,9 +14,9 @@ import ( type sinkState int8 const ( - operational sinkState = iota + 1 - failed - finished + sinkOperational sinkState = iota + 1 + sinkFailed + sinkFinished ) var _ Sink[any] = (*sinkImpl[any])(nil) @@ -25,6 +25,7 @@ var _ Sink[string] = (*sinkImpl[string])(nil) type sinkImpl[T Acceptor] struct { currBuffer ColumnarBuffer[T] // accumulates incoming rows resultQueue chan *ReadResult[T] // outgoing buffer queue + terminateChan chan<- struct{} // notify factory when the data reading is finished bufferFactory ColumnarBufferFactory[T] // creates new buffer trafficTracker *TrafficTracker[T] // tracks the amount of data passed through the sink readLimiter ReadLimiter // helps to restrict the number of rows read in every request @@ -34,8 +35,8 @@ type sinkImpl[T Acceptor] struct { } func (s *sinkImpl[T]) AddRow(rowTransformer RowTransformer[T]) error { - if s.state != operational { - panic(s.unexpectedState(operational)) + if s.state != sinkOperational { + panic(s.unexpectedState(sinkOperational)) } if err := s.readLimiter.addRow(); err != nil { @@ -69,16 +70,6 @@ func (s *sinkImpl[T]) AddRow(rowTransformer RowTransformer[T]) error { return nil } -func (s *sinkImpl[T]) AddError(err error) { - if s.state != operational { - panic(s.unexpectedState(operational)) - } - - s.respondWith(nil, nil, err, true) - - s.state = failed -} - func (s *sinkImpl[T]) flush(makeNewBuffer bool, isTerminalMessage bool) error { if s.currBuffer.TotalRows() == 0 { return nil @@ -106,27 +97,26 @@ func (s *sinkImpl[T]) flush(makeNewBuffer bool, isTerminalMessage bool) error { } func (s *sinkImpl[T]) Finish() { - if s.state != operational && s.state != failed { - panic(s.unexpectedState(operational, failed)) + if s.state != sinkOperational && s.state != sinkFailed { + panic(s.unexpectedState(sinkOperational, sinkFailed)) } // if there is some data left, send it to the channel - if s.state == operational { + if s.state == sinkOperational { err := s.flush(false, true) if err != nil { s.respondWith(nil, nil, fmt.Errorf("flush: %w", err), true) - s.state = failed + s.state = sinkFailed } else { - s.state = finished + s.state = sinkFinished } } - // notify reader about the end of data - close(s.resultQueue) -} - -func (s *sinkImpl[T]) ResultQueue() <-chan *ReadResult[T] { - return s.resultQueue + // notify factory about the end of data + select { + case s.terminateChan <- struct{}{}: + case <-s.ctx.Done(): + } } func (s *sinkImpl[T]) respondWith( @@ -145,28 +135,3 @@ func (s *sinkImpl[T]) unexpectedState(expected ...sinkState) error { "unexpected state '%v' (expected are '%v'): %w", s.state, expected, common.ErrInvariantViolation) } - -func NewSink[T Acceptor]( - ctx context.Context, - logger *zap.Logger, - trafficTracker *TrafficTracker[T], - columnarBufferFactory ColumnarBufferFactory[T], - readLimiter ReadLimiter, - resultQueueCapacity int, -) (Sink[T], error) { - buffer, err := columnarBufferFactory.MakeBuffer() - if err != nil { - return nil, fmt.Errorf("wrap buffer: %w", err) - } - - return &sinkImpl[T]{ - bufferFactory: columnarBufferFactory, - readLimiter: readLimiter, - resultQueue: make(chan *ReadResult[T], resultQueueCapacity), - trafficTracker: trafficTracker, - currBuffer: buffer, - logger: logger, - state: operational, - ctx: ctx, - }, nil -} diff --git a/app/server/paging/sink_factory.go b/app/server/paging/sink_factory.go new file mode 100644 index 00000000..7cf5ff11 --- /dev/null +++ b/app/server/paging/sink_factory.go @@ -0,0 +1,152 @@ +package paging + +import ( + "context" + "fmt" + + "go.uber.org/zap" + + api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/config" +) + +type sinkFactoryState int8 + +const ( + sinkFactoryIdle sinkFactoryState = iota + 1 + sinkFactorySinksGenerated + sinkFactoryFailed + sinkFactoryFinished +) + +// SinkFactory should be instantiated once for each ReadSplits request. +// It owns some structures that are shared across multiple Sink instances. +type sinkFactoryImpl[T Acceptor] struct { + ctx context.Context + logger *zap.Logger + cfg *config.TPagingConfig + resultQueue chan *ReadResult[T] // outgoing buffer queue + bufferFactory ColumnarBufferFactory[T] // factory responsible for ColumnarBuffer generation + readLimiter ReadLimiter // helps to restrict the number of rows read in every request + state sinkFactoryState + totalSinks int + + // Every sink has own traffic tracker, but factory keeps all created trackers during its lifetime + // to provide overall traffic stats. + trafficTrackers []*TrafficTracker[T] +} + +// MakeSinks is used to generate Sink objects, one per each data source connection. +// This method can be called only once. +func (f *sinkFactoryImpl[T]) MakeSinks(totalSinks int) ([]Sink[T], error) { + if f.state != sinkFactoryIdle { + return nil, fmt.Errorf("sink factory is already in use") + } + + f.totalSinks = totalSinks + + result := make([]Sink[T], 0, totalSinks) + + // Children sinks will use this channel to notify factory when the read is completed. + terminateChan := make(chan struct{}, totalSinks) + + for i := 0; i < totalSinks; i++ { + buffer, err := f.bufferFactory.MakeBuffer() + if err != nil { + f.state = sinkFactoryFailed + return nil, fmt.Errorf("make buffer: %w", err) + } + + // preserve traffic tracker to obtain stats in future + trafficTracker := NewTrafficTracker[T](f.cfg) + f.trafficTrackers = append(f.trafficTrackers, trafficTracker) + + sink := &sinkImpl[T]{ + bufferFactory: f.bufferFactory, + readLimiter: f.readLimiter, + resultQueue: f.resultQueue, // result queue is shared across multiple Sink instances + terminateChan: terminateChan, + trafficTracker: trafficTracker, + currBuffer: buffer, + logger: f.logger, + state: sinkOperational, + ctx: f.ctx, + } + + result = append(result, sink) + } + + f.state = sinkFactorySinksGenerated + + // await for all the sinks to finish + go f.sinkTerminationHandler(terminateChan) + + return result, nil +} + +// ResultQueue returns a channel with columnar buffers generated by all sinks +func (f *sinkFactoryImpl[T]) ResultQueue() <-chan *ReadResult[T] { + return f.resultQueue +} + +// FinalStats returns the overall statistics collected during the request processing. +func (f *sinkFactoryImpl[T]) FinalStats() *api_service_protos.TReadSplitsResponse_TStats { + overallStats := &api_service_protos.TReadSplitsResponse_TStats{} + + for _, tracker := range f.trafficTrackers { + partialStats := tracker.DumpStats(true) + overallStats.Rows += partialStats.Rows + overallStats.Bytes += partialStats.Bytes + } + + return overallStats +} + +func (f *sinkFactoryImpl[T]) sinkTerminationHandler(terminateChan <-chan struct{}) { + terminatedSinks := 0 + + for { + select { + case <-terminateChan: + terminatedSinks++ + + f.logger.Debug( + "sink terminated", + zap.Int("total_sinks", f.totalSinks), + zap.Int("terminated_sinks", terminatedSinks), + ) + + if terminatedSinks == f.totalSinks { + // notify reader about the end of data + f.logger.Info("all sinks terminated") + close(f.resultQueue) + f.state = sinkFactoryFinished + + return + } + + case <-f.ctx.Done(): + return + } + } +} + +func NewSinkFactory[T Acceptor]( + ctx context.Context, + logger *zap.Logger, + cfg *config.TPagingConfig, + columnarBufferFactory ColumnarBufferFactory[T], + readLimiter ReadLimiter, +) SinkFactory[T] { + sf := &sinkFactoryImpl[T]{ + state: sinkFactoryIdle, + bufferFactory: columnarBufferFactory, + readLimiter: readLimiter, + resultQueue: make(chan *ReadResult[T], cfg.PrefetchQueueCapacity), + cfg: cfg, + ctx: ctx, + logger: logger, + } + + return sf +} diff --git a/app/server/paging/sink_string.go b/app/server/paging/sink_string.go index bea6becd..41928eab 100644 --- a/app/server/paging/sink_string.go +++ b/app/server/paging/sink_string.go @@ -8,14 +8,14 @@ func _() { // An "invalid array index" compiler error signifies that the constant values have changed. // Re-run the stringer command to generate them again. var x [1]struct{} - _ = x[operational-1] - _ = x[failed-2] - _ = x[finished-3] + _ = x[sinkOperational-1] + _ = x[sinkFailed-2] + _ = x[sinkFinished-3] } -const _sinkState_name = "operationalfailedfinished" +const _sinkState_name = "sinkOperationalsinkFailedsinkFinished" -var _sinkState_index = [...]uint8{0, 11, 17, 25} +var _sinkState_index = [...]uint8{0, 15, 25, 37} func (i sinkState) String() string { i -= 1 diff --git a/app/server/service_connector.go b/app/server/service_connector.go index 8517ee28..dcd178b6 100644 --- a/app/server/service_connector.go +++ b/app/server/service_connector.go @@ -273,6 +273,7 @@ func makeGRPCOptions(logger *zap.Logger, cfg *config.TServerConfig, registry *so func (s *serviceConnector) stop() { s.grpcServer.GracefulStop() + common.LogCloserError(s.logger, s.dataSourceCollection, "closing data source collection") } func newServiceConnector( diff --git a/app/server/streaming/streamer.go b/app/server/streaming/streamer.go index 75a5c319..fe44c100 100644 --- a/app/server/streaming/streamer.go +++ b/app/server/streaming/streamer.go @@ -15,14 +15,15 @@ import ( ) type Streamer[T paging.Acceptor] struct { - stream api_service.Connector_ReadSplitsServer - dataSource datasource.DataSource[T] - request *api_service_protos.TReadSplitsRequest - split *api_service_protos.TSplit - sink paging.Sink[T] - logger *zap.Logger - ctx context.Context // clone of a stream context - cancel context.CancelFunc + stream api_service.Connector_ReadSplitsServer + dataSource datasource.DataSource[T] + request *api_service_protos.TReadSplitsRequest + split *api_service_protos.TSplit + sinkFactory paging.SinkFactory[T] + logger *zap.Logger + errorChan chan error // notifies about errors happened during reading process + ctx context.Context // clone of a stream context + cancel context.CancelFunc } func (s *Streamer[T]) writeDataToStream() error { @@ -31,7 +32,7 @@ func (s *Streamer[T]) writeDataToStream() error { for { select { - case result, ok := <-s.sink.ResultQueue(): + case result, ok := <-s.sinkFactory.ResultQueue(): if !ok { // correct termination return nil @@ -45,6 +46,11 @@ func (s *Streamer[T]) writeDataToStream() error { if err := s.sendResultToStream(result); err != nil { return fmt.Errorf("send buffer to stream: %w", err) } + case err := <-s.errorChan: + // an error occurred during reading + if err != nil { + return fmt.Errorf("error: %w", err) + } case <-s.stream.Context().Done(): // handle request termination return s.stream.Context().Err() @@ -82,7 +88,10 @@ func dumpReadSplitsResponse(logger *zap.Logger, resp *api_service_protos.TReadSp switch t := resp.GetPayload().(type) { case *api_service_protos.TReadSplitsResponse_ArrowIpcStreaming: if dump := resp.GetArrowIpcStreaming(); dump != nil { - logger.Debug("response", zap.Int("arrow_blob_length", len(dump))) + logger.Debug("response", + zap.Uint64("rows", resp.GetStats().Rows), + zap.Uint64("bytes", resp.GetStats().Bytes), + zap.Int("arrow_blob_size", len(dump))) } case *api_service_protos.TReadSplitsResponse_ColumnSet: for i := range t.ColumnSet.Data { @@ -103,14 +112,17 @@ func (s *Streamer[T]) Run() error { defer wg.Wait() // Launch reading from the data source. - // Subsriber goroutine controls publisher goroutine lifetime. + // Subscriber goroutine controls publisher goroutine lifetime. go func() { defer wg.Done() - s.dataSource.ReadSplit(s.ctx, s.logger, s.request, s.split, s.sink) + select { + case s.errorChan <- s.dataSource.ReadSplit(s.ctx, s.logger, s.request, s.split, s.sinkFactory): + case <-s.ctx.Done(): + } }() - // pass received blocks into the GRPC channel + // Pass received blocks into the GRPC channel if err := s.writeDataToStream(); err != nil { return fmt.Errorf("write data to stream: %w", err) } @@ -123,19 +135,20 @@ func NewStreamer[T paging.Acceptor]( stream api_service.Connector_ReadSplitsServer, request *api_service_protos.TReadSplitsRequest, split *api_service_protos.TSplit, - sink paging.Sink[T], + sinkFactory paging.SinkFactory[T], dataSource datasource.DataSource[T], ) *Streamer[T] { ctx, cancel := context.WithCancel(stream.Context()) return &Streamer[T]{ - logger: logger, - request: request, - stream: stream, - split: split, - dataSource: dataSource, - sink: sink, - ctx: ctx, - cancel: cancel, + logger: logger, + request: request, + stream: stream, + split: split, + dataSource: dataSource, + sinkFactory: sinkFactory, + errorChan: make(chan error), + ctx: ctx, + cancel: cancel, } } diff --git a/app/server/streaming/streamer_test.go b/app/server/streaming/streamer_test.go index f6e41804..623906f9 100644 --- a/app/server/streaming/streamer_test.go +++ b/app/server/streaming/streamer_test.go @@ -155,10 +155,11 @@ func (tc testCaseStreaming) execute(t *testing.T) { stream.On("Context").Return(ctx) connection := &rdbms_utils.ConnectionMock{} + connection.On("From").Return("", "example_1").Once() connectionManager := &rdbms_utils.ConnectionManagerMock{} - connectionManager.On("Make", split.Select.DataSourceInstance).Return(connection, nil).Once() - connectionManager.On("Release", connection).Return().Once() + connectionManager.On("Make", split.Select.DataSourceInstance).Return([]rdbms_utils.Connection{connection}, nil).Once() + connectionManager.On("Release", []rdbms_utils.Connection{connection}).Return().Once() rows := &rdbms_utils.RowsMock{ PredefinedData: tc.src, @@ -252,20 +253,13 @@ func (tc testCaseStreaming) execute(t *testing.T) { require.NoError(t, err) pagingCfg := &config.TPagingConfig{RowsPerPage: uint64(tc.rowsPerPage)} - trafficTracker := paging.NewTrafficTracker[any](pagingCfg) readLimiterFactory := paging.NewReadLimiterFactory(nil) - sink, err := paging.NewSink( - ctx, - logger, - trafficTracker, - columnarBufferFactory, - readLimiterFactory.MakeReadLimiter(logger), - tc.bufferQueueCapacity, - ) - require.NoError(t, err) + readLimiter := readLimiterFactory.MakeReadLimiter(logger) + + sinkFactory := paging.NewSinkFactory(ctx, logger, pagingCfg, columnarBufferFactory, readLimiter) request := &api_service_protos.TReadSplitsRequest{} - streamer := NewStreamer(logger, stream, request, split, sink, dataSource) + streamer := NewStreamer(logger, stream, request, split, sinkFactory, dataSource) err = streamer.Run() diff --git a/common/connection.go b/common/connection.go index 41c77af2..2fb87b21 100644 --- a/common/connection.go +++ b/common/connection.go @@ -15,7 +15,9 @@ import ( ) func makeConnection(logger *zap.Logger, cfg *config.TClientConfig, additionalOpts ...grpc.DialOption) (*grpc.ClientConn, error) { - var opts []grpc.DialOption + opts := []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(32 * 1024 * 1024)), + } if cfg.Tls != nil { tlsCfg := &tls.Config{} diff --git a/common/endpoint.go b/common/endpoint.go index 7ba281d7..714dedaf 100644 --- a/common/endpoint.go +++ b/common/endpoint.go @@ -2,6 +2,8 @@ package common import ( "fmt" + "strconv" + "strings" api_common "github.com/ydb-platform/fq-connector-go/api/common" ) @@ -9,3 +11,25 @@ import ( func EndpointToString(ep *api_common.TGenericEndpoint) string { return fmt.Sprintf("%s:%d", ep.GetHost(), ep.GetPort()) } + +func StringToEndpoint(s string) (*api_common.TGenericEndpoint, error) { + ss := strings.Split(s, ":") + + if len(ss) != 2 { + return nil, fmt.Errorf("invalid endpoint format: %s", s) + } + + port, err := strconv.ParseUint(ss[1], 10, 32) + if err != nil { + return nil, fmt.Errorf("invalid port: %s", ss[1]) + } + + if port > 65535 { + return nil, fmt.Errorf("invalid port: %s", ss[1]) + } + + return &api_common.TGenericEndpoint{ + Host: ss[0], + Port: uint32(port), + }, nil +}