From 3e55df0bad24f7d80d5b732374c536a69de3deb2 Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Tue, 10 Dec 2024 18:45:21 +0000 Subject: [PATCH 01/23] feat: configurable caching with support for tracing --- quaint/src/connector/metrics.rs | 29 +- quaint/src/connector/postgres/native/cache.rs | 185 +++++++++++ quaint/src/connector/postgres/native/mod.rs | 300 ++++++++---------- quaint/src/connector/postgres/native/query.rs | 60 ++++ quaint/src/connector/postgres/url.rs | 6 +- quaint/src/pooled.rs | 20 +- quaint/src/pooled/manager.rs | 19 +- quaint/src/single.rs | 2 +- 8 files changed, 420 insertions(+), 201 deletions(-) create mode 100644 quaint/src/connector/postgres/native/cache.rs create mode 100644 quaint/src/connector/postgres/native/query.rs diff --git a/quaint/src/connector/metrics.rs b/quaint/src/connector/metrics.rs index 37143866a67d..158027efe634 100644 --- a/quaint/src/connector/metrics.rs +++ b/quaint/src/connector/metrics.rs @@ -106,20 +106,19 @@ struct QueryForTracing<'a>(&'a str); impl fmt::Display for QueryForTracing<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let query = self - .0 - .split_once("/* traceparent=") - .map_or(self.0, |(str, remainder)| { - if remainder - .split_once("*/") - .is_some_and(|(_, suffix)| suffix.trim_end().is_empty()) - { - str - } else { - self.0 - } - }) - .trim(); - write!(f, "{query}") + write!(f, "{}", strip_query_traceparent(self.0)) } } + +pub(super) fn strip_query_traceparent(query: &str) -> &str { + query.split_once("/* traceparent=").map_or(query, |(str, remainder)| { + if remainder + .split_once("*/") + .is_some_and(|(_, suffix)| suffix.trim_end().is_empty()) + { + str.trim_end() + } else { + query + } + }) +} diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs new file mode 100644 index 000000000000..dc2dce5ee508 --- /dev/null +++ b/quaint/src/connector/postgres/native/cache.rs @@ -0,0 +1,185 @@ +use std::hash::{BuildHasher, Hash, RandomState}; + +use async_trait::async_trait; +use futures::lock::Mutex; +use lru_cache::LruCache; +use postgres_types::Type; +use tokio_postgres::{Client, Error, Statement}; + +use crate::connector::metrics::strip_query_traceparent; + +use super::query::{IsQuery, TypedQuery}; + +#[async_trait] +pub trait QueryCache: From + Send + Sync { + type Query: IsQuery; + + async fn get_by_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result; +} + +#[derive(Debug, Default)] +pub struct NoopPreparedStatementCache; + +#[async_trait] +impl QueryCache for NoopPreparedStatementCache { + type Query = Statement; + + #[inline] + async fn get_by_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result { + client.prepare_typed(sql, types).await + } +} + +impl From for NoopPreparedStatementCache { + fn from(_: CacheSettings) -> Self { + Self::default() + } +} + +#[derive(Debug)] +pub struct LruPreparedStatementCache { + cache: InnerLruCache, +} + +impl LruPreparedStatementCache { + pub fn with_capacity(capacity: usize) -> Self { + Self { + cache: InnerLruCache::with_capacity(capacity), + } + } +} + +#[async_trait] +impl QueryCache for LruPreparedStatementCache { + type Query = Statement; + + async fn get_by_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result { + match self.cache.get(sql, types).await { + Some(statement) => Ok(statement), + None => { + let stmt = client.prepare_typed(sql, types).await?; + self.cache.insert(sql, types, stmt.clone()).await; + Ok(stmt) + } + } + } +} + +impl From for 
LruPreparedStatementCache { + fn from(settings: CacheSettings) -> Self { + Self::with_capacity(settings.capacity) + } +} + +#[derive(Debug)] +pub struct LruTracingCache { + cache: InnerLruCache, +} + +impl LruTracingCache { + pub fn with_capacity(capacity: usize) -> Self { + Self { + cache: InnerLruCache::with_capacity(capacity), + } + } +} + +#[async_trait] +impl QueryCache for LruTracingCache { + type Query = TypedQuery; + + async fn get_by_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result { + let sql_without_traceparent = strip_query_traceparent(sql); + + match self.cache.get(sql_without_traceparent, types).await { + Some(query) => Ok(query), + None => { + let stmt = client.prepare_typed(sql, types).await?; + let query = TypedQuery { + sql: sql.into(), + types: stmt.columns().iter().map(|c| c.type_().clone()).collect(), + }; + self.cache.insert(sql_without_traceparent, types, query.clone()).await; + Ok(query) + } + } + } +} + +impl From for LruTracingCache { + fn from(settings: CacheSettings) -> Self { + Self::with_capacity(settings.capacity) + } +} + +#[derive(Debug)] +pub struct CacheSettings { + pub capacity: usize, +} + +/// Key uniquely representing an SQL statement in the prepared statements cache. +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct QueryKey { + /// Hash of a string with SQL query. + sql: u64, + /// Combined hash of types for all parameters from the query. + types_hash: u64, +} + +impl QueryKey { + fn new(sql: &str, params: &[Type]) -> Self { + let st = RandomState::new(); + Self { + sql: st.hash_one(sql), + types_hash: st.hash_one(params), + } + } +} + +#[derive(Debug)] +struct InnerLruCache { + cache: Mutex>, +} + +impl InnerLruCache { + fn with_capacity(capacity: usize) -> Self { + Self { + cache: Mutex::new(LruCache::new(capacity)), + } + } + + async fn get(&self, sql: &str, types: &[Type]) -> Option + where + V: Clone, + { + let mut cache = self.cache.lock().await; + let capacity = cache.capacity(); + let stored = cache.len(); + + let key = QueryKey::new(sql, types); + match cache.get_mut(&key) { + Some(value) => { + tracing::trace!( + message = "CACHE HIT!", + query = sql, + capacity = capacity, + stored = stored, + ); + Some(value.clone()) + } + None => { + tracing::trace!( + message = "CACHE MISS!", + query = sql, + capacity = capacity, + stored = stored, + ); + None + } + } + } + + pub async fn insert(&self, sql: &str, types: &[Type], value: V) { + self.cache.lock().await.insert(QueryKey::new(sql, types), value); + } +} diff --git a/quaint/src/connector/postgres/native/mod.rs b/quaint/src/connector/postgres/native/mod.rs index eb6618ce9dc7..2b565d3c5b0d 100644 --- a/quaint/src/connector/postgres/native/mod.rs +++ b/quaint/src/connector/postgres/native/mod.rs @@ -1,19 +1,23 @@ //! Definitions for the Postgres connector. //! This module is not compatible with wasm32-* targets. //! This module is only available with the `postgresql-native` feature. 
+mod cache; pub(crate) mod column_type; mod conversion; mod error; mod explain; +mod query; mod websocket; pub(crate) use crate::connector::postgres::url::PostgresNativeUrl; use crate::connector::postgres::url::{Hidden, SslAcceptMode, SslParams}; use crate::connector::{ timeout, ColumnType, DescribedColumn, DescribedParameter, DescribedQuery, IsolationLevel, Transaction, + TransactionOptions, }; use crate::error::NativeErrorKind; +use crate::prelude::DefaultTransaction; use crate::ValueType; use crate::{ ast::{Query, Value}, @@ -22,14 +26,15 @@ use crate::{ visitor::{self, Visitor}, }; use async_trait::async_trait; +use cache::{CacheSettings, LruPreparedStatementCache, LruTracingCache, NoopPreparedStatementCache, QueryCache}; use column_type::PGColumnType; -use futures::{future::FutureExt, lock::Mutex}; -use lru_cache::LruCache; +use futures::future::FutureExt; +use futures::StreamExt; use native_tls::{Certificate, Identity, TlsConnector}; use postgres_native_tls::MakeTlsConnector; use postgres_types::{Kind as PostgresKind, Type as PostgresType}; use prisma_metrics::WithMetricsInstrumentation; -use std::hash::{DefaultHasher, Hash, Hasher}; +use query::IsQuery; use std::{ fmt::{Debug, Display}, fs, @@ -49,7 +54,7 @@ pub use tokio_postgres; use super::PostgresWebSocketUrl; -struct PostgresClient(Client); +pub(super) struct PostgresClient(Client); impl Debug for PostgresClient { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -62,46 +67,19 @@ const DB_SYSTEM_NAME_COCKROACHDB: &str = "cockroachdb"; /// A connector interface for the PostgreSQL database. #[derive(Debug)] -pub struct PostgreSql { +pub struct PostgreSql { client: PostgresClient, pg_bouncer: bool, socket_timeout: Option, - statement_cache: Mutex, + queries_cache: QueriesCache, + stmts_cache: StmtsCache, is_healthy: AtomicBool, is_cockroachdb: bool, is_materialize: bool, db_system_name: &'static str, } -/// Key uniquely representing an SQL statement in the prepared statements cache. -#[derive(PartialEq, Eq, Hash)] -pub(crate) struct StatementKey { - /// Hash of a string with SQL query. - sql: u64, - /// Combined hash of types for all parameters from the query. 
- types_hash: u64, -} - -pub(crate) type StatementCache = LruCache; - -impl StatementKey { - fn new(sql: &str, params: &[Value<'_>]) -> Self { - Self { - sql: { - let mut hasher = DefaultHasher::new(); - sql.hash(&mut hasher); - hasher.finish() - }, - types_hash: { - let mut hasher = DefaultHasher::new(); - for param in params { - std::mem::discriminant(¶m.typed).hash(&mut hasher); - } - hasher.finish() - }, - } - } -} +pub type PostgreSqlForTracing = PostgreSql; #[derive(Debug)] struct SslAuth { @@ -171,11 +149,13 @@ impl SslParams { } impl PostgresNativeUrl { - pub(crate) fn cache(&self) -> StatementCache { + pub(crate) fn cache_settings(&self) -> CacheSettings { if self.query_params.pg_bouncer { - StatementCache::new(0) + CacheSettings { capacity: 0 } } else { - StatementCache::new(self.query_params.statement_cache_size) + CacheSettings { + capacity: self.query_params.statement_cache_size, + } } } @@ -236,7 +216,30 @@ impl PostgresNativeUrl { } } -impl PostgreSql { +impl PostgreSql { + /// Create a new websocket connection to managed database + pub async fn new_with_websocket(url: PostgresWebSocketUrl) -> crate::Result { + let client = connect_via_websocket(url).await?; + + Ok(Self { + client: PostgresClient(client), + socket_timeout: None, + pg_bouncer: false, + queries_cache: LruPreparedStatementCache::with_capacity(0), + stmts_cache: LruPreparedStatementCache::with_capacity(0), + is_healthy: AtomicBool::new(true), + is_cockroachdb: false, + is_materialize: false, + db_system_name: DB_SYSTEM_NAME_POSTGRESQL, + }) + } +} + +impl PostgreSql +where + QueriesCache: QueryCache, + StmtsCache: QueryCache, +{ /// Create a new connection to the database. pub async fn new(url: PostgresNativeUrl, tls_manager: &MakeTlsConnectorManager) -> crate::Result { let config = url.to_config(); @@ -287,7 +290,8 @@ impl PostgreSql { client: PostgresClient(client), socket_timeout: url.query_params.socket_timeout, pg_bouncer: url.query_params.pg_bouncer, - statement_cache: Mutex::new(url.cache()), + queries_cache: url.cache_settings().into(), + stmts_cache: url.cache_settings().into(), is_healthy: AtomicBool::new(true), is_cockroachdb, is_materialize, @@ -295,22 +299,6 @@ impl PostgreSql { }) } - /// Create a new websocket connection to managed database - pub async fn new_with_websocket(url: PostgresWebSocketUrl) -> crate::Result { - let client = connect_via_websocket(url).await?; - - Ok(Self { - client: PostgresClient(client), - socket_timeout: None, - pg_bouncer: false, - statement_cache: Mutex::new(StatementCache::new(0)), - is_healthy: AtomicBool::new(true), - is_cockroachdb: false, - is_materialize: false, - db_system_name: DB_SYSTEM_NAME_POSTGRESQL, - }) - } - /// The underlying tokio_postgres::Client. Only available with the /// `expose-drivers` Cargo feature. This is a lower level API when you need /// to get into database specific features. 
@@ -319,41 +307,6 @@ impl PostgreSql { &self.client.0 } - async fn fetch_cached(&self, sql: &str, params: &[Value<'_>]) -> crate::Result { - let statement_key = StatementKey::new(sql, params); - let mut cache = self.statement_cache.lock().await; - let capacity = cache.capacity(); - let stored = cache.len(); - - match cache.get_mut(&statement_key) { - Some(stmt) => { - tracing::trace!( - message = "CACHE HIT!", - query = sql, - capacity = capacity, - stored = stored, - ); - - Ok(stmt.clone()) // arc'd - } - None => { - tracing::trace!( - message = "CACHE MISS!", - query = sql, - capacity = capacity, - stored = stored, - ); - - let param_types = conversion::params_to_types(params); - let stmt = self.perform_io(self.client.0.prepare_typed(sql, ¶m_types)).await?; - - cache.insert(statement_key, stmt.clone()); - - Ok(stmt) - } - } - } - async fn perform_io(&self, fut: F) -> crate::Result where F: Future>, @@ -487,6 +440,59 @@ impl PostgreSql { Ok(nullables) } + + async fn query_raw_impl( + &self, + sql: &str, + params: &[Value<'_>], + types: &[PostgresType], + ) -> crate::Result { + self.check_bind_variables_len(params)?; + + metrics::query( + "postgres.query_raw", + self.db_system_name, + sql, + params, + move || async move { + let query = self.queries_cache.get_by_query(&self.client.0, sql, types).await?; + + if query.params().len() != params.len() { + let kind = ErrorKind::IncorrectNumberOfParameters { + expected: query.params().len(), + actual: params.len(), + }; + + return Err(Error::builder(kind).build()); + } + + let mut rows = Box::pin( + self.perform_io(query.dispatch(&self.client.0, conversion::conv_params(params))) + .await?, + ); + + if let Some(first) = rows.next().await { + let first = first?; + let types = first + .columns() + .iter() + .map(|c| PGColumnType::from_pg_type(c.type_())) + .map(ColumnType::from) + .collect::>(); + let names = first.columns().iter().map(|c| c.name().to_string()).collect::>(); + let mut result = ResultSet::new(names, types, vec![first.get_result_row()?]); + + while let Some(row) = rows.next().await { + result.rows.push(row?.get_result_row()?); + } + return Ok(result); + } + + Ok(ResultSet::new(vec![], vec![], vec![])) + }, + ) + .await + } } // A SearchPath connection parameter (Display-impl) for connection initialization. 
@@ -526,10 +532,30 @@ impl Display for SetSearchPath<'_> { } } -impl_default_TransactionCapable!(PostgreSql); +#[async_trait] +impl TransactionCapable for PostgreSql +where + QueriesCache: QueryCache, + StmtsCache: QueryCache, +{ + async fn start_transaction<'a>( + &'a self, + isolation: Option, + ) -> crate::Result> { + let opts = TransactionOptions::new(isolation, self.requires_isolation_first()); + + Ok(Box::new( + DefaultTransaction::new(self, self.begin_statement(), opts).await?, + )) + } +} #[async_trait] -impl Queryable for PostgreSql { +impl Queryable for PostgreSql +where + QueriesCache: QueryCache, + StmtsCache: QueryCache, +{ async fn query(&self, q: Query<'_>) -> crate::Result { let (sql, params) = visitor::Postgres::build(q)?; @@ -537,91 +563,16 @@ impl Queryable for PostgreSql { } async fn query_raw(&self, sql: &str, params: &[Value<'_>]) -> crate::Result { - self.check_bind_variables_len(params)?; - - metrics::query( - "postgres.query_raw", - self.db_system_name, - sql, - params, - move || async move { - let stmt = self.fetch_cached(sql, &[]).await?; - - if stmt.params().len() != params.len() { - let kind = ErrorKind::IncorrectNumberOfParameters { - expected: stmt.params().len(), - actual: params.len(), - }; - - return Err(Error::builder(kind).build()); - } - - let rows = self - .perform_io(self.client.0.query(&stmt, conversion::conv_params(params).as_slice())) - .await?; - - let col_types = stmt - .columns() - .iter() - .map(|c| PGColumnType::from_pg_type(c.type_())) - .map(ColumnType::from) - .collect::>(); - let mut result = ResultSet::new(stmt.to_column_names(), col_types, Vec::new()); - - for row in rows { - result.rows.push(row.get_result_row()?); - } - - Ok(result) - }, - ) - .await + self.query_raw_impl(sql, params, &[]).await } async fn query_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> crate::Result { - self.check_bind_variables_len(params)?; - - metrics::query( - "postgres.query_raw", - self.db_system_name, - sql, - params, - move || async move { - let stmt = self.fetch_cached(sql, params).await?; - - if stmt.params().len() != params.len() { - let kind = ErrorKind::IncorrectNumberOfParameters { - expected: stmt.params().len(), - actual: params.len(), - }; - - return Err(Error::builder(kind).build()); - } - - let col_types = stmt - .columns() - .iter() - .map(|c| PGColumnType::from_pg_type(c.type_())) - .map(ColumnType::from) - .collect::>(); - let rows = self - .perform_io(self.client.0.query(&stmt, conversion::conv_params(params).as_slice())) - .await?; - - let mut result = ResultSet::new(stmt.to_column_names(), col_types, Vec::new()); - - for row in rows { - result.rows.push(row.get_result_row()?); - } - - Ok(result) - }, - ) - .await + self.query_raw_impl(sql, params, &conversion::params_to_types(params)) + .await } async fn describe_query(&self, sql: &str) -> crate::Result { - let stmt = self.fetch_cached(sql, &[]).await?; + let stmt = self.stmts_cache.get_by_query(&self.client.0, sql, &[]).await?; let mut columns: Vec = Vec::with_capacity(stmt.columns().len()); let mut parameters: Vec = Vec::with_capacity(stmt.params().len()); @@ -710,7 +661,7 @@ impl Queryable for PostgreSql { sql, params, move || async move { - let stmt = self.fetch_cached(sql, &[]).await?; + let stmt = self.stmts_cache.get_by_query(&self.client.0, sql, &[]).await?; if stmt.params().len() != params.len() { let kind = ErrorKind::IncorrectNumberOfParameters { @@ -740,7 +691,8 @@ impl Queryable for PostgreSql { sql, params, move || async move { - let stmt = self.fetch_cached(sql, 
params).await?; + let types = conversion::params_to_types(params); + let stmt = self.stmts_cache.get_by_query(&self.client.0, sql, &types).await?; if stmt.params().len() != params.len() { let kind = ErrorKind::IncorrectNumberOfParameters { diff --git a/quaint/src/connector/postgres/native/query.rs b/quaint/src/connector/postgres/native/query.rs new file mode 100644 index 000000000000..cc4d5704b380 --- /dev/null +++ b/quaint/src/connector/postgres/native/query.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use postgres_types::{BorrowToSql, Type}; +use tokio_postgres::{Client, Error, RowStream, Statement}; + +#[async_trait] +pub trait IsQuery: Send { + fn params(&self) -> &[Type]; + + async fn dispatch(&self, client: &Client, args: Args) -> Result + where + Args: IntoIterator + Send, + Args::Item: BorrowToSql, + Args::IntoIter: ExactSizeIterator + Send; +} + +#[async_trait] +impl IsQuery for Statement { + #[inline] + fn params(&self) -> &[Type] { + self.params() + } + + #[inline] + async fn dispatch(&self, client: &Client, args: Args) -> Result + where + Args: IntoIterator + Send, + Args::Item: BorrowToSql, + Args::IntoIter: ExactSizeIterator + Send, + { + client.query_raw(self, args).await + } +} + +#[derive(Debug, Clone)] +pub struct TypedQuery { + pub(super) sql: Arc, + pub(super) types: Arc<[Type]>, +} + +#[async_trait] +impl IsQuery for TypedQuery { + #[inline] + fn params(&self) -> &[Type] { + &self.types + } + + #[inline] + async fn dispatch(&self, client: &Client, args: Args) -> Result + where + Args: IntoIterator + Send, + Args::Item: BorrowToSql, + Args::IntoIter: ExactSizeIterator + Send, + { + client + .query_typed_raw(&self.sql, args.into_iter().zip(self.types.iter().cloned())) + .await + } +} diff --git a/quaint/src/connector/postgres/url.rs b/quaint/src/connector/postgres/url.rs index 096484cdc87a..19dae0066be9 100644 --- a/quaint/src/connector/postgres/url.rs +++ b/quaint/src/connector/postgres/url.rs @@ -559,13 +559,13 @@ mod tests { let url = PostgresNativeUrl::new(Url::parse("postgresql:///localhost:5432/foo?statement_cache_size=420").unwrap()) .unwrap(); - assert_eq!(420, url.cache().capacity()); + assert_eq!(420, url.cache_settings().capacity); } #[test] fn should_have_default_cache_size() { let url = PostgresNativeUrl::new(Url::parse("postgresql:///localhost:5432/foo").unwrap()).unwrap(); - assert_eq!(100, url.cache().capacity()); + assert_eq!(100, url.cache_settings().capacity); } #[test] @@ -598,7 +598,7 @@ mod tests { fn should_not_enable_caching_with_pgbouncer() { let url = PostgresNativeUrl::new(Url::parse("postgresql:///localhost:5432/foo?pgbouncer=true").unwrap()).unwrap(); - assert_eq!(0, url.cache().capacity()); + assert_eq!(0, url.cache_settings().capacity()); } #[test] diff --git a/quaint/src/pooled.rs b/quaint/src/pooled.rs index 389005ab7bd3..45fb4bb06338 100644 --- a/quaint/src/pooled.rs +++ b/quaint/src/pooled.rs @@ -355,11 +355,7 @@ impl Builder { } impl Quaint { - /// Creates a new builder for a Quaint connection pool with the given - /// connection string. See the [module level documentation] for details. 
- /// - /// [module level documentation]: index.html - pub fn builder(url_str: &str) -> crate::Result { + pub fn builder_with_tracing(url_str: &str, is_tracing_enabled: bool) -> crate::Result { match url_str { #[cfg(feature = "sqlite")] s if s.starts_with("file") => { @@ -424,7 +420,11 @@ impl Quaint { let max_idle_connection_lifetime = url.max_idle_connection_lifetime(); let tls_manager = crate::connector::MakeTlsConnectorManager::new(url.clone()); - let manager = QuaintManager::Postgres { url, tls_manager }; + let manager = QuaintManager::Postgres { + url, + tls_manager, + is_tracing_enabled, + }; let mut builder = Builder::new(s, manager)?; if let Some(limit) = connection_limit { @@ -478,6 +478,14 @@ impl Quaint { } } + /// Creates a new builder for a Quaint connection pool with the given + /// connection string. See the [module level documentation] for details. + /// + /// [module level documentation]: index.html + pub fn builder(url_str: &str) -> crate::Result { + Self::builder_with_tracing(url_str, false) + } + /// The number of connections in the pool. pub async fn capacity(&self) -> u32 { self.inner.state().await.max_open as u32 diff --git a/quaint/src/pooled/manager.rs b/quaint/src/pooled/manager.rs index 8a9715640579..5d93d1be58bc 100644 --- a/quaint/src/pooled/manager.rs +++ b/quaint/src/pooled/manager.rs @@ -101,6 +101,7 @@ pub enum QuaintManager { Postgres { url: PostgresNativeUrl, tls_manager: MakeTlsConnectorManager, + is_tracing_enabled: bool, }, #[cfg(feature = "sqlite")] @@ -133,9 +134,23 @@ impl Manager for QuaintManager { } #[cfg(feature = "postgresql-native")] - QuaintManager::Postgres { url, tls_manager } => { + QuaintManager::Postgres { + url, + tls_manager, + is_tracing_enabled: false, + } => { use crate::connector::PostgreSql; - Ok(Box::new(PostgreSql::new(url.clone(), tls_manager).await?) as Self::Connection) + Ok(Box::new(::new(url.clone(), tls_manager).await?) as Self::Connection) + } + + #[cfg(feature = "postgresql-native")] + QuaintManager::Postgres { + url, + tls_manager, + is_tracing_enabled: true, + } => { + use crate::connector::PostgreSqlForTracing; + Ok(Box::new(PostgreSqlForTracing::new(url.clone(), tls_manager).await?) 
as Self::Connection) } #[cfg(feature = "mssql-native")] diff --git a/quaint/src/single.rs b/quaint/src/single.rs index 004b84e0da54..9743d69368a8 100644 --- a/quaint/src/single.rs +++ b/quaint/src/single.rs @@ -158,7 +158,7 @@ impl Quaint { s if s.starts_with("postgres") || s.starts_with("postgresql") => { let url = connector::PostgresNativeUrl::new(url::Url::parse(s)?)?; let tls_manager = connector::MakeTlsConnectorManager::new(url.clone()); - let psql = connector::PostgreSql::new(url, &tls_manager).await?; + let psql = ::new(url, &tls_manager).await?; Arc::new(psql) as Arc } #[cfg(feature = "mssql-native")] From 64e88096f6681edfba16e118f4f6452e4d04507f Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Tue, 10 Dec 2024 18:52:50 +0000 Subject: [PATCH 02/23] fix: correct TypedQuery creation --- quaint/src/connector/postgres/native/cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index dc2dce5ee508..9c008ba7992c 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -97,7 +97,7 @@ impl QueryCache for LruTracingCache { let stmt = client.prepare_typed(sql, types).await?; let query = TypedQuery { sql: sql.into(), - types: stmt.columns().iter().map(|c| c.type_().clone()).collect(), + types: stmt.params().iter().cloned().collect(), }; self.cache.insert(sql_without_traceparent, types, query.clone()).await; Ok(query) From 8ac2709de9b9e134d61eb3805e6405996a3e01ec Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Tue, 10 Dec 2024 19:13:59 +0000 Subject: [PATCH 03/23] fix: store columns in TypedQuery --- quaint/src/connector/postgres/native/cache.rs | 5 ++-- quaint/src/connector/postgres/native/mod.rs | 28 ++++++++----------- quaint/src/connector/postgres/native/query.rs | 28 ++++++++++++------- 3 files changed, 32 insertions(+), 29 deletions(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index 9c008ba7992c..413d036e7cc1 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -32,7 +32,7 @@ impl QueryCache for NoopPreparedStatementCache { impl From for NoopPreparedStatementCache { fn from(_: CacheSettings) -> Self { - Self::default() + Self } } @@ -97,7 +97,8 @@ impl QueryCache for LruTracingCache { let stmt = client.prepare_typed(sql, types).await?; let query = TypedQuery { sql: sql.into(), - types: stmt.params().iter().cloned().collect(), + params: stmt.params().iter().cloned().collect(), + columns: stmt.columns().iter().map(|c| c.type_().clone()).collect(), }; self.cache.insert(sql_without_traceparent, types, query.clone()).await; Ok(query) diff --git a/quaint/src/connector/postgres/native/mod.rs b/quaint/src/connector/postgres/native/mod.rs index 2b565d3c5b0d..2099810c0b2f 100644 --- a/quaint/src/connector/postgres/native/mod.rs +++ b/quaint/src/connector/postgres/native/mod.rs @@ -471,24 +471,18 @@ where .await?, ); - if let Some(first) = rows.next().await { - let first = first?; - let types = first - .columns() - .iter() - .map(|c| PGColumnType::from_pg_type(c.type_())) - .map(ColumnType::from) - .collect::>(); - let names = first.columns().iter().map(|c| c.name().to_string()).collect::>(); - let mut result = ResultSet::new(names, types, vec![first.get_result_row()?]); - - while let Some(row) = rows.next().await { - result.rows.push(row?.get_result_row()?); - } - return Ok(result); + let types = query + 
.columns() + .map(|c| PGColumnType::from_pg_type(&c)) + .map(ColumnType::from) + .collect::>(); + let names = query.columns().map(|c| c.name().to_string()).collect::>(); + let mut result = ResultSet::new(names, types, Vec::new()); + + while let Some(row) = rows.next().await { + result.rows.push(row?.get_result_row()?); } - - Ok(ResultSet::new(vec![], vec![], vec![])) + Ok(result) }, ) .await diff --git a/quaint/src/connector/postgres/native/query.rs b/quaint/src/connector/postgres/native/query.rs index cc4d5704b380..21767940eb18 100644 --- a/quaint/src/connector/postgres/native/query.rs +++ b/quaint/src/connector/postgres/native/query.rs @@ -2,11 +2,12 @@ use std::sync::Arc; use async_trait::async_trait; use postgres_types::{BorrowToSql, Type}; -use tokio_postgres::{Client, Error, RowStream, Statement}; +use tokio_postgres::{Client, Column, Error, RowStream, Statement}; #[async_trait] pub trait IsQuery: Send { - fn params(&self) -> &[Type]; + fn params(&self) -> impl ExactSizeIterator + '_; + fn columns(&self) -> impl ExactSizeIterator + '_; async fn dispatch(&self, client: &Client, args: Args) -> Result where @@ -17,9 +18,12 @@ pub trait IsQuery: Send { #[async_trait] impl IsQuery for Statement { - #[inline] - fn params(&self) -> &[Type] { - self.params() + fn params(&self) -> impl ExactSizeIterator + '_ { + self.params().iter().cloned() + } + + fn columns(&self) -> impl ExactSizeIterator + '_ { + self.columns().iter().map(Column::type_).cloned() } #[inline] @@ -36,14 +40,18 @@ impl IsQuery for Statement { #[derive(Debug, Clone)] pub struct TypedQuery { pub(super) sql: Arc, - pub(super) types: Arc<[Type]>, + pub(super) params: Arc<[Type]>, + pub(super) columns: Arc<[Type]>, } #[async_trait] impl IsQuery for TypedQuery { - #[inline] - fn params(&self) -> &[Type] { - &self.types + fn params(&self) -> impl ExactSizeIterator + '_ { + self.params.iter().cloned() + } + + fn columns(&self) -> impl ExactSizeIterator + '_ { + self.columns.iter().cloned() } #[inline] @@ -54,7 +62,7 @@ impl IsQuery for TypedQuery { Args::IntoIter: ExactSizeIterator + Send, { client - .query_typed_raw(&self.sql, args.into_iter().zip(self.types.iter().cloned())) + .query_typed_raw(&self.sql, args.into_iter().zip(self.params.iter().cloned())) .await } } From 853e9575d0b49dc3837ff5bd9eaaf57fa51eb80b Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Tue, 10 Dec 2024 20:44:20 +0000 Subject: [PATCH 04/23] fix: store column names as well --- quaint/src/connector/postgres/native/cache.rs | 20 +++--- quaint/src/connector/postgres/native/mod.rs | 10 +-- quaint/src/connector/postgres/native/query.rs | 66 ++++++++++++++----- 3 files changed, 67 insertions(+), 29 deletions(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index 413d036e7cc1..6ea2ee52d12a 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -1,4 +1,7 @@ -use std::hash::{BuildHasher, Hash, RandomState}; +use std::{ + hash::{BuildHasher, Hash, RandomState}, + sync::Arc, +}; use async_trait::async_trait; use futures::lock::Mutex; @@ -73,7 +76,7 @@ impl From for LruPreparedStatementCache { #[derive(Debug)] pub struct LruTracingCache { - cache: InnerLruCache, + cache: InnerLruCache>, } impl LruTracingCache { @@ -86,20 +89,21 @@ impl LruTracingCache { #[async_trait] impl QueryCache for LruTracingCache { - type Query = TypedQuery; + type Query = Arc; - async fn get_by_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result { + async fn 
get_by_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result, Error> { let sql_without_traceparent = strip_query_traceparent(sql); match self.cache.get(sql_without_traceparent, types).await { Some(query) => Ok(query), None => { let stmt = client.prepare_typed(sql, types).await?; - let query = TypedQuery { + let query = Arc::new(TypedQuery { sql: sql.into(), - params: stmt.params().iter().cloned().collect(), - columns: stmt.columns().iter().map(|c| c.type_().clone()).collect(), - }; + param_types: stmt.params().to_vec(), + column_names: stmt.columns().iter().map(|c| c.name().to_owned()).collect(), + column_types: stmt.columns().iter().map(|c| c.type_().clone()).collect(), + }); self.cache.insert(sql_without_traceparent, types, query.clone()).await; Ok(query) } diff --git a/quaint/src/connector/postgres/native/mod.rs b/quaint/src/connector/postgres/native/mod.rs index 2099810c0b2f..ba2f401b0e87 100644 --- a/quaint/src/connector/postgres/native/mod.rs +++ b/quaint/src/connector/postgres/native/mod.rs @@ -457,9 +457,9 @@ where move || async move { let query = self.queries_cache.get_by_query(&self.client.0, sql, types).await?; - if query.params().len() != params.len() { + if query.param_types().len() != params.len() { let kind = ErrorKind::IncorrectNumberOfParameters { - expected: query.params().len(), + expected: query.param_types().len(), actual: params.len(), }; @@ -472,11 +472,11 @@ where ); let types = query - .columns() - .map(|c| PGColumnType::from_pg_type(&c)) + .column_types() + .map(PGColumnType::from_pg_type) .map(ColumnType::from) .collect::>(); - let names = query.columns().map(|c| c.name().to_string()).collect::>(); + let names = query.column_names().map(|name| name.to_string()).collect::>(); let mut result = ResultSet::new(names, types, Vec::new()); while let Some(row) = rows.next().await { diff --git a/quaint/src/connector/postgres/native/query.rs b/quaint/src/connector/postgres/native/query.rs index 21767940eb18..0ef507da4427 100644 --- a/quaint/src/connector/postgres/native/query.rs +++ b/quaint/src/connector/postgres/native/query.rs @@ -2,12 +2,13 @@ use std::sync::Arc; use async_trait::async_trait; use postgres_types::{BorrowToSql, Type}; -use tokio_postgres::{Client, Column, Error, RowStream, Statement}; +use tokio_postgres::{Client, Error, RowStream, Statement}; #[async_trait] pub trait IsQuery: Send { - fn params(&self) -> impl ExactSizeIterator + '_; - fn columns(&self) -> impl ExactSizeIterator + '_; + fn param_types(&self) -> impl ExactSizeIterator + '_; + fn column_names(&self) -> impl ExactSizeIterator + '_; + fn column_types(&self) -> impl ExactSizeIterator + '_; async fn dispatch(&self, client: &Client, args: Args) -> Result where @@ -18,12 +19,16 @@ pub trait IsQuery: Send { #[async_trait] impl IsQuery for Statement { - fn params(&self) -> impl ExactSizeIterator + '_ { - self.params().iter().cloned() + fn param_types(&self) -> impl ExactSizeIterator + '_ { + self.params().iter() } - fn columns(&self) -> impl ExactSizeIterator + '_ { - self.columns().iter().map(Column::type_).cloned() + fn column_names(&self) -> impl ExactSizeIterator + '_ { + self.columns().iter().map(|c| c.name()) + } + + fn column_types(&self) -> impl ExactSizeIterator + '_ { + self.columns().iter().map(|c| c.type_()) } #[inline] @@ -37,21 +42,26 @@ impl IsQuery for Statement { } } -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct TypedQuery { - pub(super) sql: Arc, - pub(super) params: Arc<[Type]>, - pub(super) columns: Arc<[Type]>, + pub(super) sql: String, + pub(super) 
param_types: Vec, + pub(super) column_names: Vec, + pub(super) column_types: Vec, } #[async_trait] impl IsQuery for TypedQuery { - fn params(&self) -> impl ExactSizeIterator + '_ { - self.params.iter().cloned() + fn param_types(&self) -> impl ExactSizeIterator + '_ { + self.param_types.iter() } - fn columns(&self) -> impl ExactSizeIterator + '_ { - self.columns.iter().cloned() + fn column_names(&self) -> impl ExactSizeIterator + '_ { + self.column_names.iter().map(|s| s.as_str()) + } + + fn column_types(&self) -> impl ExactSizeIterator + '_ { + self.column_types.iter() } #[inline] @@ -62,7 +72,31 @@ impl IsQuery for TypedQuery { Args::IntoIter: ExactSizeIterator + Send, { client - .query_typed_raw(&self.sql, args.into_iter().zip(self.params.iter().cloned())) + .query_typed_raw(&self.sql, args.into_iter().zip(self.param_types.iter().cloned())) .await } } + +#[async_trait] +impl IsQuery for Arc { + fn param_types(&self) -> impl ExactSizeIterator + '_ { + self.as_ref().param_types() + } + + fn column_names(&self) -> impl ExactSizeIterator + '_ { + self.as_ref().column_names() + } + + fn column_types(&self) -> impl ExactSizeIterator + '_ { + self.as_ref().column_types() + } + + async fn dispatch(&self, client: &Client, args: Args) -> Result + where + Args: IntoIterator + Send, + Args::Item: BorrowToSql, + Args::IntoIter: ExactSizeIterator + Send, + { + self.as_ref().dispatch(client, args).await + } +} From 01f7b20c3f5ba39da2f906f5f37115e35115c94b Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Wed, 11 Dec 2024 10:47:58 +0000 Subject: [PATCH 05/23] fix: define aliases and fix some references to PostgreSql --- quaint/src/connector/postgres/native/mod.rs | 22 ++++++++++--------- quaint/src/connector/postgres/url.rs | 2 +- quaint/src/pooled/manager.rs | 8 +++---- quaint/src/single.rs | 2 +- .../src/flavour/postgres/connection.rs | 4 ++-- 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/quaint/src/connector/postgres/native/mod.rs b/quaint/src/connector/postgres/native/mod.rs index ba2f401b0e87..8f0ae1ca2a0e 100644 --- a/quaint/src/connector/postgres/native/mod.rs +++ b/quaint/src/connector/postgres/native/mod.rs @@ -67,7 +67,7 @@ const DB_SYSTEM_NAME_COCKROACHDB: &str = "cockroachdb"; /// A connector interface for the PostgreSQL database. 
#[derive(Debug)] -pub struct PostgreSql { +pub struct PostgreSql { client: PostgresClient, pg_bouncer: bool, socket_timeout: Option, @@ -79,7 +79,9 @@ pub struct PostgreSql; +pub type PostgreSqlWithDefaultCache = PostgreSql; +pub type PostgreSqlWithNoCache = PostgreSql; +pub type PostgreSqlWithTracingCache = PostgreSql; #[derive(Debug)] struct SslAuth { @@ -216,7 +218,7 @@ impl PostgresNativeUrl { } } -impl PostgreSql { +impl PostgreSqlWithNoCache { /// Create a new websocket connection to managed database pub async fn new_with_websocket(url: PostgresWebSocketUrl) -> crate::Result { let client = connect_via_websocket(url).await?; @@ -225,8 +227,8 @@ impl PostgreSql { client: PostgresClient(client), socket_timeout: None, pg_bouncer: false, - queries_cache: LruPreparedStatementCache::with_capacity(0), - stmts_cache: LruPreparedStatementCache::with_capacity(0), + queries_cache: NoopPreparedStatementCache, + stmts_cache: NoopPreparedStatementCache, is_healthy: AtomicBool::new(true), is_cockroachdb: false, is_materialize: false, @@ -958,7 +960,7 @@ mod tests { let tls_manager = MakeTlsConnectorManager::new(pg_url.clone()); - let client = PostgreSql::new(pg_url, &tls_manager).await.unwrap(); + let client = PostgreSqlWithDefaultCache::new(pg_url, &tls_manager).await.unwrap(); let result_set = client.query_raw("SHOW search_path", &[]).await.unwrap(); let row = result_set.first().unwrap(); @@ -1012,7 +1014,7 @@ mod tests { let tls_manager = MakeTlsConnectorManager::new(pg_url.clone()); - let client = PostgreSql::new(pg_url, &tls_manager).await.unwrap(); + let client = PostgreSqlWithDefaultCache::new(pg_url, &tls_manager).await.unwrap(); let result_set = client.query_raw("SHOW search_path", &[]).await.unwrap(); let row = result_set.first().unwrap(); @@ -1065,7 +1067,7 @@ mod tests { let tls_manager = MakeTlsConnectorManager::new(pg_url.clone()); - let client = PostgreSql::new(pg_url, &tls_manager).await.unwrap(); + let client = PostgreSqlWithDefaultCache::new(pg_url, &tls_manager).await.unwrap(); let result_set = client.query_raw("SHOW search_path", &[]).await.unwrap(); let row = result_set.first().unwrap(); @@ -1118,7 +1120,7 @@ mod tests { let tls_manager = MakeTlsConnectorManager::new(pg_url.clone()); - let client = PostgreSql::new(pg_url, &tls_manager).await.unwrap(); + let client = PostgreSqlWithDefaultCache::new(pg_url, &tls_manager).await.unwrap(); let result_set = client.query_raw("SHOW search_path", &[]).await.unwrap(); let row = result_set.first().unwrap(); @@ -1171,7 +1173,7 @@ mod tests { let tls_manager = MakeTlsConnectorManager::new(pg_url.clone()); - let client = PostgreSql::new(pg_url, &tls_manager).await.unwrap(); + let client = PostgreSqlWithDefaultCache::new(pg_url, &tls_manager).await.unwrap(); let result_set = client.query_raw("SHOW search_path", &[]).await.unwrap(); let row = result_set.first().unwrap(); diff --git a/quaint/src/connector/postgres/url.rs b/quaint/src/connector/postgres/url.rs index 19dae0066be9..c35c123b4395 100644 --- a/quaint/src/connector/postgres/url.rs +++ b/quaint/src/connector/postgres/url.rs @@ -598,7 +598,7 @@ mod tests { fn should_not_enable_caching_with_pgbouncer() { let url = PostgresNativeUrl::new(Url::parse("postgresql:///localhost:5432/foo?pgbouncer=true").unwrap()).unwrap(); - assert_eq!(0, url.cache_settings().capacity()); + assert_eq!(0, url.cache_settings().capacity); } #[test] diff --git a/quaint/src/pooled/manager.rs b/quaint/src/pooled/manager.rs index 5d93d1be58bc..4b13ca26bd24 100644 --- a/quaint/src/pooled/manager.rs +++ 
b/quaint/src/pooled/manager.rs @@ -139,8 +139,8 @@ impl Manager for QuaintManager { tls_manager, is_tracing_enabled: false, } => { - use crate::connector::PostgreSql; - Ok(Box::new(::new(url.clone(), tls_manager).await?) as Self::Connection) + use crate::connector::PostgreSqlWithDefaultCache; + Ok(Box::new(PostgreSqlWithDefaultCache::new(url.clone(), tls_manager).await?) as Self::Connection) } #[cfg(feature = "postgresql-native")] @@ -149,8 +149,8 @@ impl Manager for QuaintManager { tls_manager, is_tracing_enabled: true, } => { - use crate::connector::PostgreSqlForTracing; - Ok(Box::new(PostgreSqlForTracing::new(url.clone(), tls_manager).await?) as Self::Connection) + use crate::connector::PostgreSqlWithTracingCache; + Ok(Box::new(PostgreSqlWithTracingCache::new(url.clone(), tls_manager).await?) as Self::Connection) } #[cfg(feature = "mssql-native")] diff --git a/quaint/src/single.rs b/quaint/src/single.rs index 9743d69368a8..d4d48eff73dc 100644 --- a/quaint/src/single.rs +++ b/quaint/src/single.rs @@ -158,7 +158,7 @@ impl Quaint { s if s.starts_with("postgres") || s.starts_with("postgresql") => { let url = connector::PostgresNativeUrl::new(url::Url::parse(s)?)?; let tls_manager = connector::MakeTlsConnectorManager::new(url.clone()); - let psql = ::new(url, &tls_manager).await?; + let psql = connector::PostgreSqlWithDefaultCache::new(url, &tls_manager).await?; Arc::new(psql) as Arc } #[cfg(feature = "mssql-native")] diff --git a/schema-engine/connectors/sql-schema-connector/src/flavour/postgres/connection.rs b/schema-engine/connectors/sql-schema-connector/src/flavour/postgres/connection.rs index 3a8f9fb6517a..59325574d794 100644 --- a/schema-engine/connectors/sql-schema-connector/src/flavour/postgres/connection.rs +++ b/schema-engine/connectors/sql-schema-connector/src/flavour/postgres/connection.rs @@ -15,7 +15,7 @@ use crate::sql_renderer::IteratorJoin; use super::MigratePostgresUrl; -pub(super) struct Connection(connector::PostgreSql); +pub(super) struct Connection(connector::PostgreSqlWithNoCache); impl Connection { pub(super) async fn new(url: url::Url) -> ConnectorResult { @@ -24,7 +24,7 @@ impl Connection { let quaint = match url.0 { PostgresUrl::Native(ref native_url) => { let tls_manager = MakeTlsConnectorManager::new(native_url.as_ref().clone()); - connector::PostgreSql::new(native_url.as_ref().clone(), &tls_manager).await + connector::PostgreSqlWithNoCache::new(native_url.as_ref().clone(), &tls_manager).await } PostgresUrl::WebSocket(ref ws_url) => connector::PostgreSql::new_with_websocket(ws_url.clone()).await, } From e5308726e0ea354645fff8d388c50012e7ef05cf Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Wed, 11 Dec 2024 11:21:11 +0000 Subject: [PATCH 06/23] chore: doc comments and cleanup --- quaint/src/connector/postgres/native/cache.rs | 12 ++++++++++++ quaint/src/connector/postgres/native/mod.rs | 11 +++++++++++ quaint/src/connector/postgres/native/query.rs | 9 +++++++-- quaint/src/pooled.rs | 8 +++++++- 4 files changed, 37 insertions(+), 3 deletions(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index 6ea2ee52d12a..f7503d6b398d 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -13,13 +13,18 @@ use crate::connector::metrics::strip_query_traceparent; use super::query::{IsQuery, TypedQuery}; +/// Types that can be used as a cache for queries. 
#[async_trait] pub trait QueryCache: From + Send + Sync { + /// The type of the query that is returned by the cache. type Query: IsQuery; + /// Retrieves a query from the cache or prepares and caches it if it's not present. async fn get_by_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result; } +/// A no-op cache that creates a new prepared statement for each query. +/// Useful when we don't need caching. #[derive(Debug, Default)] pub struct NoopPreparedStatementCache; @@ -39,6 +44,7 @@ impl From for NoopPreparedStatementCache { } } +/// An LRU cache that creates and stores prepared statements. #[derive(Debug)] pub struct LruPreparedStatementCache { cache: InnerLruCache, @@ -74,6 +80,11 @@ impl From for LruPreparedStatementCache { } } +/// An LRU cache that creates and stores type information relevant to each query, keyed by queries +/// with tracing information removed. +/// +/// Returns [`TypedQuery`] instances, rather than [`Statement`], because prepared statements cannot +/// be re-used when the tracing information is attached to them. #[derive(Debug)] pub struct LruTracingCache { cache: InnerLruCache>, @@ -117,6 +128,7 @@ impl From for LruTracingCache { } } +/// Settings related to query caching. #[derive(Debug)] pub struct CacheSettings { pub capacity: usize, diff --git a/quaint/src/connector/postgres/native/mod.rs b/quaint/src/connector/postgres/native/mod.rs index 8f0ae1ca2a0e..6bbfba077930 100644 --- a/quaint/src/connector/postgres/native/mod.rs +++ b/quaint/src/connector/postgres/native/mod.rs @@ -66,6 +66,10 @@ const DB_SYSTEM_NAME_POSTGRESQL: &str = "postgresql"; const DB_SYSTEM_NAME_COCKROACHDB: &str = "cockroachdb"; /// A connector interface for the PostgreSQL database. +/// +/// # Type parameters +/// - `QueriesCache`: The cache used for queries that do not necessitate prepared statements. +/// - `StmtsCache`: The cache used for prepared statements. #[derive(Debug)] pub struct PostgreSql { client: PostgresClient, @@ -79,8 +83,15 @@ pub struct PostgreSql { db_system_name: &'static str, } +/// A Postgres client with the default caching strategy, which involves storing everything as +/// prepared statements in an LRU cache. pub type PostgreSqlWithDefaultCache = PostgreSql; + +/// A Postgres client which executes all queries as prepared statements without caching. pub type PostgreSqlWithNoCache = PostgreSql; + +/// A Postgres client with a caching strategy dedicated to query tracing, which involves storing +/// query type information in a dedicated LRU cache and not re-using any prepared statements. pub type PostgreSqlWithTracingCache = PostgreSql; #[derive(Debug)] diff --git a/quaint/src/connector/postgres/native/query.rs b/quaint/src/connector/postgres/native/query.rs index 0ef507da4427..8907ec4c5efe 100644 --- a/quaint/src/connector/postgres/native/query.rs +++ b/quaint/src/connector/postgres/native/query.rs @@ -4,6 +4,8 @@ use async_trait::async_trait; use postgres_types::{BorrowToSql, Type}; use tokio_postgres::{Client, Error, RowStream, Statement}; +/// Types that can be dispatched to the database as a query and carry the necessary type +/// information about its parameters and columns. 
#[async_trait] pub trait IsQuery: Send { fn param_types(&self) -> impl ExactSizeIterator + '_; @@ -31,7 +33,6 @@ impl IsQuery for Statement { self.columns().iter().map(|c| c.type_()) } - #[inline] async fn dispatch(&self, client: &Client, args: Args) -> Result where Args: IntoIterator + Send, @@ -42,6 +43,7 @@ impl IsQuery for Statement { } } +/// A query combined with the type information necessary to run and interpret it. #[derive(Debug)] pub struct TypedQuery { pub(super) sql: String, @@ -64,7 +66,6 @@ impl IsQuery for TypedQuery { self.column_types.iter() } - #[inline] async fn dispatch(&self, client: &Client, args: Args) -> Result where Args: IntoIterator + Send, @@ -79,18 +80,22 @@ impl IsQuery for TypedQuery { #[async_trait] impl IsQuery for Arc { + #[inline] fn param_types(&self) -> impl ExactSizeIterator + '_ { self.as_ref().param_types() } + #[inline] fn column_names(&self) -> impl ExactSizeIterator + '_ { self.as_ref().column_names() } + #[inline] fn column_types(&self) -> impl ExactSizeIterator + '_ { self.as_ref().column_types() } + #[inline] async fn dispatch(&self, client: &Client, args: Args) -> Result where Args: IntoIterator + Send, diff --git a/quaint/src/pooled.rs b/quaint/src/pooled.rs index 45fb4bb06338..1d272e61e5c8 100644 --- a/quaint/src/pooled.rs +++ b/quaint/src/pooled.rs @@ -355,7 +355,13 @@ impl Builder { } impl Quaint { - pub fn builder_with_tracing(url_str: &str, is_tracing_enabled: bool) -> crate::Result { + /// Creates a new builder for a Quaint connection pool with the given + /// connection string and tracing flag. + /// See the [module level documentation] for details. + pub fn builder_with_tracing( + url_str: &str, + #[allow(unused_variables)] is_tracing_enabled: bool, + ) -> crate::Result { match url_str { #[cfg(feature = "sqlite")] s if s.starts_with("file") => { From d40b7ed672ae8ea1a81e740aa19c7844d2c33cc4 Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Wed, 11 Dec 2024 11:27:47 +0000 Subject: [PATCH 07/23] chore: doc comment adjustments and renames --- quaint/src/connector/postgres/native/cache.rs | 12 ++++++------ quaint/src/connector/postgres/native/mod.rs | 2 +- quaint/src/connector/postgres/native/query.rs | 12 ++++++------ quaint/src/pooled.rs | 2 +- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index f7503d6b398d..c6a220798026 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -11,13 +11,13 @@ use tokio_postgres::{Client, Error, Statement}; use crate::connector::metrics::strip_query_traceparent; -use super::query::{IsQuery, TypedQuery}; +use super::query::{PreparedQuery, TypedQuery}; -/// Types that can be used as a cache for queries. +/// Types that can be used as a cache for prepared queries. #[async_trait] pub trait QueryCache: From + Send + Sync { - /// The type of the query that is returned by the cache. - type Query: IsQuery; + /// The type of query that is returned by the cache. + type Query: PreparedQuery; /// Retrieves a query from the cache or prepares and caches it if it's not present. async fn get_by_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result; @@ -80,8 +80,8 @@ impl From for LruPreparedStatementCache { } } -/// An LRU cache that creates and stores type information relevant to each query, keyed by queries -/// with tracing information removed. 
+/// An LRU cache that creates and stores type information relevant to each query, with keys being +/// stripped of any tracing information. /// /// Returns [`TypedQuery`] instances, rather than [`Statement`], because prepared statements cannot /// be re-used when the tracing information is attached to them. diff --git a/quaint/src/connector/postgres/native/mod.rs b/quaint/src/connector/postgres/native/mod.rs index 6bbfba077930..9aff42fcdc5e 100644 --- a/quaint/src/connector/postgres/native/mod.rs +++ b/quaint/src/connector/postgres/native/mod.rs @@ -34,7 +34,7 @@ use native_tls::{Certificate, Identity, TlsConnector}; use postgres_native_tls::MakeTlsConnector; use postgres_types::{Kind as PostgresKind, Type as PostgresType}; use prisma_metrics::WithMetricsInstrumentation; -use query::IsQuery; +use query::PreparedQuery; use std::{ fmt::{Debug, Display}, fs, diff --git a/quaint/src/connector/postgres/native/query.rs b/quaint/src/connector/postgres/native/query.rs index 8907ec4c5efe..1f0afcf8a86b 100644 --- a/quaint/src/connector/postgres/native/query.rs +++ b/quaint/src/connector/postgres/native/query.rs @@ -5,9 +5,9 @@ use postgres_types::{BorrowToSql, Type}; use tokio_postgres::{Client, Error, RowStream, Statement}; /// Types that can be dispatched to the database as a query and carry the necessary type -/// information about its parameters and columns. +/// information about its parameters and columns to interpret the results. #[async_trait] -pub trait IsQuery: Send { +pub trait PreparedQuery: Send { fn param_types(&self) -> impl ExactSizeIterator + '_; fn column_names(&self) -> impl ExactSizeIterator + '_; fn column_types(&self) -> impl ExactSizeIterator + '_; @@ -20,7 +20,7 @@ pub trait IsQuery: Send { } #[async_trait] -impl IsQuery for Statement { +impl PreparedQuery for Statement { fn param_types(&self) -> impl ExactSizeIterator + '_ { self.params().iter() } @@ -43,7 +43,7 @@ impl IsQuery for Statement { } } -/// A query combined with the type information necessary to run and interpret it. +/// A query combined with the relevant type information about its parameters and columns. #[derive(Debug)] pub struct TypedQuery { pub(super) sql: String, @@ -53,7 +53,7 @@ pub struct TypedQuery { } #[async_trait] -impl IsQuery for TypedQuery { +impl PreparedQuery for TypedQuery { fn param_types(&self) -> impl ExactSizeIterator + '_ { self.param_types.iter() } @@ -79,7 +79,7 @@ impl IsQuery for TypedQuery { } #[async_trait] -impl IsQuery for Arc { +impl PreparedQuery for Arc { #[inline] fn param_types(&self) -> impl ExactSizeIterator + '_ { self.as_ref().param_types() diff --git a/quaint/src/pooled.rs b/quaint/src/pooled.rs index 1d272e61e5c8..3da2c6998371 100644 --- a/quaint/src/pooled.rs +++ b/quaint/src/pooled.rs @@ -356,7 +356,7 @@ impl Builder { impl Quaint { /// Creates a new builder for a Quaint connection pool with the given - /// connection string and tracing flag. + /// connection string and a tracing flag. /// See the [module level documentation] for details. 
pub fn builder_with_tracing( url_str: &str, From 3e646974b6f914b406740dcca9abef79aeb2d39d Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Wed, 11 Dec 2024 12:59:42 +0000 Subject: [PATCH 08/23] test: cover the queries and caches --- quaint/src/connector/postgres/native/cache.rs | 128 ++++++++++++++++-- quaint/src/connector/postgres/native/mod.rs | 12 +- quaint/src/connector/postgres/native/query.rs | 120 +++++++++++++++- 3 files changed, 239 insertions(+), 21 deletions(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index c6a220798026..27f430f8e14d 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -19,7 +19,7 @@ pub trait QueryCache: From + Send + Sync { /// The type of query that is returned by the cache. type Query: PreparedQuery; - /// Retrieves a query from the cache or prepares and caches it if it's not present. + /// Retrieve a prepared query from the cache or prepare and cache one if it's not present. async fn get_by_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result; } @@ -109,12 +109,7 @@ impl QueryCache for LruTracingCache { Some(query) => Ok(query), None => { let stmt = client.prepare_typed(sql, types).await?; - let query = Arc::new(TypedQuery { - sql: sql.into(), - param_types: stmt.params().to_vec(), - column_names: stmt.columns().iter().map(|c| c.name().to_owned()).collect(), - column_types: stmt.columns().iter().map(|c| c.type_().clone()).collect(), - }); + let query = Arc::new(TypedQuery::from_statement(sql, &stmt)); self.cache.insert(sql_without_traceparent, types, query.clone()).await; Ok(query) } @@ -136,7 +131,7 @@ pub struct CacheSettings { /// Key uniquely representing an SQL statement in the prepared statements cache. #[derive(Debug, PartialEq, Eq, Hash)] -pub struct QueryKey { +struct QueryKey { /// Hash of a string with SQL query. sql: u64, /// Combined hash of types for all parameters from the query. 
@@ -144,8 +139,7 @@ pub struct QueryKey { } impl QueryKey { - fn new(sql: &str, params: &[Type]) -> Self { - let st = RandomState::new(); + fn new(st: &S, sql: &str, params: &[Type]) -> Self { Self { sql: st.hash_one(sql), types_hash: st.hash_one(params), @@ -156,12 +150,14 @@ impl QueryKey { #[derive(Debug)] struct InnerLruCache { cache: Mutex>, + state: RandomState, } impl InnerLruCache { fn with_capacity(capacity: usize) -> Self { Self { cache: Mutex::new(LruCache::new(capacity)), + state: RandomState::new(), } } @@ -173,7 +169,7 @@ impl InnerLruCache { let capacity = cache.capacity(); let stored = cache.len(); - let key = QueryKey::new(sql, types); + let key = QueryKey::new(&self.state, sql, types); match cache.get_mut(&key) { Some(value) => { tracing::trace!( @@ -197,6 +193,114 @@ impl InnerLruCache { } pub async fn insert(&self, sql: &str, types: &[Type], value: V) { - self.cache.lock().await.insert(QueryKey::new(sql, types), value); + let key = QueryKey::new(&self.state, sql, types); + self.cache.lock().await.insert(key, value); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::future::Future; + + pub(crate) use crate::connector::postgres::url::PostgresNativeUrl; + use crate::{ + connector::{MakeTlsConnectorManager, PostgresFlavour}, + tests::test_api::postgres::CONN_STR, + }; + use url::Url; + + #[tokio::test] + async fn noop_prepared_statement_cache_prepares_new_statements_every_time() { + run_with_client(|client| async move { + let cache = NoopPreparedStatementCache; + let sql = "SELECT $1"; + let types = [Type::INT4]; + + let stmt1 = cache.get_by_query(&client, sql, &types).await.unwrap(); + let stmt2 = cache.get_by_query(&client, sql, &types).await.unwrap(); + assert_ne!(stmt1.name(), stmt2.name()); + }) + .await; + } + + #[tokio::test] + async fn lru_prepared_statement_cache_reuses_statements_within_capacity() { + run_with_client(|client| async move { + let cache = LruPreparedStatementCache::with_capacity(1); + let sql = "SELECT $1"; + let types = [Type::INT4]; + + let stmt1 = cache.get_by_query(&client, sql, &types).await.unwrap(); + let stmt2 = cache.get_by_query(&client, sql, &types).await.unwrap(); + assert_eq!(stmt1.name(), stmt2.name()); + + // replace our cached statement with a new one going over the capacity + cache.get_by_query(&client, sql, &[Type::INT8]).await.unwrap(); + + // the old statement should be evicted from the cache + let stmt3 = cache.get_by_query(&client, sql, &types).await.unwrap(); + assert_ne!(stmt1.name(), stmt3.name()); + }) + .await; + } + + #[tokio::test] + async fn tracing_cache_reuses_queries_within_capacity() { + run_with_client(|client| async move { + let cache = LruTracingCache::with_capacity(1); + let sql = "SELECT $1"; + let types = [Type::INT4]; + + let stmt1 = cache.get_by_query(&client, sql, &types).await.unwrap(); + let stmt2 = cache.get_by_query(&client, sql, &types).await.unwrap(); + assert!(Arc::ptr_eq(&stmt1, &stmt2), "stmt1 and stmt2 should be the same Arc"); + + // replace our cached query with a new one going over the capacity + cache.get_by_query(&client, sql, &[Type::INT8]).await.unwrap(); + + // the old query should be evicted from the cache + let stmt3 = cache.get_by_query(&client, sql, &types).await.unwrap(); + assert!( + !Arc::ptr_eq(&stmt1, &stmt3), + "stmt1 and stmt3 should not be the same Arc" + ); + }) + .await; + } + + #[tokio::test] + async fn tracing_cache_reuses_queries_with_different_traceparent() { + run_with_client(|client| async move { + let cache = LruTracingCache::with_capacity(1); + let sql1 
= "SELECT $1 /* traceparent=00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01 */"; + let sql2 = "SELECT $1 /* traceparent=00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-02 */"; + let types = [Type::INT4]; + + let stmt1 = cache.get_by_query(&client, sql1, &types).await.unwrap(); + let stmt2 = cache.get_by_query(&client, sql2, &types).await.unwrap(); + assert!(Arc::ptr_eq(&stmt1, &stmt2), "stmt1 and stmt2 should be the same Arc"); + }) + .await; + } + + async fn run_with_client(test: Func) + where + Func: FnOnce(Client) -> Fut, + Fut: Future, + { + let url = Url::parse(&CONN_STR).unwrap(); + let mut pg_url = PostgresNativeUrl::new(url).unwrap(); + pg_url.set_flavour(PostgresFlavour::Postgres); + + let tls_manager = MakeTlsConnectorManager::new(pg_url.clone()); + let tls = tls_manager.get_connector().await.unwrap(); + + let (client, conn) = pg_url.to_config().connect(tls).await.unwrap(); + + let set = tokio::task::LocalSet::new(); + set.spawn_local(conn); + set.run_until(test(client)).await } } diff --git a/quaint/src/connector/postgres/native/mod.rs b/quaint/src/connector/postgres/native/mod.rs index 9aff42fcdc5e..423f0d2907cb 100644 --- a/quaint/src/connector/postgres/native/mod.rs +++ b/quaint/src/connector/postgres/native/mod.rs @@ -83,15 +83,17 @@ pub struct PostgreSql { db_system_name: &'static str, } -/// A Postgres client with the default caching strategy, which involves storing everything as -/// prepared statements in an LRU cache. +/// A [`PostgreSql`] interface with the default caching strategy, which involves storing all +/// queries as prepared statements in an LRU cache. pub type PostgreSqlWithDefaultCache = PostgreSql; -/// A Postgres client which executes all queries as prepared statements without caching. +/// A [`PostgreSql`] interface which executes all queries as prepared statements without caching +/// them. pub type PostgreSqlWithNoCache = PostgreSql; -/// A Postgres client with a caching strategy dedicated to query tracing, which involves storing -/// query type information in a dedicated LRU cache and not re-using any prepared statements. +/// A [`PostgreSql`] interface with the tracing caching strategy, which involves storing query +/// type information in a dedicated LRU cache for applicable queries and not re-using any prepared +/// statements. pub type PostgreSqlWithTracingCache = PostgreSql; #[derive(Debug)] diff --git a/quaint/src/connector/postgres/native/query.rs b/quaint/src/connector/postgres/native/query.rs index 1f0afcf8a86b..00a6682581e7 100644 --- a/quaint/src/connector/postgres/native/query.rs +++ b/quaint/src/connector/postgres/native/query.rs @@ -46,10 +46,22 @@ impl PreparedQuery for Statement { /// A query combined with the relevant type information about its parameters and columns. #[derive(Debug)] pub struct TypedQuery { - pub(super) sql: String, - pub(super) param_types: Vec, - pub(super) column_names: Vec, - pub(super) column_types: Vec, + sql: String, + param_types: Vec, + column_names: Vec, + column_types: Vec, +} + +impl TypedQuery { + /// Create a new typed query from a SQL string and a statement. 
+ pub fn from_statement(sql: impl Into, statement: &Statement) -> Self { + Self { + sql: sql.into(), + param_types: statement.params().to_vec(), + column_names: statement.columns().iter().map(|c| c.name().to_owned()).collect(), + column_types: statement.columns().iter().map(|c| c.type_().clone()).collect(), + } + } } #[async_trait] @@ -105,3 +117,103 @@ impl PreparedQuery for Arc { self.as_ref().dispatch(client, args).await } } + +#[cfg(test)] +mod tests { + use super::*; + + use std::future::Future; + + pub(crate) use crate::connector::postgres::url::PostgresNativeUrl; + use crate::{ + connector::{MakeTlsConnectorManager, PostgresFlavour}, + tests::test_api::postgres::CONN_STR, + }; + use url::Url; + + #[tokio::test] + async fn typed_query_matches_statement_and_dispatches() { + run_with_client(|client| async move { + let query = "SELECT $1"; + let stmt = client.prepare_typed(query, &[Type::INT4]).await.unwrap(); + let typed = TypedQuery::from_statement(query, &stmt); + + assert_eq!(typed.param_types().cloned().collect::>(), stmt.params()); + assert_eq!( + typed.column_names().collect::>(), + stmt.columns().iter().map(|c| c.name()).collect::>() + ); + assert_eq!( + typed.column_types().collect::>(), + stmt.columns().iter().map(|c| c.type_()).collect::>() + ); + + let result = typed.dispatch(&client, &[&1i32]).await; + assert!(result.is_ok(), "{:?}", result.err()); + }) + .await; + } + + #[tokio::test] + async fn statement_trait_methods_match_statement_and_dispatch() { + run_with_client(|client| async move { + let query = "SELECT $1"; + let stmt = client.prepare_typed(query, &[Type::INT4]).await.unwrap(); + + assert_eq!(stmt.param_types().cloned().collect::>(), stmt.params()); + assert_eq!( + stmt.column_names().collect::>(), + stmt.columns().iter().map(|c| c.name()).collect::>() + ); + assert_eq!( + stmt.column_types().collect::>(), + stmt.columns().iter().map(|c| c.type_()).collect::>() + ); + + let result = stmt.dispatch(&client, &[&1i32]).await; + assert!(result.is_ok(), "{:?}", result.err()); + }) + .await; + } + + #[tokio::test] + async fn arc_trait_methods_match_statement_and_dispatch() { + run_with_client(|client| async move { + let query = "SELECT $1"; + let stmt = Arc::new(client.prepare_typed(query, &[Type::INT4]).await.unwrap()); + + assert_eq!(stmt.param_types().cloned().collect::>(), stmt.params()); + assert_eq!( + stmt.column_names().collect::>(), + stmt.columns().iter().map(|c| c.name()).collect::>() + ); + assert_eq!( + stmt.column_types().collect::>(), + stmt.columns().iter().map(|c| c.type_()).collect::>() + ); + + let result = stmt.dispatch(&client, &[&1i32]).await; + assert!(result.is_ok(), "{:?}", result.err()); + }) + .await; + } + + async fn run_with_client(test: Func) + where + Func: FnOnce(Client) -> Fut, + Fut: Future, + { + let url = Url::parse(&CONN_STR).unwrap(); + let mut pg_url = PostgresNativeUrl::new(url).unwrap(); + pg_url.set_flavour(PostgresFlavour::Postgres); + + let tls_manager = MakeTlsConnectorManager::new(pg_url.clone()); + let tls = tls_manager.get_connector().await.unwrap(); + + let (client, conn) = pg_url.to_config().connect(tls).await.unwrap(); + + let set = tokio::task::LocalSet::new(); + set.spawn_local(conn); + set.run_until(test(client)).await + } +} From f46a0cbc02248108e18b721240582036257b3b37 Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Wed, 11 Dec 2024 13:08:55 +0000 Subject: [PATCH 09/23] fix: actually pass tracing flag --- .../sql-query-connector/src/database/native/postgresql.rs | 3 ++- 1 file changed, 2 insertions(+), 1 
deletion(-) diff --git a/query-engine/connectors/sql-query-connector/src/database/native/postgresql.rs b/query-engine/connectors/sql-query-connector/src/database/native/postgresql.rs index 9e59e4f232c7..171181297bfa 100644 --- a/query-engine/connectors/sql-query-connector/src/database/native/postgresql.rs +++ b/query-engine/connectors/sql-query-connector/src/database/native/postgresql.rs @@ -6,6 +6,7 @@ use connector_interface::{ Connection, Connector, }; use psl::builtin_connectors::COCKROACH; +use psl::PreviewFeature; use quaint::{connector::PostgresFlavour, pooled::Quaint, prelude::ConnectionInfo}; use std::time::Duration; @@ -40,7 +41,7 @@ impl FromSource for PostgreSql { }) })?; - let mut builder = Quaint::builder(url) + let mut builder = Quaint::builder_with_tracing(url, features.contains(PreviewFeature::Tracing)) .map_err(SqlError::from) .map_err(|sql_error| sql_error.into_connector_error(&err_conn_info))?; From 153c109d419c318ceddc58dbd0a85ff4be6a1751 Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Wed, 11 Dec 2024 17:33:39 +0000 Subject: [PATCH 10/23] refactor: merge the two prepared query caches --- quaint/src/connector/postgres/native/cache.rs | 161 +++++++++++++----- quaint/src/connector/postgres/native/mod.rs | 48 ++---- 2 files changed, 132 insertions(+), 77 deletions(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index 27f430f8e14d..a714053c1bd2 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -13,44 +13,55 @@ use crate::connector::metrics::strip_query_traceparent; use super::query::{PreparedQuery, TypedQuery}; -/// Types that can be used as a cache for prepared queries. +/// Types that can be used as a cache for prepared queries and statements. #[async_trait] pub trait QueryCache: From + Send + Sync { - /// The type of query that is returned by the cache. + /// The type that is returned when a prepared query is requested from the cache. type Query: PreparedQuery; - /// Retrieve a prepared query from the cache or prepare and cache one if it's not present. - async fn get_by_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result; + /// Retrieve a prepared query. + async fn get_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result; + + /// Retrieve a prepared statement. + /// + /// This is useful in scenarios that require direct access to a prepared statement, + /// e.g. describing a query. + async fn get_statement(&self, client: &Client, sql: &str, types: &[Type]) -> Result; } -/// A no-op cache that creates a new prepared statement for each query. +/// A no-op cache that creates a new prepared statement for every requested query. /// Useful when we don't need caching. #[derive(Debug, Default)] -pub struct NoopPreparedStatementCache; +pub struct NoOpCache; #[async_trait] -impl QueryCache for NoopPreparedStatementCache { +impl QueryCache for NoOpCache { type Query = Statement; #[inline] - async fn get_by_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result { + async fn get_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result { + self.get_statement(client, sql, types).await + } + + #[inline] + async fn get_statement(&self, client: &Client, sql: &str, types: &[Type]) -> Result { client.prepare_typed(sql, types).await } } -impl From for NoopPreparedStatementCache { +impl From for NoOpCache { fn from(_: CacheSettings) -> Self { Self } } -/// An LRU cache that creates and stores prepared statements. 
+/// An LRU cache that creates a prepared statement for every newly requested query. #[derive(Debug)] -pub struct LruPreparedStatementCache { +pub struct PreparedStatementLruCache { cache: InnerLruCache, } -impl LruPreparedStatementCache { +impl PreparedStatementLruCache { pub fn with_capacity(capacity: usize) -> Self { Self { cache: InnerLruCache::with_capacity(capacity), @@ -59,10 +70,15 @@ impl LruPreparedStatementCache { } #[async_trait] -impl QueryCache for LruPreparedStatementCache { +impl QueryCache for PreparedStatementLruCache { type Query = Statement; - async fn get_by_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result { + #[inline] + async fn get_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result { + self.get_statement(client, sql, types).await + } + + async fn get_statement(&self, client: &Client, sql: &str, types: &[Type]) -> Result { match self.cache.get(sql, types).await { Some(statement) => Ok(statement), None => { @@ -74,23 +90,24 @@ impl QueryCache for LruPreparedStatementCache { } } -impl From for LruPreparedStatementCache { +impl From for PreparedStatementLruCache { fn from(settings: CacheSettings) -> Self { Self::with_capacity(settings.capacity) } } -/// An LRU cache that creates and stores type information relevant to each query, with keys being -/// stripped of any tracing information. -/// -/// Returns [`TypedQuery`] instances, rather than [`Statement`], because prepared statements cannot -/// be re-used when the tracing information is attached to them. +/// An LRU cache that creates and stores type information relevant to queries as instances of +/// [`TypedQuery`]. Queries are identified by their content with tracing information removed +/// (which makes it possible to cache them at all). The caching behavior is implemented in +/// [`get_query`](Self::get_query), while statements returned by +/// [`get_statement`](Self::get_statement) are always freshly prepared, because statements cannot +/// be re-used when tracing information is present. 
#[derive(Debug)] -pub struct LruTracingCache { +pub struct TracingLruCache { cache: InnerLruCache>, } -impl LruTracingCache { +impl TracingLruCache { pub fn with_capacity(capacity: usize) -> Self { Self { cache: InnerLruCache::with_capacity(capacity), @@ -99,10 +116,10 @@ impl LruTracingCache { } #[async_trait] -impl QueryCache for LruTracingCache { +impl QueryCache for TracingLruCache { type Query = Arc; - async fn get_by_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result, Error> { + async fn get_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result, Error> { let sql_without_traceparent = strip_query_traceparent(sql); match self.cache.get(sql_without_traceparent, types).await { @@ -115,9 +132,13 @@ impl QueryCache for LruTracingCache { } } } + + async fn get_statement(&self, client: &Client, sql: &str, types: &[Type]) -> Result { + client.prepare_typed(sql, types).await + } } -impl From for LruTracingCache { +impl From for TracingLruCache { fn from(settings: CacheSettings) -> Self { Self::with_capacity(settings.capacity) } @@ -170,6 +191,7 @@ impl InnerLruCache { let stored = cache.len(); let key = QueryKey::new(&self.state, sql, types); + // we call `get_mut` because LRU requires mutable access for lookups match cache.get_mut(&key) { Some(value) => { tracing::trace!( @@ -212,56 +234,91 @@ mod tests { use url::Url; #[tokio::test] - async fn noop_prepared_statement_cache_prepares_new_statements_every_time() { + async fn noop_cache_returns_new_queries_every_time() { run_with_client(|client| async move { - let cache = NoopPreparedStatementCache; + let cache = NoOpCache; let sql = "SELECT $1"; let types = [Type::INT4]; - let stmt1 = cache.get_by_query(&client, sql, &types).await.unwrap(); - let stmt2 = cache.get_by_query(&client, sql, &types).await.unwrap(); + let stmt1 = cache.get_query(&client, sql, &types).await.unwrap(); + let stmt2 = cache.get_query(&client, sql, &types).await.unwrap(); assert_ne!(stmt1.name(), stmt2.name()); }) .await; } #[tokio::test] - async fn lru_prepared_statement_cache_reuses_statements_within_capacity() { + async fn noop_cache_returns_new_statements_every_time() { run_with_client(|client| async move { - let cache = LruPreparedStatementCache::with_capacity(1); + let cache = NoOpCache; let sql = "SELECT $1"; let types = [Type::INT4]; - let stmt1 = cache.get_by_query(&client, sql, &types).await.unwrap(); - let stmt2 = cache.get_by_query(&client, sql, &types).await.unwrap(); + let stmt1 = cache.get_statement(&client, sql, &types).await.unwrap(); + let stmt2 = cache.get_statement(&client, sql, &types).await.unwrap(); + assert_ne!(stmt1.name(), stmt2.name()); + }) + .await; + } + + #[tokio::test] + async fn prepared_statement_lru_cache_reuses_queries_within_capacity() { + run_with_client(|client| async move { + let cache = PreparedStatementLruCache::with_capacity(1); + let sql = "SELECT $1"; + let types = [Type::INT4]; + + let stmt1 = cache.get_query(&client, sql, &types).await.unwrap(); + let stmt2 = cache.get_query(&client, sql, &types).await.unwrap(); assert_eq!(stmt1.name(), stmt2.name()); // replace our cached statement with a new one going over the capacity - cache.get_by_query(&client, sql, &[Type::INT8]).await.unwrap(); + cache.get_query(&client, sql, &[Type::INT8]).await.unwrap(); // the old statement should be evicted from the cache - let stmt3 = cache.get_by_query(&client, sql, &types).await.unwrap(); + let stmt3 = cache.get_query(&client, sql, &types).await.unwrap(); assert_ne!(stmt1.name(), stmt3.name()); }) .await; } 
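    // The eviction in the test above relies on `QueryKey` hashing both the SQL text and the
    // parameter types, so "SELECT $1" keyed with `Type::INT4` and with `Type::INT8` are two
    // distinct cache entries, and the second insert pushes the first one out of a capacity-1
    // cache. A minimal sketch of that LRU behavior, assuming the `lru-cache` crate this module
    // already depends on (the keys and values below are purely illustrative):
    //
    //     let mut cache = lru_cache::LruCache::new(1);
    //     cache.insert(("SELECT $1", "INT4"), "statement A");
    //     cache.insert(("SELECT $1", "INT8"), "statement B"); // over capacity: entry A is evicted
    //     assert!(cache.get_mut(&("SELECT $1", "INT4")).is_none());
    //
    // Keying on the parameter types as well as the SQL means a cached statement is only re-used
    // for the exact type combination it was originally prepared with.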
#[tokio::test] - async fn tracing_cache_reuses_queries_within_capacity() { + async fn prepared_statement_lru_cache_reuses_statements_within_capacity() { run_with_client(|client| async move { - let cache = LruTracingCache::with_capacity(1); + let cache = PreparedStatementLruCache::with_capacity(1); let sql = "SELECT $1"; let types = [Type::INT4]; - let stmt1 = cache.get_by_query(&client, sql, &types).await.unwrap(); - let stmt2 = cache.get_by_query(&client, sql, &types).await.unwrap(); + let stmt1 = cache.get_statement(&client, sql, &types).await.unwrap(); + let stmt2 = cache.get_statement(&client, sql, &types).await.unwrap(); + assert_eq!(stmt1.name(), stmt2.name()); + + // replace our cached statement with a new one going over the capacity + cache.get_statement(&client, sql, &[Type::INT8]).await.unwrap(); + + // the old statement should be evicted from the cache + let stmt3 = cache.get_statement(&client, sql, &types).await.unwrap(); + assert_ne!(stmt1.name(), stmt3.name()); + }) + .await; + } + + #[tokio::test] + async fn tracing_lru_cache_reuses_queries_within_capacity() { + run_with_client(|client| async move { + let cache = TracingLruCache::with_capacity(1); + let sql = "SELECT $1"; + let types = [Type::INT4]; + + let stmt1 = cache.get_query(&client, sql, &types).await.unwrap(); + let stmt2 = cache.get_query(&client, sql, &types).await.unwrap(); assert!(Arc::ptr_eq(&stmt1, &stmt2), "stmt1 and stmt2 should be the same Arc"); // replace our cached query with a new one going over the capacity - cache.get_by_query(&client, sql, &[Type::INT8]).await.unwrap(); + cache.get_query(&client, sql, &[Type::INT8]).await.unwrap(); // the old query should be evicted from the cache - let stmt3 = cache.get_by_query(&client, sql, &types).await.unwrap(); + let stmt3 = cache.get_query(&client, sql, &types).await.unwrap(); assert!( !Arc::ptr_eq(&stmt1, &stmt3), "stmt1 and stmt3 should not be the same Arc" @@ -271,20 +328,34 @@ mod tests { } #[tokio::test] - async fn tracing_cache_reuses_queries_with_different_traceparent() { + async fn tracing_lru_cache_reuses_queries_with_different_traceparent() { run_with_client(|client| async move { - let cache = LruTracingCache::with_capacity(1); + let cache = TracingLruCache::with_capacity(1); let sql1 = "SELECT $1 /* traceparent=00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01 */"; let sql2 = "SELECT $1 /* traceparent=00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-02 */"; let types = [Type::INT4]; - let stmt1 = cache.get_by_query(&client, sql1, &types).await.unwrap(); - let stmt2 = cache.get_by_query(&client, sql2, &types).await.unwrap(); + let stmt1 = cache.get_query(&client, sql1, &types).await.unwrap(); + let stmt2 = cache.get_query(&client, sql2, &types).await.unwrap(); assert!(Arc::ptr_eq(&stmt1, &stmt2), "stmt1 and stmt2 should be the same Arc"); }) .await; } + #[tokio::test] + async fn tracing_lru_cache_returns_new_statements_every_time() { + run_with_client(|client| async move { + let cache = TracingLruCache::with_capacity(1); + let sql = "SELECT $1"; + let types = [Type::INT4]; + + let stmt1 = cache.get_statement(&client, sql, &types).await.unwrap(); + let stmt2 = cache.get_statement(&client, sql, &types).await.unwrap(); + assert_ne!(stmt1.name(), stmt2.name()); + }) + .await; + } + async fn run_with_client(test: Func) where Func: FnOnce(Client) -> Fut, diff --git a/quaint/src/connector/postgres/native/mod.rs b/quaint/src/connector/postgres/native/mod.rs index 423f0d2907cb..2e8f62b424d3 100644 --- a/quaint/src/connector/postgres/native/mod.rs +++ 
b/quaint/src/connector/postgres/native/mod.rs @@ -26,7 +26,7 @@ use crate::{ visitor::{self, Visitor}, }; use async_trait::async_trait; -use cache::{CacheSettings, LruPreparedStatementCache, LruTracingCache, NoopPreparedStatementCache, QueryCache}; +use cache::{CacheSettings, NoOpCache, PreparedStatementLruCache, QueryCache, TracingLruCache}; use column_type::PGColumnType; use futures::future::FutureExt; use futures::StreamExt; @@ -68,15 +68,13 @@ const DB_SYSTEM_NAME_COCKROACHDB: &str = "cockroachdb"; /// A connector interface for the PostgreSQL database. /// /// # Type parameters -/// - `QueriesCache`: The cache used for queries that do not necessitate prepared statements. -/// - `StmtsCache`: The cache used for prepared statements. +/// - `Cache`: The cache used for prepared queries. #[derive(Debug)] -pub struct PostgreSql { +pub struct PostgreSql { client: PostgresClient, pg_bouncer: bool, socket_timeout: Option, - queries_cache: QueriesCache, - stmts_cache: StmtsCache, + cache: Cache, is_healthy: AtomicBool, is_cockroachdb: bool, is_materialize: bool, @@ -85,16 +83,16 @@ pub struct PostgreSql { /// A [`PostgreSql`] interface with the default caching strategy, which involves storing all /// queries as prepared statements in an LRU cache. -pub type PostgreSqlWithDefaultCache = PostgreSql; +pub type PostgreSqlWithDefaultCache = PostgreSql; /// A [`PostgreSql`] interface which executes all queries as prepared statements without caching /// them. -pub type PostgreSqlWithNoCache = PostgreSql; +pub type PostgreSqlWithNoCache = PostgreSql; /// A [`PostgreSql`] interface with the tracing caching strategy, which involves storing query /// type information in a dedicated LRU cache for applicable queries and not re-using any prepared /// statements. -pub type PostgreSqlWithTracingCache = PostgreSql; +pub type PostgreSqlWithTracingCache = PostgreSql; #[derive(Debug)] struct SslAuth { @@ -240,8 +238,7 @@ impl PostgreSqlWithNoCache { client: PostgresClient(client), socket_timeout: None, pg_bouncer: false, - queries_cache: NoopPreparedStatementCache, - stmts_cache: NoopPreparedStatementCache, + cache: NoOpCache, is_healthy: AtomicBool::new(true), is_cockroachdb: false, is_materialize: false, @@ -250,11 +247,7 @@ impl PostgreSqlWithNoCache { } } -impl PostgreSql -where - QueriesCache: QueryCache, - StmtsCache: QueryCache, -{ +impl PostgreSql { /// Create a new connection to the database. 
pub async fn new(url: PostgresNativeUrl, tls_manager: &MakeTlsConnectorManager) -> crate::Result { let config = url.to_config(); @@ -305,8 +298,7 @@ where client: PostgresClient(client), socket_timeout: url.query_params.socket_timeout, pg_bouncer: url.query_params.pg_bouncer, - queries_cache: url.cache_settings().into(), - stmts_cache: url.cache_settings().into(), + cache: url.cache_settings().into(), is_healthy: AtomicBool::new(true), is_cockroachdb, is_materialize, @@ -470,7 +462,7 @@ where sql, params, move || async move { - let query = self.queries_cache.get_by_query(&self.client.0, sql, types).await?; + let query = self.cache.get_query(&self.client.0, sql, types).await?; if query.param_types().len() != params.len() { let kind = ErrorKind::IncorrectNumberOfParameters { @@ -542,11 +534,7 @@ impl Display for SetSearchPath<'_> { } #[async_trait] -impl TransactionCapable for PostgreSql -where - QueriesCache: QueryCache, - StmtsCache: QueryCache, -{ +impl TransactionCapable for PostgreSql { async fn start_transaction<'a>( &'a self, isolation: Option, @@ -560,11 +548,7 @@ where } #[async_trait] -impl Queryable for PostgreSql -where - QueriesCache: QueryCache, - StmtsCache: QueryCache, -{ +impl Queryable for PostgreSql { async fn query(&self, q: Query<'_>) -> crate::Result { let (sql, params) = visitor::Postgres::build(q)?; @@ -581,7 +565,7 @@ where } async fn describe_query(&self, sql: &str) -> crate::Result { - let stmt = self.stmts_cache.get_by_query(&self.client.0, sql, &[]).await?; + let stmt = self.cache.get_statement(&self.client.0, sql, &[]).await?; let mut columns: Vec = Vec::with_capacity(stmt.columns().len()); let mut parameters: Vec = Vec::with_capacity(stmt.params().len()); @@ -670,7 +654,7 @@ where sql, params, move || async move { - let stmt = self.stmts_cache.get_by_query(&self.client.0, sql, &[]).await?; + let stmt = self.cache.get_statement(&self.client.0, sql, &[]).await?; if stmt.params().len() != params.len() { let kind = ErrorKind::IncorrectNumberOfParameters { @@ -701,7 +685,7 @@ where params, move || async move { let types = conversion::params_to_types(params); - let stmt = self.stmts_cache.get_by_query(&self.client.0, sql, &types).await?; + let stmt = self.cache.get_statement(&self.client.0, sql, &types).await?; if stmt.params().len() != params.len() { let kind = ErrorKind::IncorrectNumberOfParameters { From 17c1994fca846aa19c38c483d85c93c31e189554 Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Wed, 11 Dec 2024 18:06:14 +0000 Subject: [PATCH 11/23] fix: make sure to return the caller-provided sql with TypedQuery --- quaint/src/connector/postgres/native/cache.rs | 78 ++++++++------ quaint/src/connector/postgres/native/query.rs | 102 +++++++----------- 2 files changed, 81 insertions(+), 99 deletions(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index a714053c1bd2..a261d93d9f4b 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -11,16 +11,16 @@ use tokio_postgres::{Client, Error, Statement}; use crate::connector::metrics::strip_query_traceparent; -use super::query::{PreparedQuery, TypedQuery}; +use super::query::{PreparedQuery, QueryMetadata, TypedQuery}; /// Types that can be used as a cache for prepared queries and statements. #[async_trait] pub trait QueryCache: From + Send + Sync { /// The type that is returned when a prepared query is requested from the cache. 
- type Query: PreparedQuery; + type Query<'a>: PreparedQuery; /// Retrieve a prepared query. - async fn get_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result; + async fn get_query<'a>(&self, client: &Client, sql: &'a str, types: &[Type]) -> Result, Error>; /// Retrieve a prepared statement. /// @@ -36,10 +36,10 @@ pub struct NoOpCache; #[async_trait] impl QueryCache for NoOpCache { - type Query = Statement; + type Query<'a> = Statement; #[inline] - async fn get_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result { + async fn get_query<'a>(&self, client: &Client, sql: &'a str, types: &[Type]) -> Result { self.get_statement(client, sql, types).await } @@ -55,7 +55,7 @@ impl From for NoOpCache { } } -/// An LRU cache that creates a prepared statement for every newly requested query. +/// An LRU cache that creates a prepared statement for every query that is not in the cache. #[derive(Debug)] pub struct PreparedStatementLruCache { cache: InnerLruCache, @@ -71,10 +71,10 @@ impl PreparedStatementLruCache { #[async_trait] impl QueryCache for PreparedStatementLruCache { - type Query = Statement; + type Query<'a> = Statement; #[inline] - async fn get_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result { + async fn get_query<'a>(&self, client: &Client, sql: &'a str, types: &[Type]) -> Result { self.get_statement(client, sql, types).await } @@ -96,15 +96,15 @@ impl From for PreparedStatementLruCache { } } -/// An LRU cache that creates and stores type information relevant to queries as instances of -/// [`TypedQuery`]. Queries are identified by their content with tracing information removed -/// (which makes it possible to cache them at all). The caching behavior is implemented in -/// [`get_query`](Self::get_query), while statements returned by +/// An LRU cache that creates and stores query type information rather than prepared statements. +/// Queries are identified by their content with tracing information removed (which makes it +/// possible to cache them at all) and returned as instances of [`TypedQuery`]. The caching +/// behavior is implemented in [`get_query`](Self::get_query), while statements returned from /// [`get_statement`](Self::get_statement) are always freshly prepared, because statements cannot /// be re-used when tracing information is present. 
#[derive(Debug)] pub struct TracingLruCache { - cache: InnerLruCache>, + cache: InnerLruCache>, } impl TracingLruCache { @@ -117,20 +117,21 @@ impl TracingLruCache { #[async_trait] impl QueryCache for TracingLruCache { - type Query = Arc; + type Query<'a> = TypedQuery<'a>; - async fn get_query(&self, client: &Client, sql: &str, types: &[Type]) -> Result, Error> { + async fn get_query<'a>(&self, client: &Client, sql: &'a str, types: &[Type]) -> Result, Error> { let sql_without_traceparent = strip_query_traceparent(sql); - match self.cache.get(sql_without_traceparent, types).await { - Some(query) => Ok(query), + let metadata = match self.cache.get(sql_without_traceparent, types).await { + Some(metadata) => metadata, None => { - let stmt = client.prepare_typed(sql, types).await?; - let query = Arc::new(TypedQuery::from_statement(sql, &stmt)); - self.cache.insert(sql_without_traceparent, types, query.clone()).await; - Ok(query) + let stmt = client.prepare_typed(sql_without_traceparent, types).await?; + let metdata = Arc::new(QueryMetadata::from(&stmt)); + self.cache.insert(sql_without_traceparent, types, metdata.clone()).await; + metdata } - } + }; + Ok(TypedQuery::from_sql_and_metadata(sql, metadata)) } async fn get_statement(&self, client: &Client, sql: &str, types: &[Type]) -> Result { @@ -310,18 +311,21 @@ mod tests { let sql = "SELECT $1"; let types = [Type::INT4]; - let stmt1 = cache.get_query(&client, sql, &types).await.unwrap(); - let stmt2 = cache.get_query(&client, sql, &types).await.unwrap(); - assert!(Arc::ptr_eq(&stmt1, &stmt2), "stmt1 and stmt2 should be the same Arc"); + let q1 = cache.get_query(&client, sql, &types).await.unwrap(); + let q2 = cache.get_query(&client, sql, &types).await.unwrap(); + assert!( + std::ptr::eq(q1.metadata(), q2.metadata()), + "stmt1 and stmt2 should re-use the same metadata" + ); // replace our cached query with a new one going over the capacity cache.get_query(&client, sql, &[Type::INT8]).await.unwrap(); // the old query should be evicted from the cache - let stmt3 = cache.get_query(&client, sql, &types).await.unwrap(); + let q3 = cache.get_query(&client, sql, &types).await.unwrap(); assert!( - !Arc::ptr_eq(&stmt1, &stmt3), - "stmt1 and stmt3 should not be the same Arc" + !std::ptr::eq(q1.metadata(), q3.metadata()), + "stmt1 and stmt3 should not re-use the same metadata" ); }) .await; @@ -335,9 +339,15 @@ mod tests { let sql2 = "SELECT $1 /* traceparent=00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-02 */"; let types = [Type::INT4]; - let stmt1 = cache.get_query(&client, sql1, &types).await.unwrap(); - let stmt2 = cache.get_query(&client, sql2, &types).await.unwrap(); - assert!(Arc::ptr_eq(&stmt1, &stmt2), "stmt1 and stmt2 should be the same Arc"); + let q1 = cache.get_query(&client, sql1, &types).await.unwrap(); + assert_eq!(q1.query(), sql1); + let q2 = cache.get_query(&client, sql2, &types).await.unwrap(); + assert_eq!(q2.query(), sql2); + + assert!( + std::ptr::eq(q1.metadata(), q2.metadata()), + "stmt1 and stmt2 should re-use the same metadata" + ); }) .await; } @@ -349,9 +359,9 @@ mod tests { let sql = "SELECT $1"; let types = [Type::INT4]; - let stmt1 = cache.get_statement(&client, sql, &types).await.unwrap(); - let stmt2 = cache.get_statement(&client, sql, &types).await.unwrap(); - assert_ne!(stmt1.name(), stmt2.name()); + let q1 = cache.get_statement(&client, sql, &types).await.unwrap(); + let q2 = cache.get_statement(&client, sql, &types).await.unwrap(); + assert_ne!(q1.name(), q2.name()); }) .await; } diff --git 
a/quaint/src/connector/postgres/native/query.rs b/quaint/src/connector/postgres/native/query.rs index 00a6682581e7..bd697a45d88c 100644 --- a/quaint/src/connector/postgres/native/query.rs +++ b/quaint/src/connector/postgres/native/query.rs @@ -45,76 +45,70 @@ impl PreparedQuery for Statement { /// A query combined with the relevant type information about its parameters and columns. #[derive(Debug)] -pub struct TypedQuery { - sql: String, - param_types: Vec, - column_names: Vec, - column_types: Vec, +pub struct TypedQuery<'a> { + sql: &'a str, + metadata: Arc, } -impl TypedQuery { +impl<'a> TypedQuery<'a> { /// Create a new typed query from a SQL string and a statement. - pub fn from_statement(sql: impl Into, statement: &Statement) -> Self { + pub fn from_sql_and_metadata(sql: &'a str, metadata: impl Into>) -> Self { Self { - sql: sql.into(), - param_types: statement.params().to_vec(), - column_names: statement.columns().iter().map(|c| c.name().to_owned()).collect(), - column_types: statement.columns().iter().map(|c| c.type_().clone()).collect(), + sql, + metadata: metadata.into(), } } -} - -#[async_trait] -impl PreparedQuery for TypedQuery { - fn param_types(&self) -> impl ExactSizeIterator + '_ { - self.param_types.iter() - } - - fn column_names(&self) -> impl ExactSizeIterator + '_ { - self.column_names.iter().map(|s| s.as_str()) - } - fn column_types(&self) -> impl ExactSizeIterator + '_ { - self.column_types.iter() + /// Get the SQL string of the query. + pub fn query(&self) -> &'a str { + self.sql } - async fn dispatch(&self, client: &Client, args: Args) -> Result - where - Args: IntoIterator + Send, - Args::Item: BorrowToSql, - Args::IntoIter: ExactSizeIterator + Send, - { - client - .query_typed_raw(&self.sql, args.into_iter().zip(self.param_types.iter().cloned())) - .await + /// Get the metadata associated with the query. 
+ pub fn metadata(&self) -> &QueryMetadata { + &self.metadata } } #[async_trait] -impl PreparedQuery for Arc { - #[inline] +impl<'a> PreparedQuery for TypedQuery<'a> { fn param_types(&self) -> impl ExactSizeIterator + '_ { - self.as_ref().param_types() + self.metadata.param_types.iter() } - #[inline] fn column_names(&self) -> impl ExactSizeIterator + '_ { - self.as_ref().column_names() + self.metadata.column_names.iter().map(|s| s.as_str()) } - #[inline] fn column_types(&self) -> impl ExactSizeIterator + '_ { - self.as_ref().column_types() + self.metadata.column_types.iter() } - #[inline] async fn dispatch(&self, client: &Client, args: Args) -> Result where Args: IntoIterator + Send, Args::Item: BorrowToSql, Args::IntoIter: ExactSizeIterator + Send, { - self.as_ref().dispatch(client, args).await + let typed_args = args.into_iter().zip(self.metadata.param_types.iter().cloned()); + client.query_typed_raw(self.sql, typed_args).await + } +} + +#[derive(Debug)] +pub struct QueryMetadata { + param_types: Vec, + column_names: Vec, + column_types: Vec, +} + +impl From<&Statement> for QueryMetadata { + fn from(statement: &Statement) -> Self { + Self { + param_types: statement.params().to_vec(), + column_names: statement.columns().iter().map(|c| c.name().to_owned()).collect(), + column_types: statement.columns().iter().map(|c| c.type_().clone()).collect(), + } } } @@ -136,7 +130,7 @@ mod tests { run_with_client(|client| async move { let query = "SELECT $1"; let stmt = client.prepare_typed(query, &[Type::INT4]).await.unwrap(); - let typed = TypedQuery::from_statement(query, &stmt); + let typed = TypedQuery::from_sql_and_metadata(query, QueryMetadata::from(&stmt)); assert_eq!(typed.param_types().cloned().collect::>(), stmt.params()); assert_eq!( @@ -176,28 +170,6 @@ mod tests { .await; } - #[tokio::test] - async fn arc_trait_methods_match_statement_and_dispatch() { - run_with_client(|client| async move { - let query = "SELECT $1"; - let stmt = Arc::new(client.prepare_typed(query, &[Type::INT4]).await.unwrap()); - - assert_eq!(stmt.param_types().cloned().collect::>(), stmt.params()); - assert_eq!( - stmt.column_names().collect::>(), - stmt.columns().iter().map(|c| c.name()).collect::>() - ); - assert_eq!( - stmt.column_types().collect::>(), - stmt.columns().iter().map(|c| c.type_()).collect::>() - ); - - let result = stmt.dispatch(&client, &[&1i32]).await; - assert!(result.is_ok(), "{:?}", result.err()); - }) - .await; - } - async fn run_with_client(test: Func) where Func: FnOnce(Client) -> Fut, From dd5f258a79a2438163e90c646e62d21385418629 Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Wed, 11 Dec 2024 18:13:03 +0000 Subject: [PATCH 12/23] chore: get rid of some unneeded accessors --- quaint/src/connector/postgres/native/cache.rs | 11 ++++++----- quaint/src/connector/postgres/native/query.rs | 14 ++------------ 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index a261d93d9f4b..d6d5a464006b 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -314,7 +314,7 @@ mod tests { let q1 = cache.get_query(&client, sql, &types).await.unwrap(); let q2 = cache.get_query(&client, sql, &types).await.unwrap(); assert!( - std::ptr::eq(q1.metadata(), q2.metadata()), + Arc::ptr_eq(&q1.metadata, &q2.metadata), "stmt1 and stmt2 should re-use the same metadata" ); @@ -324,7 +324,7 @@ mod tests { // the old query should be evicted from the cache let q3 = 
cache.get_query(&client, sql, &types).await.unwrap(); assert!( - !std::ptr::eq(q1.metadata(), q3.metadata()), + !Arc::ptr_eq(&q1.metadata, &q3.metadata), "stmt1 and stmt3 should not re-use the same metadata" ); }) @@ -340,12 +340,13 @@ mod tests { let types = [Type::INT4]; let q1 = cache.get_query(&client, sql1, &types).await.unwrap(); - assert_eq!(q1.query(), sql1); + assert_eq!(q1.sql, sql1); let q2 = cache.get_query(&client, sql2, &types).await.unwrap(); - assert_eq!(q2.query(), sql2); + // the requested query traceparent should be preserved + assert_eq!(q2.sql, sql2); assert!( - std::ptr::eq(q1.metadata(), q2.metadata()), + Arc::ptr_eq(&q1.metadata, &q2.metadata), "stmt1 and stmt2 should re-use the same metadata" ); }) diff --git a/quaint/src/connector/postgres/native/query.rs b/quaint/src/connector/postgres/native/query.rs index bd697a45d88c..4beb7dd1acfd 100644 --- a/quaint/src/connector/postgres/native/query.rs +++ b/quaint/src/connector/postgres/native/query.rs @@ -46,8 +46,8 @@ impl PreparedQuery for Statement { /// A query combined with the relevant type information about its parameters and columns. #[derive(Debug)] pub struct TypedQuery<'a> { - sql: &'a str, - metadata: Arc, + pub(super) sql: &'a str, + pub(super) metadata: Arc, } impl<'a> TypedQuery<'a> { @@ -58,16 +58,6 @@ impl<'a> TypedQuery<'a> { metadata: metadata.into(), } } - - /// Get the SQL string of the query. - pub fn query(&self) -> &'a str { - self.sql - } - - /// Get the metadata associated with the query. - pub fn metadata(&self) -> &QueryMetadata { - &self.metadata - } } #[async_trait] From 2d29f084a2daf8358fdb04cc313943a49bcea70e Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Wed, 11 Dec 2024 18:17:18 +0000 Subject: [PATCH 13/23] chore: update assertion text --- quaint/src/connector/postgres/native/cache.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index d6d5a464006b..20f18cd8afef 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -315,7 +315,7 @@ mod tests { let q2 = cache.get_query(&client, sql, &types).await.unwrap(); assert!( Arc::ptr_eq(&q1.metadata, &q2.metadata), - "stmt1 and stmt2 should re-use the same metadata" + "q1 and q2 should re-use the same metadata" ); // replace our cached query with a new one going over the capacity @@ -325,7 +325,7 @@ mod tests { let q3 = cache.get_query(&client, sql, &types).await.unwrap(); assert!( !Arc::ptr_eq(&q1.metadata, &q3.metadata), - "stmt1 and stmt3 should not re-use the same metadata" + "q1 and q3 should not re-use the same metadata" ); }) .await; @@ -347,7 +347,7 @@ mod tests { assert!( Arc::ptr_eq(&q1.metadata, &q2.metadata), - "stmt1 and stmt2 should re-use the same metadata" + "q1 and q2 should re-use the same metadata" ); }) .await; From 496f527738ac3df09ca2b78e2b7b123e849886fe Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Wed, 11 Dec 2024 18:20:41 +0000 Subject: [PATCH 14/23] doc: clarify comment --- quaint/src/connector/postgres/native/cache.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index 20f18cd8afef..a78ae26022ea 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -98,10 +98,10 @@ impl From for PreparedStatementLruCache { /// An LRU cache that creates and stores query type 
information rather than prepared statements. /// Queries are identified by their content with tracing information removed (which makes it -/// possible to cache them at all) and returned as instances of [`TypedQuery`]. The caching -/// behavior is implemented in [`get_query`](Self::get_query), while statements returned from -/// [`get_statement`](Self::get_statement) are always freshly prepared, because statements cannot -/// be re-used when tracing information is present. +/// possible to cache traced queries at all) and returned as instances of [`TypedQuery`]. The +/// caching behavior is implemented in [`get_query`](Self::get_query), while statements returned +/// from [`get_statement`](Self::get_statement) are always freshly prepared, because statements +/// cannot be re-used when tracing information is present. #[derive(Debug)] pub struct TracingLruCache { cache: InnerLruCache>, From c6bac2d5533fcd2398d3eafaf5a5e5007d2c2c1e Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Wed, 11 Dec 2024 18:32:28 +0000 Subject: [PATCH 15/23] chore: use rsplit because we expect traceparent at the end --- quaint/src/connector/metrics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quaint/src/connector/metrics.rs b/quaint/src/connector/metrics.rs index 158027efe634..9f33de723978 100644 --- a/quaint/src/connector/metrics.rs +++ b/quaint/src/connector/metrics.rs @@ -111,7 +111,7 @@ impl fmt::Display for QueryForTracing<'_> { } pub(super) fn strip_query_traceparent(query: &str) -> &str { - query.split_once("/* traceparent=").map_or(query, |(str, remainder)| { + query.rsplit_once("/* traceparent=").map_or(query, |(str, remainder)| { if remainder .split_once("*/") .is_some_and(|(_, suffix)| suffix.trim_end().is_empty()) From bb7f292f32b16614c355908388712cc6cb053e9d Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Thu, 12 Dec 2024 11:17:58 +0000 Subject: [PATCH 16/23] doc: correct comment --- quaint/src/connector/postgres/native/query.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quaint/src/connector/postgres/native/query.rs b/quaint/src/connector/postgres/native/query.rs index 4beb7dd1acfd..d2d9b9b0a89f 100644 --- a/quaint/src/connector/postgres/native/query.rs +++ b/quaint/src/connector/postgres/native/query.rs @@ -51,7 +51,7 @@ pub struct TypedQuery<'a> { } impl<'a> TypedQuery<'a> { - /// Create a new typed query from a SQL string and a statement. + /// Create a new typed query from an SQL string and metadata. 
pub fn from_sql_and_metadata(sql: &'a str, metadata: impl Into>) -> Self { Self { sql, From 449593412860f2f69c8be2d8ebeca744cfa57d24 Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Thu, 12 Dec 2024 11:24:37 +0000 Subject: [PATCH 17/23] test: cover higher capacity in the tests --- quaint/src/connector/postgres/native/cache.rs | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index a78ae26022ea..c68c6e960c18 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -265,7 +265,7 @@ mod tests { #[tokio::test] async fn prepared_statement_lru_cache_reuses_queries_within_capacity() { run_with_client(|client| async move { - let cache = PreparedStatementLruCache::with_capacity(1); + let cache = PreparedStatementLruCache::with_capacity(3); let sql = "SELECT $1"; let types = [Type::INT4]; @@ -273,10 +273,12 @@ mod tests { let stmt2 = cache.get_query(&client, sql, &types).await.unwrap(); assert_eq!(stmt1.name(), stmt2.name()); - // replace our cached statement with a new one going over the capacity - cache.get_query(&client, sql, &[Type::INT8]).await.unwrap(); + // fill the cache with different types, causing the first query to be evicted + for typ in [Type::INT8, Type::INT4_ARRAY, Type::INT8_ARRAY] { + cache.get_query(&client, sql, &[typ]).await.unwrap(); + } - // the old statement should be evicted from the cache + // the old statement should be re-created let stmt3 = cache.get_query(&client, sql, &types).await.unwrap(); assert_ne!(stmt1.name(), stmt3.name()); }) @@ -286,7 +288,7 @@ mod tests { #[tokio::test] async fn prepared_statement_lru_cache_reuses_statements_within_capacity() { run_with_client(|client| async move { - let cache = PreparedStatementLruCache::with_capacity(1); + let cache = PreparedStatementLruCache::with_capacity(3); let sql = "SELECT $1"; let types = [Type::INT4]; @@ -294,10 +296,12 @@ mod tests { let stmt2 = cache.get_statement(&client, sql, &types).await.unwrap(); assert_eq!(stmt1.name(), stmt2.name()); - // replace our cached statement with a new one going over the capacity - cache.get_statement(&client, sql, &[Type::INT8]).await.unwrap(); + // fill the cache with different types, causing the first query to be evicted + for typ in [Type::INT8, Type::INT4_ARRAY, Type::INT8_ARRAY] { + cache.get_query(&client, sql, &[typ]).await.unwrap(); + } - // the old statement should be evicted from the cache + // the old statement should be re-created let stmt3 = cache.get_statement(&client, sql, &types).await.unwrap(); assert_ne!(stmt1.name(), stmt3.name()); }) @@ -307,7 +311,7 @@ mod tests { #[tokio::test] async fn tracing_lru_cache_reuses_queries_within_capacity() { run_with_client(|client| async move { - let cache = TracingLruCache::with_capacity(1); + let cache = TracingLruCache::with_capacity(3); let sql = "SELECT $1"; let types = [Type::INT4]; @@ -318,10 +322,12 @@ mod tests { "q1 and q2 should re-use the same metadata" ); - // replace our cached query with a new one going over the capacity - cache.get_query(&client, sql, &[Type::INT8]).await.unwrap(); + // fill the cache with different types, causing the first query to be evicted + for typ in [Type::INT8, Type::INT4_ARRAY, Type::INT8_ARRAY] { + cache.get_query(&client, sql, &[typ]).await.unwrap(); + } - // the old query should be evicted from the cache + // the old query should be re-created let q3 = cache.get_query(&client, sql, 
&types).await.unwrap(); assert!( !Arc::ptr_eq(&q1.metadata, &q3.metadata), From 76d35cba3c082f38e0cd4bb80a5515197843aabc Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Fri, 13 Dec 2024 13:05:59 +0000 Subject: [PATCH 18/23] [integration] From 52e1920e6b09b117a26661d37708b4463aa453ee Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Fri, 13 Dec 2024 14:37:24 +0000 Subject: [PATCH 19/23] chore: correct typo --- quaint/src/connector/postgres/native/cache.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index c68c6e960c18..8f5a491d149a 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -126,9 +126,11 @@ impl QueryCache for TracingLruCache { Some(metadata) => metadata, None => { let stmt = client.prepare_typed(sql_without_traceparent, types).await?; - let metdata = Arc::new(QueryMetadata::from(&stmt)); - self.cache.insert(sql_without_traceparent, types, metdata.clone()).await; - metdata + let metadata = Arc::new(QueryMetadata::from(&stmt)); + self.cache + .insert(sql_without_traceparent, types, metadata.clone()) + .await; + metadata } }; Ok(TypedQuery::from_sql_and_metadata(sql, metadata)) From 5c333e4a5fe2409c7ba2b49a8fd9cb8930d4fd16 Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Fri, 13 Dec 2024 15:10:37 +0000 Subject: [PATCH 20/23] fix: remove unneeded outlives trick --- quaint/src/connector/postgres/native/query.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/quaint/src/connector/postgres/native/query.rs b/quaint/src/connector/postgres/native/query.rs index d2d9b9b0a89f..3efbae04873d 100644 --- a/quaint/src/connector/postgres/native/query.rs +++ b/quaint/src/connector/postgres/native/query.rs @@ -8,9 +8,9 @@ use tokio_postgres::{Client, Error, RowStream, Statement}; /// information about its parameters and columns to interpret the results. 
#[async_trait] pub trait PreparedQuery: Send { - fn param_types(&self) -> impl ExactSizeIterator + '_; - fn column_names(&self) -> impl ExactSizeIterator + '_; - fn column_types(&self) -> impl ExactSizeIterator + '_; + fn param_types(&self) -> impl ExactSizeIterator; + fn column_names(&self) -> impl ExactSizeIterator; + fn column_types(&self) -> impl ExactSizeIterator; async fn dispatch(&self, client: &Client, args: Args) -> Result where @@ -21,15 +21,15 @@ pub trait PreparedQuery: Send { #[async_trait] impl PreparedQuery for Statement { - fn param_types(&self) -> impl ExactSizeIterator + '_ { + fn param_types(&self) -> impl ExactSizeIterator { self.params().iter() } - fn column_names(&self) -> impl ExactSizeIterator + '_ { + fn column_names(&self) -> impl ExactSizeIterator { self.columns().iter().map(|c| c.name()) } - fn column_types(&self) -> impl ExactSizeIterator + '_ { + fn column_types(&self) -> impl ExactSizeIterator { self.columns().iter().map(|c| c.type_()) } @@ -62,15 +62,15 @@ impl<'a> TypedQuery<'a> { #[async_trait] impl<'a> PreparedQuery for TypedQuery<'a> { - fn param_types(&self) -> impl ExactSizeIterator + '_ { + fn param_types(&self) -> impl ExactSizeIterator { self.metadata.param_types.iter() } - fn column_names(&self) -> impl ExactSizeIterator + '_ { + fn column_names(&self) -> impl ExactSizeIterator { self.metadata.column_names.iter().map(|s| s.as_str()) } - fn column_types(&self) -> impl ExactSizeIterator + '_ { + fn column_types(&self) -> impl ExactSizeIterator { self.metadata.column_types.iter() } From a5078e19e24939f626ade784efe65f7e84bc3bcc Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Fri, 13 Dec 2024 15:13:07 +0000 Subject: [PATCH 21/23] chore: less annoying trace message --- quaint/src/connector/postgres/native/cache.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index 8f5a491d149a..6e80893f8564 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -198,7 +198,7 @@ impl InnerLruCache { match cache.get_mut(&key) { Some(value) => { tracing::trace!( - message = "CACHE HIT!", + message = "query cache hit", query = sql, capacity = capacity, stored = stored, @@ -207,7 +207,7 @@ impl InnerLruCache { } None => { tracing::trace!( - message = "CACHE MISS!", + message = "query cache miss", query = sql, capacity = capacity, stored = stored, From 2a336b354810f20ded8a359063db6b69906d503f Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Fri, 13 Dec 2024 15:14:37 +0000 Subject: [PATCH 22/23] chore: switch over to tokio mutex --- quaint/src/connector/postgres/native/cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index 6e80893f8564..108b83efbd29 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -4,9 +4,9 @@ use std::{ }; use async_trait::async_trait; -use futures::lock::Mutex; use lru_cache::LruCache; use postgres_types::Type; +use tokio::sync::Mutex; use tokio_postgres::{Client, Error, Statement}; use crate::connector::metrics::strip_query_traceparent; From 52c5e61f1c9329561ad23b62b6a1ddf383656857 Mon Sep 17 00:00:00 2001 From: Jacek Malec Date: Fri, 13 Dec 2024 15:53:12 +0000 Subject: [PATCH 23/23] refactor: use a NoOp hasher --- quaint/src/connector/postgres/native/cache.rs | 58 ++++++++++++++----- 1 file changed, 
45 insertions(+), 13 deletions(-) diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index 108b83efbd29..447326608fa7 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -1,5 +1,5 @@ use std::{ - hash::{BuildHasher, Hash, RandomState}, + hash::{BuildHasher, Hash, Hasher, RandomState}, sync::Arc, }; @@ -155,32 +155,24 @@ pub struct CacheSettings { /// Key uniquely representing an SQL statement in the prepared statements cache. #[derive(Debug, PartialEq, Eq, Hash)] -struct QueryKey { - /// Hash of a string with SQL query. - sql: u64, - /// Combined hash of types for all parameters from the query. - types_hash: u64, -} +struct QueryKey(u64); impl QueryKey { fn new(st: &S, sql: &str, params: &[Type]) -> Self { - Self { - sql: st.hash_one(sql), - types_hash: st.hash_one(params), - } + Self(st.hash_one((sql, params))) } } #[derive(Debug)] struct InnerLruCache { - cache: Mutex>, + cache: Mutex>, state: RandomState, } impl InnerLruCache { fn with_capacity(capacity: usize) -> Self { Self { - cache: Mutex::new(LruCache::new(capacity)), + cache: Mutex::new(LruCache::with_hasher(capacity, NoOpHasherBuilder)), state: RandomState::new(), } } @@ -223,6 +215,34 @@ impl InnerLruCache { } } +struct NoOpHasherBuilder; + +impl BuildHasher for NoOpHasherBuilder { + type Hasher = NoOpHasher; + + fn build_hasher(&self) -> Self::Hasher { + NoOpHasher(None) + } +} + +/// A hasher that expects to be called with a single u64 and returns it as the hash. +struct NoOpHasher(Option); + +impl Hasher for NoOpHasher { + fn finish(&self) -> u64 { + self.0.expect("NoopHasher should have been called with a single u64") + } + + fn write(&mut self, _bytes: &[u8]) { + panic!("NoopHasher should only be called with u64") + } + + fn write_u64(&mut self, i: u64) { + assert!(self.0.is_none(), "NoopHasher should only be called once"); + self.0 = Some(i); + } +} + #[cfg(test)] mod tests { use super::*; @@ -375,6 +395,18 @@ mod tests { .await; } + #[test] + fn noop_hasher_returns_the_same_hash_the_input() { + assert_eq!(NoOpHasherBuilder.hash_one(0xdeadc0deu64), 0xdeadc0de); + assert_eq!(NoOpHasherBuilder.hash_one(0xcafeu64), 0xcafe); + } + + #[test] + #[should_panic(expected = "NoopHasher should only be called with u64")] + fn noop_hasher_doesnt_accept_non_u64_input() { + NoOpHasherBuilder.hash_one("hello"); + } + async fn run_with_client(test: Func) where Func: FnOnce(Client) -> Fut,