-
Hi! First, thank you for the great work on PGx! Very nice library, and it is easy to switch from sqlx to pgx.

In our project we are using pgx.RowToStructByName to map query results to structs. It might be that we are doing something wrong here, but it seems that when it comes to nested composite values (an array_agg of row(...) in a subquery), the fields are mapped by position rather than by name. I have created a testcase for it; it seems that because the row(...) values are anonymous records, the field order has to match the struct field order exactly.

The test case is best run on an empty DB. It will create the DB schema and insert mock data to run the selects against. Is there something we are doing wrong, or is this expected behaviour for nested row values? Thanks in advance!

package main
import (
    "context"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
)
// Our base post table
var postsTable = `
CREATE TABLE IF NOT EXISTS posts (
    id SERIAL PRIMARY KEY,
    title TEXT,
    body TEXT,
    created_at TIMESTAMPTZ,
    updated_at TIMESTAMPTZ
);
`
// Our base tags table
var tagsTable = `
CREATE TABLE IF NOT EXISTS tags (
    id SERIAL PRIMARY KEY,
    name TEXT,
    created_at TIMESTAMPTZ,
    updated_at TIMESTAMPTZ
);
`
// Our base pivot table
var pivotTable = `
CREATE TABLE IF NOT EXISTS posts_tags (
    post_id INTEGER NOT NULL,
    tag_id INTEGER NOT NULL,
    PRIMARY KEY (post_id, tag_id)
);
`
// Append an extra column to the tags table. Note that ADD COLUMN places the
// new column after the existing ones, so the column order of tags becomes
// (id, name, created_at, updated_at, highlighted).
var alterTags = `
ALTER TABLE tags ADD COLUMN IF NOT EXISTS highlighted BOOL;
`
type Post struct {
    ID        int
    Title     string
    Body      string
    Tags      []Tag
    CreatedAt *time.Time
    UpdatedAt *time.Time
}

type Tag struct {
    ID          int
    Name        string
    Highlighted bool // Added later to our struct, but not in the same order as the table columns
    CreatedAt   *time.Time
    UpdatedAt   *time.Time
}
func main() {
    ctx := context.Background()

    // Connect to DB
    host := os.Getenv("PG_HOST")
    port := os.Getenv("PG_PORT")
    user := os.Getenv("PG_USER")
    password := os.Getenv("PG_PASSWORD")
    dbName := os.Getenv("PG_DATABASE")

    connStr := fmt.Sprintf("host=%s port=%s user=%s dbname=%s password=%s", host, port, user, dbName, password)
    config, err := pgxpool.ParseConfig(connStr)
    if err != nil {
        log.Fatal(err)
    }
    db, err := pgxpool.NewWithConfig(ctx, config)
    if err != nil {
        log.Fatal(err)
    }

    // We build up our table structure and append a column to the tags table,
    // as happened in a later version of our code base.
    dbStruct := []string{
        postsTable,
        tagsTable,
        pivotTable,
        alterTags,
    }
    for _, mig := range dbStruct {
        _, err = db.Exec(ctx, mig)
        if err != nil {
            log.Fatal(err)
        }
    }

    mockData := []string{
        `INSERT INTO posts (title, body, created_at, updated_at) VALUES ('Test post 1', 'Very very nice post', NOW(), NOW()), ('Test post 2', 'Very very nice post', NOW(), NOW());`,
        `INSERT INTO tags (name, highlighted, created_at, updated_at) VALUES ('tag1', TRUE, NOW(), NOW()), ('tag2', TRUE, NOW(), NOW()), ('tag3', TRUE, NOW(), NOW()), ('tag4', TRUE, NOW(), NOW());`,
        `INSERT INTO posts_tags (post_id, tag_id) VALUES (1, 1), (1, 2), (2, 3), (2, 4);`,
    }
    for _, mock := range mockData {
        _, err = db.Exec(ctx, mock)
        if err != nil {
            log.Fatal(err)
        }
    }
    // We query our result specifying the order of the row() columns to match
    // the Go struct, so the positional scanning of the composite lines up,
    // much like pgx.RowToStructByPos would for the tags.
    fmt.Println(">>> Working:")
    stmt := `
        SELECT
            posts.*,
            (
                SELECT array_agg(
                    row(
                        tags.id,
                        tags.name,
                        tags.highlighted,
                        tags.created_at,
                        tags.updated_at
                    )
                )
                FROM tags
                INNER JOIN posts_tags ON posts_tags.tag_id = tags.id AND posts_tags.post_id = posts.id
            ) AS tags
        FROM posts;
    `
    rows, err := db.Query(ctx, stmt)
    if err != nil {
        log.Fatal(err)
    }
    posts, err := pgx.CollectRows(rows, pgx.RowToStructByName[Post])
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("%#v\n", posts)
fmt.Println(">>> Failing:")
// This testcase fails as it is trying to map the created_at column into the highlighted column
// as the struct fields do not match the order of the columns.
stmt = `
SELECT
posts.*,
(
SELECT array_agg(
row(
tags.*
)
)
FROM tags
INNER JOIN posts_tags ON posts_tags.tag_id = tags.id AND posts_tags.post_id = posts.id
) AS tags
FROM posts;
`
rows, err = db.Query(ctx, stmt)
if err != nil {
log.Fatal(err)
}
posts, err = pgx.CollectRows(rows, pgx.RowToStructByName[Post])
if err != nil {
log.Fatal(err)
}
fmt.Printf("%#v\n", posts)
} Output:
-
It potentially is possible, but is a little tricky. At the top level of a query, pgx receives the column names in the row description, so RowToStructByName can match by name. The fields inside an anonymous row(...) value carry no names on the wire, only OIDs and values. In theory, it should be possible to map by name if the composite types have been registered so the type definition is available in the pgtype.Map.
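To make that concrete, here is a minimal sketch of the registration step, assuming the row type that PostgreSQL creates alongside the tags table (conn.LoadType and TypeMap().RegisterType are the pgx v5 calls; placing it in the pool's AfterConnect hook is just one option):

// Sketch: load the "tags" composite type definition (field names + OIDs)
// and register it, so it is available in the connection's pgtype.Map.
// Registration alone does not change the positional scanning behaviour;
// it only makes the field names available to custom codecs.
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
    tagType, err := conn.LoadType(ctx, "tags") // row type created with the table
    if err != nil {
        return err
    }
    conn.TypeMap().RegisterType(tagType)
    return nil
}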
-
So after some tinkering I have built a PoC. It is by no means perfect, but it works without any changes to PGx itself. We have decided to go a different route and will define the columns in the correct order, but I'll leave the PoC here in case anybody else is looking to retrieve the same result.

There was one small issue that I ran into: pgtype.ptrStructWrapper has the exportedFields field, and sadly that field is not exported. If it were exported, I could have used the index from the reflect.Value based on the column and field name. For the PoC I generated my own mapping (column name -> field index in struct, as a map[string]int). Other than that I have used the default pgtype.CompositeCodec behaviour.

I started by adding the types to the query:

SELECT
    posts.*,
    (
        SELECT array_agg(
            row(
                tags.*
            )::tags
        )::tags[]
        FROM tags
        INNER JOIN posts_tags ON posts_tags.tag_id = tags.id AND posts_tags.post_id = posts.id
    ) AS tags
FROM posts;

Then in the pool config I register the types in an AfterConnect hook:

config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
    tagDataType, err := conn.LoadType(ctx, "tags")
    if err != nil {
        return err
    }

    cCodec, ok := tagDataType.Codec.(*pgtype.CompositeCodec)
    if ok {
        dt := &CustomCodec{}
        dt.Fields = cCodec.Fields
        // Define the column names -> struct field index for the type.
        // This could be made generic by using reflection instead of hard
        // coding column names and indexes, but this is a PoC.
        dt.StrctMap = map[string]int{
            "id":          0,
            "name":        1,
            "highlighted": 2,
            "created_at":  3,
            "updated_at":  4,
        }
        tagDataType.Codec = dt
    }
    conn.TypeMap().RegisterType(tagDataType)

    // Register the array type (_tags) with our custom element type, so that
    // tags[] values are also decoded through the custom codec.
    dataType, err := conn.LoadType(ctx, "_tags")
    if err != nil {
        return err
    }
    dataType.Codec = &pgtype.ArrayCodec{
        ElementType: tagDataType,
    }
    conn.TypeMap().RegisterType(dataType)
    return nil
}

In the code you can see that the CustomCodec embeds pgtype.CompositeCodec and carries the column name -> struct field index mapping. Its PlanScan returns a plan that scans each composite field into the struct index looked up by the field's name, instead of by its position:

type CustomCodec struct {
    pgtype.CompositeCodec
    StrctMap map[string]int
}

func (c *CustomCodec) PreferredFormat() int16 {
    return pgtype.BinaryFormatCode
}

func (c *CustomCodec) PlanScan(m *pgtype.Map, oid uint32, format int16, target any) pgtype.ScanPlan {
    switch format {
    case pgtype.BinaryFormatCode:
        switch target.(type) {
        case pgtype.CompositeIndexScanner:
            return &scanPlanBinaryCompositeToCompositeIndexScanner{cc: c, m: m}
        }
    }
    return nil
}

type scanPlanBinaryCompositeToCompositeIndexScanner struct {
    cc *CustomCodec
    m  *pgtype.Map
}

func (plan *scanPlanBinaryCompositeToCompositeIndexScanner) Scan(src []byte, target any) error {
    targetScanner := target.(pgtype.CompositeIndexScanner)
    if src == nil {
        return targetScanner.ScanNull()
    }

    scanner := pgtype.NewCompositeBinaryScanner(plan.m, src)
    for _, field := range plan.cc.Fields {
        if scanner.Next() {
            // Look up the struct field index by column name instead of
            // relying on the wire position of the composite field.
            fieldTarget := targetScanner.ScanIndex(plan.cc.StrctMap[field.Name])
            if fieldTarget != nil {
                fieldPlan := plan.m.PlanScan(field.Type.OID, pgtype.BinaryFormatCode, fieldTarget)
                if fieldPlan == nil {
                    return fmt.Errorf("unable to scan OID %d in binary format into %v", field.Type.OID, field)
                }
                err := fieldPlan.Scan(scanner.Bytes(), fieldTarget)
                if err != nil {
                    return err
                }
            }
        } else {
            return errors.New("read past end of composite")
        }
    }
    if err := scanner.Err(); err != nil {
        return err
    }
    return nil
}

If the target is not a pgtype.CompositeIndexScanner, or the data is not in the binary format, PlanScan simply returns nil and no custom plan is used. One idea is to create a PR (if there is interest for it and it benefits PGx) to add a composite codec that scans fields by name instead of by position. We have decided not to continue down this route but to just specify the columns instead, as it is easier to understand and there is less code to maintain.

This is a PoC, so I would not recommend using it in production; we have not written tests for it and have not tested it with, for example, nested composite types. Use at your own risk! 😄

Thanks for the help!
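P.S. A rough sketch of the reflection-based mapping mentioned above, in case it is useful. The helpers structFieldIndexes and toSnake are hypothetical; this assumes the DB column names are the snake_case form of the exported field names, and that ScanIndex counts exported fields only (as the exportedFields name suggests):

import (
    "reflect"
    "strings"
    "unicode"
)

// structFieldIndexes derives the column name -> struct field index map for T.
// The index counts exported fields only, to match the indexing that
// ScanIndex appears to use (see exportedFields above).
func structFieldIndexes[T any]() map[string]int {
    m := map[string]int{}
    t := reflect.TypeOf((*T)(nil)).Elem()
    idx := 0
    for i := 0; i < t.NumField(); i++ {
        f := t.Field(i)
        if !f.IsExported() {
            continue
        }
        m[toSnake(f.Name)] = idx
        idx++
    }
    return m
}

// toSnake converts "CreatedAt" to "created_at" and "ID" to "id".
// Assumes simple names; mixed acronyms like "HTTPServer" are not handled.
func toSnake(s string) string {
    var b strings.Builder
    runes := []rune(s)
    for i, r := range runes {
        if unicode.IsUpper(r) {
            if i > 0 && !unicode.IsUpper(runes[i-1]) {
                b.WriteByte('_')
            }
            b.WriteRune(unicode.ToLower(r))
        } else {
            b.WriteRune(r)
        }
    }
    return b.String()
}

With that, the hard-coded map above could be replaced with dt.StrctMap = structFieldIndexes[Tag]().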