From a0b65fd618483e38ed351a4d691639ce65c8fa6c Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Wed, 20 Nov 2024 17:24:01 +0800 Subject: [PATCH] feat(pgwire): support struct type in extended mode (#19450) --- ci/scripts/run-e2e-test.sh | 4 ++++ e2e_test/python_client/main.py | 19 ++++++++++++++++++ src/common/src/types/postgres_type.rs | 5 ++--- src/common/src/types/to_binary.rs | 29 +++++++++++++++++++++++++-- 4 files changed, 52 insertions(+), 5 deletions(-) create mode 100644 e2e_test/python_client/main.py diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index a8601fbb0ebe..e84ead4a81df 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -109,6 +109,10 @@ sqllogictest -p 4566 -d dev './e2e_test/ttl/ttl.slt' sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt' sqllogictest -p 4566 -d test './e2e_test/database/test.slt' +echo "--- e2e, $mode, python_client" +python3 -m pip install --break-system-packages psycopg +python3 ./e2e_test/python_client/main.py + echo "--- e2e, $mode, subscription" python3 -m pip install --break-system-packages psycopg2-binary sqllogictest -p 4566 -d dev './e2e_test/subscription/check_sql_statement.slt' diff --git a/e2e_test/python_client/main.py b/e2e_test/python_client/main.py new file mode 100644 index 000000000000..bb41ba6c38f3 --- /dev/null +++ b/e2e_test/python_client/main.py @@ -0,0 +1,19 @@ +import psycopg + +def test_psycopg_extended_mode(): + conn = psycopg.connect(host='localhost', port='4566', dbname='dev', user='root') + with conn.cursor() as cur: + cur.execute("select Array[1::bigint, 2::bigint, 3::bigint]", binary=True) + assert cur.fetchone() == ([1, 2, 3],) + + cur.execute("select Array['foo', null, 'bar']", binary=True) + assert cur.fetchone() == (['foo', None, 'bar'],) + + cur.execute("select ROW('123 Main St', 'New York', '10001')", binary=True) + assert cur.fetchone() == (('123 Main St', 'New York', '10001'),) + + cur.execute("select array[ROW('123 Main St', 'New York', '10001'), ROW('234 Main St', null, '10001')]", binary=True) + assert cur.fetchone() == ([('123 Main St', 'New York', '10001'), ('234 Main St', None, '10001')],) + +if __name__ == '__main__': + test_psycopg_extended_mode() diff --git a/src/common/src/types/postgres_type.rs b/src/common/src/types/postgres_type.rs index d85f08ed59cc..c84f3e19f309 100644 --- a/src/common/src/types/postgres_type.rs +++ b/src/common/src/types/postgres_type.rs @@ -116,7 +116,7 @@ impl DataType { )* DataType::Int256 => 1302, DataType::Serial => 1016, - DataType::Struct(_) => -1, + DataType::Struct(_) => 2287, // pseudo-type of array[struct] (see `pg_type.dat`) DataType::List { .. } => unreachable!("Never reach here!"), DataType::Map(_) => 1304, } @@ -125,8 +125,7 @@ impl DataType { DataType::Int256 => 1301, DataType::Map(_) => 1303, // TODO: Support to give a new oid for custom struct type. #9434 - // 1043 is varchar - DataType::Struct(_) => 1043, + DataType::Struct(_) => 2249, // pseudo-type of struct (see `pg_type.dat`) } } } diff --git a/src/common/src/types/to_binary.rs b/src/common/src/types/to_binary.rs index 7c5e88dbc10c..294f96bc7045 100644 --- a/src/common/src/types/to_binary.rs +++ b/src/common/src/types/to_binary.rs @@ -14,12 +14,13 @@ use bytes::{BufMut, Bytes, BytesMut}; use postgres_types::{ToSql, Type}; +use rw_iter_util::ZipEqFast; use super::{ DataType, Date, Decimal, Interval, ScalarRefImpl, Serial, Time, Timestamp, Timestamptz, F32, F64, }; -use crate::array::ListRef; +use crate::array::{ListRef, StructRef}; use crate::error::NotImplemented; /// Error type for [`ToBinary`] trait. @@ -116,6 +117,29 @@ impl ToBinary for ListRef<'_> { } } +impl ToBinary for StructRef<'_> { + fn to_binary_with_type(&self, ty: &DataType) -> Result { + // Reference: Postgres code `src/backend/utils/adt/rowtypes.c` + // https://github.com/postgres/postgres/blob/a3699daea2026de324ed7cc7115c36d3499010d3/src/backend/utils/adt/rowtypes.c#L687 + let mut buf = BytesMut::new(); + buf.put_i32(ty.as_struct().len() as i32); // number of columns + for (datum, field_ty) in self.iter_fields_ref().zip_eq_fast(ty.as_struct().types()) { + buf.put_i32(field_ty.to_oid()); // column type + match datum { + None => { + buf.put_i32(-1); // -1 length means a NULL + } + Some(value) => { + let data = value.to_binary_with_type(field_ty)?; + buf.put_i32(data.len() as i32); // Length of element + buf.put(data); + } + } + } + Ok(buf.into()) + } +} + impl ToBinary for ScalarRefImpl<'_> { fn to_binary_with_type(&self, ty: &DataType) -> Result { match self { @@ -137,7 +161,8 @@ impl ToBinary for ScalarRefImpl<'_> { ScalarRefImpl::Bytea(v) => v.to_binary_with_type(ty), ScalarRefImpl::Jsonb(v) => v.to_binary_with_type(ty), ScalarRefImpl::List(v) => v.to_binary_with_type(ty), - ScalarRefImpl::Struct(_) | ScalarRefImpl::Map(_) => { + ScalarRefImpl::Struct(v) => v.to_binary_with_type(ty), + ScalarRefImpl::Map(_) => { bail_not_implemented!( issue = 7949, "the pgwire extended-mode encoding for {ty} is unsupported"