Skip to content

Commit

Permalink
PostgreSQL: support UUID (#227)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalyisaev2 authored Dec 28, 2024
1 parent dc16b2c commit 5442352
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 20 deletions.
17 changes: 15 additions & 2 deletions app/server/datasource/rdbms/postgresql/type_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/apache/arrow/go/v13/arrow/array"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgtype"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"

Expand Down Expand Up @@ -40,7 +41,7 @@ func (typeMapper) SQLTypeToYDBColumn(columnName, typeName string, rules *api_ser
ydbType = common.MakePrimitiveType(Ydb.Type_FLOAT)
case "double precision", "float8":
ydbType = common.MakePrimitiveType(Ydb.Type_DOUBLE)
case "bytea":
case "bytea", "uuid":
ydbType = common.MakePrimitiveType(Ydb.Type_STRING)
case "character", "character varying", "text":
ydbType = common.MakePrimitiveType(Ydb.Type_UTF8)
Expand Down Expand Up @@ -73,7 +74,7 @@ func (typeMapper) SQLTypeToYDBColumn(columnName, typeName string, rules *api_ser
}, nil
}

//nolint:gocyclo
//nolint:gocyclo,funlen
func transformerFromOIDs(oids []uint32, ydbTypes []*Ydb.Type, cc conversion.Collection) (paging.RowTransformer[any], error) {
acceptors := make([]any, 0, len(oids))
appenders := make([]func(acceptor any, builder array.Builder) error, 0, len(oids))
Expand Down Expand Up @@ -207,6 +208,18 @@ func transformerFromOIDs(oids []uint32, ydbTypes []*Ydb.Type, cc conversion.Coll
default:
return nil, fmt.Errorf("unexpected ydb type %v with type oid %d: %w", ydbTypes[i], oid, common.ErrDataTypeNotSupported)
}
case pgtype.UUIDOID:
acceptors = append(acceptors, new(*uuid.UUID))
appenders = append(appenders, func(acceptor any, builder array.Builder) error {
cast := acceptor.(**uuid.UUID)
if *cast != nil {
builder.(*array.BinaryBuilder).Append([]byte((**cast).String()))
} else {
builder.(*array.BinaryBuilder).AppendNull()
}

return nil
})
default:
return nil, fmt.Errorf("convert type OID %d: %w", oid, common.ErrDataTypeNotSupported)
}
Expand Down
2 changes: 1 addition & 1 deletion app/server/paging/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type sinkImpl[T Acceptor] struct {
resultQueue chan *ReadResult[T] // outgoing buffer queue
terminateChan chan<- struct{} // notify factory when the data reading is finished
bufferFactory ColumnarBufferFactory[T] // creates new buffer
trafficTracker *TrafficTracker[T] // tracks the amount of data passed through the sink
trafficTracker *trafficTracker[T] // tracks the amount of data passed through the sink
readLimiter ReadLimiter // helps to restrict the number of rows read in every request
logger *zap.Logger // annotated logger
state sinkState // flag showing if it's ready to return data
Expand Down
4 changes: 2 additions & 2 deletions app/server/paging/sink_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type sinkFactoryImpl[T Acceptor] struct {

// Every sink has own traffic tracker, but factory keeps all created trackers during its lifetime
// to provide overall traffic stats.
trafficTrackers []*TrafficTracker[T]
trafficTrackers []*trafficTracker[T]
}

// MakeSinks is used to generate Sink objects, one per each data source connection.
Expand All @@ -58,7 +58,7 @@ func (f *sinkFactoryImpl[T]) MakeSinks(totalSinks int) ([]Sink[T], error) {
}

// preserve traffic tracker to obtain stats in future
trafficTracker := NewTrafficTracker[T](f.cfg)
trafficTracker := newTrafficTracker[T](f.cfg)
f.trafficTrackers = append(f.trafficTrackers, trafficTracker)

sink := &sinkImpl[T]{
Expand Down
3 changes: 3 additions & 0 deletions app/server/paging/size.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"reflect"
"time"

"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgtype"

"github.com/ydb-platform/fq-connector-go/common"
Expand Down Expand Up @@ -203,6 +204,8 @@ func sizeOfValueBloated(v any) (uint64, acceptorKind, error) {
return 16, fixedSize, nil
case *pgtype.Timestamp:
return 16, fixedSize, nil
case **uuid.UUID:
return 16, fixedSize, nil
default:
return 0, 0, fmt.Errorf("value %v of unexpected data type %T: %w", t, t, common.ErrDataTypeNotSupported)
}
Expand Down
16 changes: 8 additions & 8 deletions app/server/paging/traffic_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/ydb-platform/fq-connector-go/common"
)

type TrafficTracker[T Acceptor] struct {
type trafficTracker[T Acceptor] struct {
pagination *config.TPagingConfig
sizePattern *sizePattern[T]

Expand All @@ -26,7 +26,7 @@ type TrafficTracker[T Acceptor] struct {
// would exceed the limits on the page size.
// If there's enough space in buffer, it returns true and increases the internal counters.
// Otherwise it return false, but doesn't change internal state.
func (tt *TrafficTracker[T]) tryAddRow(acceptors []T) (bool, error) {
func (tt *trafficTracker[T]) tryAddRow(acceptors []T) (bool, error) {
if err := tt.maybeInit(acceptors); err != nil {
return false, fmt.Errorf("maybe init: %w", err)
}
Expand All @@ -51,7 +51,7 @@ func (tt *TrafficTracker[T]) tryAddRow(acceptors []T) (bool, error) {
return true, nil
}

func (tt *TrafficTracker[T]) maybeInit(acceptors []T) error {
func (tt *trafficTracker[T]) maybeInit(acceptors []T) error {
if tt.sizePattern == nil {
// lazy initialization when the first row is ready
var err error
Expand All @@ -65,7 +65,7 @@ func (tt *TrafficTracker[T]) maybeInit(acceptors []T) error {
return nil
}

func (tt *TrafficTracker[T]) checkPageSizeLimit(bytesDelta, rowsDelta uint64) (bool, error) {
func (tt *trafficTracker[T]) checkPageSizeLimit(bytesDelta, rowsDelta uint64) (bool, error) {
if tt.pagination.BytesPerPage != 0 {
// almost impossible case, but have to check
if bytesDelta > tt.pagination.BytesPerPage {
Expand All @@ -92,12 +92,12 @@ func (tt *TrafficTracker[T]) checkPageSizeLimit(bytesDelta, rowsDelta uint64) (b
return false, nil
}

func (tt *TrafficTracker[T]) refreshCounters() {
func (tt *trafficTracker[T]) refreshCounters() {
tt.bytesCurr = tt.bytesTotal.MakeChild()
tt.rowsCurr = tt.rowsTotal.MakeChild()
}

func (tt *TrafficTracker[T]) DumpStats(total bool) *api_service_protos.TReadSplitsResponse_TStats {
func (tt *trafficTracker[T]) DumpStats(total bool) *api_service_protos.TReadSplitsResponse_TStats {
rowsCounter := tt.rowsCurr
bytesCounter := tt.bytesCurr

Expand All @@ -114,8 +114,8 @@ func (tt *TrafficTracker[T]) DumpStats(total bool) *api_service_protos.TReadSpli
return result
}

func NewTrafficTracker[T Acceptor](pagination *config.TPagingConfig) *TrafficTracker[T] {
tt := &TrafficTracker[T]{
func newTrafficTracker[T Acceptor](pagination *config.TPagingConfig) *trafficTracker[T] {
tt := &trafficTracker[T]{
pagination: pagination,
bytesTotal: utils.NewCounter[uint64](),
rowsTotal: utils.NewCounter[uint64](),
Expand Down
6 changes: 3 additions & 3 deletions app/server/paging/traffic_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestTrafficTracker(t *testing.T) {
RowsPerPage: 2,
}

tt := NewTrafficTracker[any](cfg)
tt := newTrafficTracker[any](cfg)

col1Acceptor := new(int32)
col2Acceptor := new(string)
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestTrafficTracker(t *testing.T) {
BytesPerPage: 40,
}

tt := NewTrafficTracker[any](cfg)
tt := newTrafficTracker[any](cfg)

col1Acceptor := new(uint64)
col2Acceptor := new([]byte)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestTrafficTracker(t *testing.T) {
BytesPerPage: 1,
}

tt := NewTrafficTracker[any](cfg)
tt := newTrafficTracker[any](cfg)
col1Acceptor := new(int32)
acceptors := []any{col1Acceptor}

Expand Down
12 changes: 8 additions & 4 deletions tests/infra/datasource/postgresql/init/init_db.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,29 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-E
col_22_text text,
col_23_timestamp timestamp,
col_24_date date,
col_25_json json
col_25_json json,
col_26_uuid UUID
);
INSERT INTO primitives VALUES (
1, false, 2, 3, DEFAULT, DEFAULT, 6, 7, 8, DEFAULT, DEFAULT, 11, 12, DEFAULT, DEFAULT,
15.15, 16.16, 17.17, 18.18, 'az', 'az', 'az', 'az',
'1988-11-20 12:55:28.123000', '1988-11-20',
'{ "friends": [{"name": "James Holden","age": 35},{"name": "Naomi Nagata","age": 30}]}'::json
'{ "friends": [{"name": "James Holden","age": 35},{"name": "Naomi Nagata","age": 30}]}'::json,
'dce06500-b56b-412b-bc39-f9fafb602663'
);
INSERT INTO primitives VALUES (
2, true, -2, -3, DEFAULT, DEFAULT, -6, -7, -8, DEFAULT, DEFAULT, -11, -12, DEFAULT, DEFAULT,
-15.15, -16.16, -17.17, -18.18, 'буки', 'буки', 'буки', 'буки',
'2023-03-21 11:21:31.456000', '2023-03-21',
'{ "TODO" : "unicode" }'::json
'{ "TODO" : "unicode" }'::json,
'b18cafa2-9892-4515-843d-e8ee9bd9a858'
);
INSERT INTO primitives VALUES (
3, NULL, NULL, NULL, DEFAULT, DEFAULT, NULL,
NULL, NULL, DEFAULT, DEFAULT, NULL, NULL,
DEFAULT, DEFAULT, NULL, NULL, NULL, NULL,
NULL, NULL, NULL, NULL, NULL, NULL, NULL
NULL, NULL, NULL, NULL, NULL, NULL, NULL,
NULL
);
EOSQL

Expand Down
6 changes: 6 additions & 0 deletions tests/infra/datasource/postgresql/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ var tables = map[string]*test_utils.Table[int32, *array.Int32Builder]{
"col_23_timestamp": common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_TIMESTAMP)),
"col_24_date": common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_DATE)),
"col_25_json": common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_JSON)),
"col_26_uuid": common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_STRING)),
},
},
Records: []*test_utils.Record[int32, *array.Int32Builder]{
Expand Down Expand Up @@ -205,6 +206,11 @@ var tables = map[string]*test_utils.Table[int32, *array.Int32Builder]{
ptr.String("{ \"TODO\" : \"unicode\" }"),
nil,
},
"col_26_uuid": []*[]byte{
ptr.T([]byte(string("dce06500-b56b-412b-bc39-f9fafb602663"))),
ptr.T([]byte(string("b18cafa2-9892-4515-843d-e8ee9bd9a858"))),
nil,
},
},
},
},
Expand Down

0 comments on commit 5442352

Please sign in to comment.