Skip to content

Commit

Permalink
Introduce RowBuilder to support writing basic unit tests
Browse files Browse the repository at this point in the history
Added RowDescription trait, and let rows to share the same description
rather than having a copy in each row (think when there are thousand of
them in the result).

Added RowBuilder to support adding stubs of row data in unit tests.

Currently, the library users have no chooice but have to use integration
tests for testing Postgres data access code. With the changes in this
commit, the `tokio-postgres` lib users can use RowBuilder to create
sutbs to verify the deserialization from database result (Rows) to
custom stucts in unit tests.

It can also serves as a base for future implementation of certain kind
of mocks of the db connection.

Related-to #910 #950
  • Loading branch information
Hunts Chen committed Jan 5, 2023
1 parent 0c05614 commit f712d07
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 17 deletions.
7 changes: 6 additions & 1 deletion postgres-protocol/src/message/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl Message {
DATA_ROW_TAG => {
let len = buf.read_u16::<BigEndian>()?;
let storage = buf.read_all();
Message::DataRow(DataRowBody { storage, len })
Message::DataRow(DataRowBody::new(storage, len))
}
ERROR_RESPONSE_TAG => {
let storage = buf.read_all();
Expand Down Expand Up @@ -531,6 +531,11 @@ pub struct DataRowBody {
}

impl DataRowBody {
/// Constructs a new data row body.
pub fn new(storage: Bytes, len: u16) -> Self {
Self { storage, len }
}

#[inline]
pub fn ranges(&self) -> DataRowRanges<'_> {
DataRowRanges {
Expand Down
1 change: 1 addition & 0 deletions postgres-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ impl<'a> FromSql<'a> for IpAddr {
}

/// An enum representing the nullability of a Postgres value.
#[derive(Debug, Eq, PartialEq)]
pub enum IsNull {
/// The value is NULL.
Yes,
Expand Down
2 changes: 1 addition & 1 deletion postgres/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::thread;
use std::time::Duration;
use tokio_postgres::error::SqlState;
use tokio_postgres::types::Type;
use tokio_postgres::NoTls;
use tokio_postgres::{NoTls, RowDescription};

use super::*;
use crate::binary_copy::{BinaryCopyInWriter, BinaryCopyOutIter};
Expand Down
4 changes: 2 additions & 2 deletions tokio-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ pub use crate::error::Error;
pub use crate::generic_client::GenericClient;
pub use crate::portal::Portal;
pub use crate::query::RowStream;
pub use crate::row::{Row, SimpleQueryRow};
pub use crate::row::{Row, RowBuilder, SimpleQueryRow};
pub use crate::simple_query::SimpleQueryStream;
#[cfg(feature = "runtime")]
pub use crate::socket::Socket;
pub use crate::statement::{Column, Statement};
pub use crate::statement::{Column, RowDescription, Statement};
#[cfg(feature = "runtime")]
use crate::tls::MakeTlsConnect;
pub use crate::tls::NoTls;
Expand Down
7 changes: 4 additions & 3 deletions tokio-postgres/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use postgres_protocol::message::frontend;
use std::fmt;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

struct BorrowToSqlParamsDebug<'a, T>(&'a [T]);
Expand Down Expand Up @@ -50,7 +51,7 @@ where
};
let responses = start(client, buf).await?;
Ok(RowStream {
statement,
statement: Arc::new(statement),
responses,
_p: PhantomPinned,
})
Expand All @@ -70,7 +71,7 @@ pub async fn query_portal(
let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;

Ok(RowStream {
statement: portal.statement().clone(),
statement: Arc::new(portal.statement().clone()),
responses,
_p: PhantomPinned,
})
Expand Down Expand Up @@ -200,7 +201,7 @@ where
pin_project! {
/// A stream of table rows.
pub struct RowStream {
statement: Statement,
statement: Arc<Statement>,
responses: Responses,
#[pin]
_p: PhantomPinned,
Expand Down
130 changes: 124 additions & 6 deletions tokio-postgres/src/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
use crate::row::sealed::{AsName, Sealed};
use crate::simple_query::SimpleColumn;
use crate::statement::Column;
use crate::statement::{Column, RowDescription};
use crate::types::{FromSql, Type, WrongType};
use crate::{Error, Statement};
use crate::Error;
use bytes::{BufMut, BytesMut};
use fallible_iterator::FallibleIterator;
use postgres_protocol::message::backend::DataRowBody;
use postgres_types::{IsNull, ToSql};
use std::fmt;
use std::ops::Range;
use std::str;
Expand Down Expand Up @@ -96,7 +98,7 @@ where

/// A row of data returned from the database by a query.
pub struct Row {
statement: Statement,
description: Arc<dyn RowDescription>,
body: DataRowBody,
ranges: Vec<Option<Range<usize>>>,
}
Expand All @@ -110,18 +112,26 @@ impl fmt::Debug for Row {
}

impl Row {
pub(crate) fn new(statement: Statement, body: DataRowBody) -> Result<Row, Error> {
pub(crate) fn new(
description: Arc<dyn RowDescription>,
body: DataRowBody,
) -> Result<Row, Error> {
let ranges = body.ranges().collect().map_err(Error::parse)?;
Ok(Row {
statement,
description,
body,
ranges,
})
}

/// Returns description about the data in the row.
pub fn description(&self) -> Arc<dyn RowDescription> {
self.description.clone()
}

/// Returns information about the columns of data in the row.
pub fn columns(&self) -> &[Column] {
self.statement.columns()
self.description.columns()
}

/// Determines if the row contains no values.
Expand Down Expand Up @@ -270,3 +280,111 @@ impl SimpleQueryRow {
FromSql::from_sql_nullable(&Type::TEXT, buf).map_err(|e| Error::from_sql(e, idx))
}
}
/// Builder for building a [`Row`].
pub struct RowBuilder {
desc: Arc<dyn RowDescription>,
buf: BytesMut,
n: usize,
}

impl RowBuilder {
/// Creates a new builder using the provided row description.
pub fn new(desc: Arc<dyn RowDescription>) -> Self {
Self {
desc,
buf: BytesMut::new(),
n: 0,
}
}

/// Appends a column's value and returns a value indicates if this value should be represented
/// as NULL.
pub fn push(&mut self, value: Option<impl ToSql>) -> Result<IsNull, Error> {
let columns = self.desc.columns();

if columns.len() == self.n {
return Err(Error::column(
"exceeded expected number of columns".to_string(),
));
}

let db_type = columns[self.n].type_();
let start = self.buf.len();

// Reserve 4 bytes for the length of the binary data to be written
self.buf.put_i32(-1i32);

let is_null = value
.to_sql(db_type, &mut self.buf)
.map_err(|e| Error::to_sql(e, self.n))?;

// Calculate the length of data just written.
if is_null == IsNull::No {
let len = (self.buf.len() - start - 4) as i32;
// Update the length of data
self.buf[start..start + 4].copy_from_slice(&len.to_be_bytes());
};

self.n += 1;
Ok(is_null)
}

/// Builds the row.
pub fn build(self) -> Result<Row, Error> {
Row::new(
self.desc.clone(),
DataRowBody::new(self.buf.freeze(), self.n as u16),
)
}
}

#[cfg(test)]
mod tests {
use postgres_types::IsNull;

use super::*;
use std::net::IpAddr;

struct TestRowDescription {
columns: Vec<Column>,
}

impl RowDescription for TestRowDescription {
fn columns(&self) -> &[Column] {
&self.columns
}
}

#[test]
fn test_row_builder() {
let mut builder = RowBuilder::new(Arc::new(TestRowDescription {
columns: vec![
Column::new("id".to_string(), Type::INT8),
Column::new("name".to_string(), Type::VARCHAR),
Column::new("ip".to_string(), Type::INET),
],
}));

let expected_id = 1234i64;
let is_null = builder.push(Some(expected_id)).unwrap();
assert_eq!(IsNull::No, is_null);

let expected_name = "row builder";
let is_null = builder.push(Some(expected_name)).unwrap();
assert_eq!(IsNull::No, is_null);

let is_null = builder.push(None::<IpAddr>).unwrap();
assert_eq!(IsNull::Yes, is_null);

let row = builder.build().unwrap();

let actual_id: i64 = row.try_get("id").unwrap();
assert_eq!(expected_id, actual_id);

let actual_name: String = row.try_get("name").unwrap();
assert_eq!(expected_name, actual_name);

let actual_dt: Option<IpAddr> = row.try_get("ip").unwrap();
assert_eq!(None, actual_dt);
}
}
20 changes: 17 additions & 3 deletions tokio-postgres/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ use std::{
sync::{Arc, Weak},
};

/// Describes the data (columns) in a row.
pub trait RowDescription: Sync + Send {
/// Returns information about the columns returned when the statement is queried.
fn columns(&self) -> &[Column];
}

struct StatementInner {
client: Weak<InnerClient>,
name: String,
Expand Down Expand Up @@ -57,9 +63,16 @@ impl Statement {
pub fn params(&self) -> &[Type] {
&self.0.params
}
}

/// Returns information about the columns returned when the statement is queried.
pub fn columns(&self) -> &[Column] {
impl RowDescription for Statement {
fn columns(&self) -> &[Column] {
&self.0.columns
}
}

impl RowDescription for Arc<Statement> {
fn columns(&self) -> &[Column] {
&self.0.columns
}
}
Expand All @@ -71,7 +84,8 @@ pub struct Column {
}

impl Column {
pub(crate) fn new(name: String, type_: Type) -> Column {
/// Constructs a new column.
pub fn new(name: String, type_: Type) -> Column {
Column { name, type_ }
}

Expand Down
3 changes: 2 additions & 1 deletion tokio-postgres/tests/test/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use tokio_postgres::error::SqlState;
use tokio_postgres::tls::{NoTls, NoTlsStream};
use tokio_postgres::types::{Kind, Type};
use tokio_postgres::{
AsyncMessage, Client, Config, Connection, Error, IsolationLevel, SimpleQueryMessage,
AsyncMessage, Client, Config, Connection, Error, IsolationLevel, RowDescription,
SimpleQueryMessage,
};

mod binary_copy;
Expand Down

0 comments on commit f712d07

Please sign in to comment.