Skip to content

Commit

Permalink
Unify code in query_ functions (#350)
Browse files Browse the repository at this point in the history
  • Loading branch information
aljazerzen authored Oct 4, 2024
1 parent ee2c03a commit a63984d
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 518 deletions.
311 changes: 87 additions & 224 deletions edgedb-tokio/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use std::future::Future;
use std::sync::Arc;

use bytes::BytesMut;
use edgedb_protocol::common::CompilationOptions;
use edgedb_protocol::common::{Capabilities, Cardinality, IoFormat};
use edgedb_protocol::model::Json;
use edgedb_protocol::query_arg::{Encoder, QueryArgs};
use edgedb_protocol::query_arg::QueryArgs;
use edgedb_protocol::QueryResult;
use tokio::time::sleep;

use crate::builder::Config;
use crate::errors::NoDataError;
use crate::errors::{Error, ErrorKind, SHOULD_RETRY};
use crate::errors::{NoDataError, NoResultExpected, ProtocolEncodingError};
use crate::options::{RetryOptions, TransactionOptions};
use crate::raw::{Options, PoolState};
use crate::raw::{Pool, QueryCapabilities};
Expand Down Expand Up @@ -58,22 +56,13 @@ impl Client {
Ok(())
}

/// Execute a query and return a collection of results.
///
/// You will usually have to specify the return type for the query:
///
/// ```rust,ignore
/// let greeting = pool.query::<String, _>("SELECT 'hello'", &());
/// // or
/// let greeting: Vec<String> = pool.query("SELECT 'hello'", &());
///
/// let two_numbers: Vec<i32> = conn.query("select {<int32>$0, <int32>$1}", &(10, 20)).await?;
/// ```
///
/// This method can be used with both static arguments, like a tuple of
/// scalars, and with dynamic arguments [`edgedb_protocol::value::Value`].
/// Similarly, dynamically typed results are also supported.
pub async fn query<R, A>(&self, query: impl AsRef<str>, arguments: &A) -> Result<Vec<R>, Error>
async fn query_and_retry<R, A>(
&self,
query: impl AsRef<str>,
arguments: &A,
io_format: IoFormat,
cardinality: Cardinality,
) -> Result<Vec<R>, Error>
where
A: QueryArgs,
R: QueryResult,
Expand All @@ -85,7 +74,17 @@ impl Client {
let conn = conn.inner();
let state = &self.options.state;
let caps = Capabilities::MODIFICATIONS | Capabilities::DDL;
match conn.query(query.as_ref(), arguments, state, caps).await {
match conn
.query(
query.as_ref(),
arguments,
state,
caps,
io_format,
cardinality,
)
.await
{
Ok(resp) => return Ok(resp.data),
Err(e) => {
let allow_retry = match e.get::<QueryCapabilities>() {
Expand All @@ -111,6 +110,33 @@ impl Client {
}
}

/// Execute a query and return a collection of results.
///
/// You will usually have to specify the return type for the query:
///
/// ```rust,ignore
/// let greeting = pool.query::<String, _>("SELECT 'hello'", &());
/// // or
/// let greeting: Vec<String> = pool.query("SELECT 'hello'", &());
///
/// let two_numbers: Vec<i32> = conn.query("select {<int32>$0, <int32>$1}", &(10, 20)).await?;
/// ```
///
/// This method can be used with both static arguments, like a tuple of
/// scalars, and with dynamic arguments [`edgedb_protocol::value::Value`].
/// Similarly, dynamically typed results are also supported.
pub async fn query<R, A>(
&self,
query: impl AsRef<str> + Send,
arguments: &A,
) -> Result<Vec<R>, Error>
where
A: QueryArgs,
R: QueryResult,
{
Client::query_and_retry(self, query, arguments, IoFormat::Binary, Cardinality::Many).await
}

/// Execute a query and return a single result
///
/// You will usually have to specify the return type for the query:
Expand All @@ -129,46 +155,22 @@ impl Client {
/// Similarly, dynamically typed results are also supported.
pub async fn query_single<R, A>(
&self,
query: impl AsRef<str>,
query: impl AsRef<str> + Send,
arguments: &A,
) -> Result<Option<R>, Error>
where
A: QueryArgs,
R: QueryResult,
R: QueryResult + Send,
{
let mut iteration = 0;
loop {
let mut conn = self.pool.acquire().await?;
let conn = conn.inner();
let state = &self.options.state;
let caps = Capabilities::MODIFICATIONS | Capabilities::DDL;
match conn
.query_single(query.as_ref(), arguments, state, caps)
.await
{
Ok(resp) => return Ok(resp.data),
Err(e) => {
let allow_retry = match e.get::<QueryCapabilities>() {
// Error from a weird source, or just a bug
// Let's keep on the safe side
None => false,
Some(QueryCapabilities::Unparsed) => true,
Some(QueryCapabilities::Parsed(c)) => c.is_empty(),
};
if allow_retry && e.has_tag(SHOULD_RETRY) {
let rule = self.options.retry.get_rule(&e);
iteration += 1;
if iteration < rule.attempts {
let duration = (rule.backoff)(iteration);
log::info!("Error: {:#}. Retrying in {:?}...", e, duration);
sleep(duration).await;
continue;
}
}
return Err(e);
}
}
}
Client::query_and_retry(
self,
query,
arguments,
IoFormat::Binary,
Cardinality::AtMostOne,
)
.await
.map(|x| x.into_iter().next())
}

/// Execute a query and return a single result
Expand Down Expand Up @@ -198,16 +200,26 @@ impl Client {
/// Similarly, dynamically typed results are also supported.
pub async fn query_required_single<R, A>(
&self,
query: impl AsRef<str>,
query: impl AsRef<str> + Send,
arguments: &A,
) -> Result<R, Error>
where
A: QueryArgs,
R: QueryResult,
R: QueryResult + Send,
{
self.query_single(query, arguments)
.await?
.ok_or_else(|| NoDataError::with_message("query row returned zero results"))
Client::query_and_retry(
self,
query,
arguments,
IoFormat::Binary,
Cardinality::AtMostOne,
)
.await
.and_then(|x| {
x.into_iter()
.next()
.ok_or_else(|| NoDataError::with_message("query row returned zero results"))
})
}

/// Execute a query and return the result as JSON.
Expand All @@ -216,93 +228,17 @@ impl Client {
query: impl AsRef<str>,
arguments: &impl QueryArgs,
) -> Result<Json, Error> {
let mut iteration = 0;
loop {
let mut conn = self.pool.acquire().await?;
let res = self
.query_and_retry::<String, _>(query, arguments, IoFormat::Json, Cardinality::Many)
.await?;

let flags = CompilationOptions {
implicit_limit: None,
implicit_typenames: false,
implicit_typeids: false,
explicit_objectids: true,
allow_capabilities: Capabilities::MODIFICATIONS | Capabilities::DDL,
io_format: IoFormat::Json,
expected_cardinality: Cardinality::Many,
};
let desc = match conn
.parse(&flags, query.as_ref(), &self.options.state)
.await
{
Ok(parsed) => parsed,
Err(e) => {
if e.has_tag(SHOULD_RETRY) {
let rule = self.options.retry.get_rule(&e);
iteration += 1;
if iteration < rule.attempts {
let duration = (rule.backoff)(iteration);
log::info!("Error: {:#}. Retrying in {:?}...", e, duration);
sleep(duration).await;
continue;
}
}
return Err(e);
}
};
let inp_desc = desc.input().map_err(ProtocolEncodingError::with_source)?;

let mut arg_buf = BytesMut::with_capacity(8);
arguments.encode(&mut Encoder::new(
&inp_desc.as_query_arg_context(),
&mut arg_buf,
))?;

let res = conn
.execute(
&flags,
query.as_ref(),
&self.options.state,
&desc,
&arg_buf.freeze(),
)
.await;
let data = match res {
Ok(data) => data,
Err(e) => {
if desc.capabilities == Capabilities::empty() && e.has_tag(SHOULD_RETRY) {
let rule = self.options.retry.get_rule(&e);
iteration += 1;
if iteration < rule.attempts {
let duration = (rule.backoff)(iteration);
log::info!("Error: {:#}. Retrying in {:?}...", e, duration);
sleep(duration).await;
continue;
}
}
return Err(e);
}
};
let json = res
.into_iter()
.next()
.ok_or_else(|| NoDataError::with_message("query row returned zero results"))?;

let out_desc = desc.output().map_err(ProtocolEncodingError::with_source)?;
match out_desc.root_pos() {
Some(root_pos) => {
let ctx = out_desc.as_queryable_context();
// JSON objects are returned as strings :(
let mut state = String::prepare(&ctx, root_pos)?;
let bytes = data
.into_iter()
.next()
.and_then(|chunk| chunk.data.into_iter().next());
if let Some(bytes) = bytes {
// we trust database to produce valid json
let s = String::decode(&mut state, &bytes)?;
return Ok(Json::new_unchecked(s));
} else {
return Err(NoDataError::with_message("query row returned zero results"));
}
}
None => return Err(NoResultExpected::build()),
}
}
// we trust database to produce valid json
Ok(Json::new_unchecked(json))
}

/// Execute a query and return a single result as JSON.
Expand All @@ -329,85 +265,12 @@ impl Client {
query: impl AsRef<str>,
arguments: &impl QueryArgs,
) -> Result<Option<Json>, Error> {
let query = query.as_ref();
let mut iteration = 0;
loop {
let mut conn = self.pool.acquire().await?;

let flags = CompilationOptions {
implicit_limit: None,
implicit_typenames: false,
implicit_typeids: false,
explicit_objectids: true,
allow_capabilities: Capabilities::MODIFICATIONS | Capabilities::DDL,
io_format: IoFormat::Json,
expected_cardinality: Cardinality::AtMostOne,
};
let desc = match conn.parse(&flags, query, &self.options.state).await {
Ok(parsed) => parsed,
Err(e) => {
if e.has_tag(SHOULD_RETRY) {
let rule = self.options.retry.get_rule(&e);
iteration += 1;
if iteration < rule.attempts {
let duration = (rule.backoff)(iteration);
log::info!("Error: {:#}. Retrying in {:?}...", e, duration);
sleep(duration).await;
continue;
}
}
return Err(e);
}
};
let inp_desc = desc.input().map_err(ProtocolEncodingError::with_source)?;

let mut arg_buf = BytesMut::with_capacity(8);
arguments.encode(&mut Encoder::new(
&inp_desc.as_query_arg_context(),
&mut arg_buf,
))?;
let res = self
.query_and_retry::<String, _>(query, arguments, IoFormat::Json, Cardinality::AtMostOne)
.await?;

let res = conn
.execute(&flags, query, &self.options.state, &desc, &arg_buf.freeze())
.await;
let data = match res {
Ok(data) => data,
Err(e) => {
if desc.capabilities == Capabilities::empty() && e.has_tag(SHOULD_RETRY) {
let rule = self.options.retry.get_rule(&e);
iteration += 1;
if iteration < rule.attempts {
let duration = (rule.backoff)(iteration);
log::info!("Error: {:#}. Retrying in {:?}...", e, duration);
sleep(duration).await;
continue;
}
}
return Err(e);
}
};

let out_desc = desc.output().map_err(ProtocolEncodingError::with_source)?;
match out_desc.root_pos() {
Some(root_pos) => {
let ctx = out_desc.as_queryable_context();
// JSON objects are returned as strings :(
let mut state = String::prepare(&ctx, root_pos)?;
let bytes = data
.into_iter()
.next()
.and_then(|chunk| chunk.data.into_iter().next());
if let Some(bytes) = bytes {
// we trust database to produce valid json
let s = String::decode(&mut state, &bytes)?;
return Ok(Some(Json::new_unchecked(s)));
} else {
return Ok(None);
}
}
None => return Err(NoResultExpected::build()),
}
}
// we trust database to produce valid json
Ok(res.into_iter().next().map(Json::new_unchecked))
}

/// Execute a query and return a single result as JSON.
Expand Down
Loading

0 comments on commit a63984d

Please sign in to comment.