Skip to content

Commit

Permalink
feat(pgwire): support struct type in extended mode (#19450)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored Nov 20, 2024
1 parent c1e8f9a commit a0b65fd
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 5 deletions.
4 changes: 4 additions & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ sqllogictest -p 4566 -d dev './e2e_test/ttl/ttl.slt'
sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt'
sqllogictest -p 4566 -d test './e2e_test/database/test.slt'

echo "--- e2e, $mode, python_client"
python3 -m pip install --break-system-packages psycopg
python3 ./e2e_test/python_client/main.py

echo "--- e2e, $mode, subscription"
python3 -m pip install --break-system-packages psycopg2-binary
sqllogictest -p 4566 -d dev './e2e_test/subscription/check_sql_statement.slt'
Expand Down
19 changes: 19 additions & 0 deletions e2e_test/python_client/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import psycopg

def test_psycopg_extended_mode():
conn = psycopg.connect(host='localhost', port='4566', dbname='dev', user='root')
with conn.cursor() as cur:
cur.execute("select Array[1::bigint, 2::bigint, 3::bigint]", binary=True)
assert cur.fetchone() == ([1, 2, 3],)

cur.execute("select Array['foo', null, 'bar']", binary=True)
assert cur.fetchone() == (['foo', None, 'bar'],)

cur.execute("select ROW('123 Main St', 'New York', '10001')", binary=True)
assert cur.fetchone() == (('123 Main St', 'New York', '10001'),)

cur.execute("select array[ROW('123 Main St', 'New York', '10001'), ROW('234 Main St', null, '10001')]", binary=True)
assert cur.fetchone() == ([('123 Main St', 'New York', '10001'), ('234 Main St', None, '10001')],)

if __name__ == '__main__':
test_psycopg_extended_mode()
5 changes: 2 additions & 3 deletions src/common/src/types/postgres_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl DataType {
)*
DataType::Int256 => 1302,
DataType::Serial => 1016,
DataType::Struct(_) => -1,
DataType::Struct(_) => 2287, // pseudo-type of array[struct] (see `pg_type.dat`)
DataType::List { .. } => unreachable!("Never reach here!"),
DataType::Map(_) => 1304,
}
Expand All @@ -125,8 +125,7 @@ impl DataType {
DataType::Int256 => 1301,
DataType::Map(_) => 1303,
// TODO: Support to give a new oid for custom struct type. #9434
// 1043 is varchar
DataType::Struct(_) => 1043,
DataType::Struct(_) => 2249, // pseudo-type of struct (see `pg_type.dat`)
}
}
}
Expand Down
29 changes: 27 additions & 2 deletions src/common/src/types/to_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@

use bytes::{BufMut, Bytes, BytesMut};
use postgres_types::{ToSql, Type};
use rw_iter_util::ZipEqFast;

use super::{
DataType, Date, Decimal, Interval, ScalarRefImpl, Serial, Time, Timestamp, Timestamptz, F32,
F64,
};
use crate::array::ListRef;
use crate::array::{ListRef, StructRef};
use crate::error::NotImplemented;

/// Error type for [`ToBinary`] trait.
Expand Down Expand Up @@ -116,6 +117,29 @@ impl ToBinary for ListRef<'_> {
}
}

impl ToBinary for StructRef<'_> {
fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes> {
// Reference: Postgres code `src/backend/utils/adt/rowtypes.c`
// https://github.com/postgres/postgres/blob/a3699daea2026de324ed7cc7115c36d3499010d3/src/backend/utils/adt/rowtypes.c#L687
let mut buf = BytesMut::new();
buf.put_i32(ty.as_struct().len() as i32); // number of columns
for (datum, field_ty) in self.iter_fields_ref().zip_eq_fast(ty.as_struct().types()) {
buf.put_i32(field_ty.to_oid()); // column type
match datum {
None => {
buf.put_i32(-1); // -1 length means a NULL
}
Some(value) => {
let data = value.to_binary_with_type(field_ty)?;
buf.put_i32(data.len() as i32); // Length of element
buf.put(data);
}
}
}
Ok(buf.into())
}
}

impl ToBinary for ScalarRefImpl<'_> {
fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes> {
match self {
Expand All @@ -137,7 +161,8 @@ impl ToBinary for ScalarRefImpl<'_> {
ScalarRefImpl::Bytea(v) => v.to_binary_with_type(ty),
ScalarRefImpl::Jsonb(v) => v.to_binary_with_type(ty),
ScalarRefImpl::List(v) => v.to_binary_with_type(ty),
ScalarRefImpl::Struct(_) | ScalarRefImpl::Map(_) => {
ScalarRefImpl::Struct(v) => v.to_binary_with_type(ty),
ScalarRefImpl::Map(_) => {
bail_not_implemented!(
issue = 7949,
"the pgwire extended-mode encoding for {ty} is unsupported"
Expand Down

0 comments on commit a0b65fd

Please sign in to comment.