diff --git a/app/server/datasource/rdbms/postgresql/type_mapper.go b/app/server/datasource/rdbms/postgresql/type_mapper.go index 58f111f6..e6b7123c 100644 --- a/app/server/datasource/rdbms/postgresql/type_mapper.go +++ b/app/server/datasource/rdbms/postgresql/type_mapper.go @@ -6,6 +6,7 @@ import ( "time" "github.com/apache/arrow/go/v13/arrow/array" + "github.com/google/uuid" "github.com/jackc/pgx/v5/pgtype" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" @@ -40,7 +41,7 @@ func (typeMapper) SQLTypeToYDBColumn(columnName, typeName string, rules *api_ser ydbType = common.MakePrimitiveType(Ydb.Type_FLOAT) case "double precision", "float8": ydbType = common.MakePrimitiveType(Ydb.Type_DOUBLE) - case "bytea": + case "bytea", "uuid": ydbType = common.MakePrimitiveType(Ydb.Type_STRING) case "character", "character varying", "text": ydbType = common.MakePrimitiveType(Ydb.Type_UTF8) @@ -73,7 +74,7 @@ func (typeMapper) SQLTypeToYDBColumn(columnName, typeName string, rules *api_ser }, nil } -//nolint:gocyclo +//nolint:gocyclo,funlen func transformerFromOIDs(oids []uint32, ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) { acceptors := make([]any, 0, len(oids)) appenders := make([]func(acceptor any, builder array.Builder) error, 0, len(oids)) @@ -207,6 +208,18 @@ func transformerFromOIDs(oids []uint32, ydbTypes []*Ydb.Type, cc conversion.Coll default: return nil, fmt.Errorf("unexpected ydb type %v with type oid %d: %w", ydbTypes[i], oid, common.ErrDataTypeNotSupported) } + case pgtype.UUIDOID: + acceptors = append(acceptors, new(*uuid.UUID)) + appenders = append(appenders, func(acceptor any, builder array.Builder) error { + cast := acceptor.(**uuid.UUID) + if *cast != nil { + builder.(*array.BinaryBuilder).Append([]byte((**cast).String())) + } else { + builder.(*array.BinaryBuilder).AppendNull() + } + + return nil + }) default: return nil, fmt.Errorf("convert type OID %d: %w", oid, common.ErrDataTypeNotSupported) } diff --git a/app/server/paging/sink.go b/app/server/paging/sink.go index f8a34237..5c378452 100644 --- a/app/server/paging/sink.go +++ b/app/server/paging/sink.go @@ -27,7 +27,7 @@ type sinkImpl[T Acceptor] struct { resultQueue chan *ReadResult[T] // outgoing buffer queue terminateChan chan<- struct{} // notify factory when the data reading is finished bufferFactory ColumnarBufferFactory[T] // creates new buffer - trafficTracker *TrafficTracker[T] // tracks the amount of data passed through the sink + trafficTracker *trafficTracker[T] // tracks the amount of data passed through the sink readLimiter ReadLimiter // helps to restrict the number of rows read in every request logger *zap.Logger // annotated logger state sinkState // flag showing if it's ready to return data diff --git a/app/server/paging/sink_factory.go b/app/server/paging/sink_factory.go index 7cf5ff11..2f6d31cc 100644 --- a/app/server/paging/sink_factory.go +++ b/app/server/paging/sink_factory.go @@ -33,7 +33,7 @@ type sinkFactoryImpl[T Acceptor] struct { // Every sink has own traffic tracker, but factory keeps all created trackers during its lifetime // to provide overall traffic stats. - trafficTrackers []*TrafficTracker[T] + trafficTrackers []*trafficTracker[T] } // MakeSinks is used to generate Sink objects, one per each data source connection. @@ -58,7 +58,7 @@ func (f *sinkFactoryImpl[T]) MakeSinks(totalSinks int) ([]Sink[T], error) { } // preserve traffic tracker to obtain stats in future - trafficTracker := NewTrafficTracker[T](f.cfg) + trafficTracker := newTrafficTracker[T](f.cfg) f.trafficTrackers = append(f.trafficTrackers, trafficTracker) sink := &sinkImpl[T]{ diff --git a/app/server/paging/size.go b/app/server/paging/size.go index 108338ce..7f6d3ddc 100644 --- a/app/server/paging/size.go +++ b/app/server/paging/size.go @@ -5,6 +5,7 @@ import ( "reflect" "time" + "github.com/google/uuid" "github.com/jackc/pgx/v5/pgtype" "github.com/ydb-platform/fq-connector-go/common" @@ -203,6 +204,8 @@ func sizeOfValueBloated(v any) (uint64, acceptorKind, error) { return 16, fixedSize, nil case *pgtype.Timestamp: return 16, fixedSize, nil + case **uuid.UUID: + return 16, fixedSize, nil default: return 0, 0, fmt.Errorf("value %v of unexpected data type %T: %w", t, t, common.ErrDataTypeNotSupported) } diff --git a/app/server/paging/traffic_tracker.go b/app/server/paging/traffic_tracker.go index 1c1d442d..12320d77 100644 --- a/app/server/paging/traffic_tracker.go +++ b/app/server/paging/traffic_tracker.go @@ -9,7 +9,7 @@ import ( "github.com/ydb-platform/fq-connector-go/common" ) -type TrafficTracker[T Acceptor] struct { +type trafficTracker[T Acceptor] struct { pagination *config.TPagingConfig sizePattern *sizePattern[T] @@ -26,7 +26,7 @@ type TrafficTracker[T Acceptor] struct { // would exceed the limits on the page size. // If there's enough space in buffer, it returns true and increases the internal counters. // Otherwise it return false, but doesn't change internal state. -func (tt *TrafficTracker[T]) tryAddRow(acceptors []T) (bool, error) { +func (tt *trafficTracker[T]) tryAddRow(acceptors []T) (bool, error) { if err := tt.maybeInit(acceptors); err != nil { return false, fmt.Errorf("maybe init: %w", err) } @@ -51,7 +51,7 @@ func (tt *TrafficTracker[T]) tryAddRow(acceptors []T) (bool, error) { return true, nil } -func (tt *TrafficTracker[T]) maybeInit(acceptors []T) error { +func (tt *trafficTracker[T]) maybeInit(acceptors []T) error { if tt.sizePattern == nil { // lazy initialization when the first row is ready var err error @@ -65,7 +65,7 @@ func (tt *TrafficTracker[T]) maybeInit(acceptors []T) error { return nil } -func (tt *TrafficTracker[T]) checkPageSizeLimit(bytesDelta, rowsDelta uint64) (bool, error) { +func (tt *trafficTracker[T]) checkPageSizeLimit(bytesDelta, rowsDelta uint64) (bool, error) { if tt.pagination.BytesPerPage != 0 { // almost impossible case, but have to check if bytesDelta > tt.pagination.BytesPerPage { @@ -92,12 +92,12 @@ func (tt *TrafficTracker[T]) checkPageSizeLimit(bytesDelta, rowsDelta uint64) (b return false, nil } -func (tt *TrafficTracker[T]) refreshCounters() { +func (tt *trafficTracker[T]) refreshCounters() { tt.bytesCurr = tt.bytesTotal.MakeChild() tt.rowsCurr = tt.rowsTotal.MakeChild() } -func (tt *TrafficTracker[T]) DumpStats(total bool) *api_service_protos.TReadSplitsResponse_TStats { +func (tt *trafficTracker[T]) DumpStats(total bool) *api_service_protos.TReadSplitsResponse_TStats { rowsCounter := tt.rowsCurr bytesCounter := tt.bytesCurr @@ -114,8 +114,8 @@ func (tt *TrafficTracker[T]) DumpStats(total bool) *api_service_protos.TReadSpli return result } -func NewTrafficTracker[T Acceptor](pagination *config.TPagingConfig) *TrafficTracker[T] { - tt := &TrafficTracker[T]{ +func newTrafficTracker[T Acceptor](pagination *config.TPagingConfig) *trafficTracker[T] { + tt := &trafficTracker[T]{ pagination: pagination, bytesTotal: utils.NewCounter[uint64](), rowsTotal: utils.NewCounter[uint64](), diff --git a/app/server/paging/traffic_tracker_test.go b/app/server/paging/traffic_tracker_test.go index 58a81698..d0b6be83 100644 --- a/app/server/paging/traffic_tracker_test.go +++ b/app/server/paging/traffic_tracker_test.go @@ -18,7 +18,7 @@ func TestTrafficTracker(t *testing.T) { RowsPerPage: 2, } - tt := NewTrafficTracker[any](cfg) + tt := newTrafficTracker[any](cfg) col1Acceptor := new(int32) col2Acceptor := new(string) @@ -88,7 +88,7 @@ func TestTrafficTracker(t *testing.T) { BytesPerPage: 40, } - tt := NewTrafficTracker[any](cfg) + tt := newTrafficTracker[any](cfg) col1Acceptor := new(uint64) col2Acceptor := new([]byte) @@ -128,7 +128,7 @@ func TestTrafficTracker(t *testing.T) { BytesPerPage: 1, } - tt := NewTrafficTracker[any](cfg) + tt := newTrafficTracker[any](cfg) col1Acceptor := new(int32) acceptors := []any{col1Acceptor} diff --git a/tests/infra/datasource/postgresql/init/init_db.sh b/tests/infra/datasource/postgresql/init/init_db.sh index cd77de75..852456e7 100755 --- a/tests/infra/datasource/postgresql/init/init_db.sh +++ b/tests/infra/datasource/postgresql/init/init_db.sh @@ -39,25 +39,29 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-E col_22_text text, col_23_timestamp timestamp, col_24_date date, - col_25_json json + col_25_json json, + col_26_uuid UUID ); INSERT INTO primitives VALUES ( 1, false, 2, 3, DEFAULT, DEFAULT, 6, 7, 8, DEFAULT, DEFAULT, 11, 12, DEFAULT, DEFAULT, 15.15, 16.16, 17.17, 18.18, 'az', 'az', 'az', 'az', '1988-11-20 12:55:28.123000', '1988-11-20', - '{ "friends": [{"name": "James Holden","age": 35},{"name": "Naomi Nagata","age": 30}]}'::json + '{ "friends": [{"name": "James Holden","age": 35},{"name": "Naomi Nagata","age": 30}]}'::json, + 'dce06500-b56b-412b-bc39-f9fafb602663' ); INSERT INTO primitives VALUES ( 2, true, -2, -3, DEFAULT, DEFAULT, -6, -7, -8, DEFAULT, DEFAULT, -11, -12, DEFAULT, DEFAULT, -15.15, -16.16, -17.17, -18.18, 'буки', 'буки', 'буки', 'буки', '2023-03-21 11:21:31.456000', '2023-03-21', - '{ "TODO" : "unicode" }'::json + '{ "TODO" : "unicode" }'::json, + 'b18cafa2-9892-4515-843d-e8ee9bd9a858' ); INSERT INTO primitives VALUES ( 3, NULL, NULL, NULL, DEFAULT, DEFAULT, NULL, NULL, NULL, DEFAULT, DEFAULT, NULL, NULL, DEFAULT, DEFAULT, NULL, NULL, NULL, NULL, - NULL, NULL, NULL, NULL, NULL, NULL, NULL + NULL, NULL, NULL, NULL, NULL, NULL, NULL, + NULL ); EOSQL diff --git a/tests/infra/datasource/postgresql/tables.go b/tests/infra/datasource/postgresql/tables.go index 5f7a39d1..f7946051 100644 --- a/tests/infra/datasource/postgresql/tables.go +++ b/tests/infra/datasource/postgresql/tables.go @@ -72,6 +72,7 @@ var tables = map[string]*test_utils.Table[int32, *array.Int32Builder]{ "col_23_timestamp": common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_TIMESTAMP)), "col_24_date": common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_DATE)), "col_25_json": common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_JSON)), + "col_26_uuid": common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_STRING)), }, }, Records: []*test_utils.Record[int32, *array.Int32Builder]{ @@ -205,6 +206,11 @@ var tables = map[string]*test_utils.Table[int32, *array.Int32Builder]{ ptr.String("{ \"TODO\" : \"unicode\" }"), nil, }, + "col_26_uuid": []*[]byte{ + ptr.T([]byte(string("dce06500-b56b-412b-bc39-f9fafb602663"))), + ptr.T([]byte(string("b18cafa2-9892-4515-843d-e8ee9bd9a858"))), + nil, + }, }, }, },