Skip to content

Commit

Permalink
Merge pull request #55 from blackbeam/issues
Browse files Browse the repository at this point in the history
Fix #50, #51, #53, #54
  • Loading branch information
blackbeam authored Jun 1, 2019
2 parents e51926f + fd74174 commit bfea526
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 24 deletions.
59 changes: 59 additions & 0 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,13 @@ mod test {
run(fut).unwrap();
}

#[test]
fn should_not_panic_if_dropped_without_tokio_runtime() {
let fut = Conn::new(get_opts());
run(fut).unwrap();
// connection will drop here
}

#[test]
fn should_execute_init_queries_on_new_connection() {
let mut opts_builder = OptsBuilder::from_opts(get_opts());
Expand Down Expand Up @@ -820,6 +827,58 @@ mod test {
run(fut).unwrap();
}

#[test]
fn should_try_collect() {
let fut = Conn::new(get_opts())
.and_then(|conn| {
Queryable::query(
conn,
r"SELECT 'hello', 123
UNION ALL
SELECT 'world', 'bar'
UNION ALL
SELECT 'hello', 123
",
)
})
.and_then(|result| result.try_collect::<(String, u8)>())
.and_then(|(result, mut rows)| {
assert!(rows.pop().unwrap().is_ok());
assert!(rows.pop().unwrap().is_err());
assert!(rows.pop().unwrap().is_ok());
result.drop_result()
})
.and_then(Conn::disconnect);

run(fut).unwrap()
}

#[test]
fn should_try_collect_and_drop() {
let fut = Conn::new(get_opts())
.and_then(|conn| {
Queryable::query(
conn,
r"SELECT 'hello', 123
UNION ALL
SELECT 'world', 'bar'
UNION ALL
SELECT 'hello', 123;
SELECT 'foo', 255;
",
)
})
.and_then(|result| result.try_collect_and_drop::<(String, u8)>())
.and_then(|(conn, mut rows)| {
assert!(rows.pop().unwrap().is_ok());
assert!(rows.pop().unwrap().is_err());
assert!(rows.pop().unwrap().is_ok());
conn.disconnect()
});

run(fut).unwrap()
}

#[test]
fn should_handle_mutliresult_set() {
let fut = Conn::new(get_opts())
Expand Down
104 changes: 82 additions & 22 deletions src/conn/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,21 +167,20 @@ impl Pool {
let min = self.pool_constraints.min();

self.with_inner(|mut inner| {
inner.ongoing -= 1;

if inner.closed {
return;
}

if conn.inner.stream.is_none() {
// drop incomplete connection
inner.ongoing -= 1;
return;
}

if conn.inner.in_transaction || conn.inner.has_result.is_some() {
inner.queue.push(conn.cleanup());
} else {
inner.ongoing -= 1;

if inner.idle.len() >= min {
crate::conn::disconnect(conn);
} else {
Expand Down Expand Up @@ -219,6 +218,7 @@ impl Pool {
($vec:ident { $($p:pat => $b:block,)+ }) => ({
let len = inner.$vec.len();
let mut done_fut_idxs = Vec::new();

for i in 0..len {
let result = inner.$vec.get_mut(i).unwrap().poll();
match result {
Expand All @@ -227,6 +227,16 @@ impl Pool {
}

let out: Result<()> = match result {
Ok(Ready(conn)) => {
if inner.closed {
crate::conn::disconnect(conn);
} else {
inner.ongoing += 1;
returned_conns.push(conn);
}
handled = true;
Ok(())
}
$($p => $b),+
_ => {
Ok(())
Expand All @@ -253,30 +263,12 @@ impl Pool {

// Handle dirty connections.
handle!(queue {
Ok(Ready(conn)) => {
if inner.closed {
crate::conn::disconnect(conn);
} else {
returned_conns.push(conn);
}
handled = true;
Ok(())
},
// Drop it in case of error.
Err(_) => { Ok(()) },
});

// Handle connecting connections.
handle!(new {
Ok(Ready(conn)) => {
if inner.closed {
crate::conn::disconnect(conn);
} else {
inner.ongoing += 1;
returned_conns.push(conn);
}
handled = true;
Ok(())
},
Err(err) => {
if !inner.closed {
Err(err)
Expand Down Expand Up @@ -343,6 +335,7 @@ impl Drop for Conn {

#[cfg(test)]
mod test {
use futures::collect;
use futures::Future;

use crate::{
Expand Down Expand Up @@ -404,6 +397,63 @@ mod test {
run(fut).unwrap();
}

#[test]
fn should_hold_bounds2() {
use std::cmp::max;

const POOL_MIN: usize = 5;
const POOL_MAX: usize = 10;

let url = format!(
"{}?pool_min={}&pool_max={}",
&**DATABASE_URL, POOL_MIN, POOL_MAX
);

// Clean
let pool = Pool::new(url.clone());
let pool_clone = pool.clone();
let conns = (0..POOL_MAX).map(|_| pool.get_conn()).collect::<Vec<_>>();

let fut = ::futures::future::join_all(conns)
.map(move |mut conns| {
let mut popped = 0;
assert_eq!(pool_clone.inner.lock().unwrap().conn_count(), POOL_MAX);

while let Some(_) = conns.pop().map(drop) {
popped += 1;
assert_eq!(
pool_clone.inner.lock().unwrap().conn_count(),
POOL_MAX + POOL_MIN - max(popped, POOL_MIN)
);
}

pool_clone
})
.and_then(|pool| pool.disconnect());

run(fut).unwrap();

// Dirty
let pool = Pool::new(url.clone());
let pool_clone = pool.clone();
let conns = (0..POOL_MAX)
.map(|_| {
pool.get_conn()
.and_then(|conn| conn.start_transaction(TransactionOptions::new()))
})
.collect::<Vec<_>>();

let fut = ::futures::future::join_all(conns).map(move |mut conns| {
assert_eq!(pool_clone.inner.lock().unwrap().conn_count(), POOL_MAX);

while let Some(_) = conns.pop().map(drop) {
assert_eq!(pool_clone.inner.lock().unwrap().conn_count(), POOL_MAX);
}
});

run(fut).unwrap();
}

#[test]
fn should_hold_bounds() {
let pool = Pool::new(format!("{}?pool_min=1&pool_max=2", &**DATABASE_URL));
Expand Down Expand Up @@ -483,6 +533,16 @@ mod test {
run(fut).unwrap();
}

#[test]
fn should_not_panic_if_dropped_without_tokio_runtime() {
let pool = Pool::new(&**DATABASE_URL);
run(collect(
(0..10).map(|_| pool.get_conn()).collect::<Vec<_>>(),
))
.unwrap();
// pool will drop here
}

#[cfg(feature = "nightly")]
mod bench {
use futures::Future;
Expand Down
47 changes: 46 additions & 1 deletion src/queryable/query_result/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ use futures::future::{
Future, FutureResult, Loop,
};
use mysql_common::packets::RawPacket;
use mysql_common::row::convert::FromRowError;

use std::{marker::PhantomData, mem, sync::Arc};
use std::marker::PhantomData;
use std::mem;
use std::result::Result as StdResult;
use std::sync::Arc;

use self::QueryResultInner::*;
use crate::{
Expand Down Expand Up @@ -229,6 +233,12 @@ where
/// as many times as result sets in your query result. For example query
/// `SELECT 'foo'; SELECT 'foo', 'bar';` will produce `QueryResult` with two result sets in it.
/// One can use `QueryResult::is_empty` to make sure that there is no more result sets.
///
/// # Panic
///
/// It'll panic if any row isn't convertible to `R` (i.e. programmer error or unknown schema).
/// * In case of programmer error see `FromRow` docs;
/// * In case of unknown schema use [`QueryResult::try_collect`].
pub fn collect<R>(self) -> impl MyFuture<(Self, Vec<R>)>
where
R: FromRow,
Expand All @@ -240,8 +250,29 @@ where
})
}

/// Returns future that collects result set of this query.
///
/// It works the same way as [`QueryResult::collect`] but won't panic
/// if row isn't convertible to `R`.
pub fn try_collect<R>(self) -> impl MyFuture<(Self, Vec<StdResult<R, FromRowError>>)>
where
R: FromRow,
R: Send + 'static,
{
self.reduce(Vec::new(), |mut acc, row| {
acc.push(FromRow::from_row_opt(row));
acc
})
}

/// Returns future that collects result set of a query result and drops everything else.
/// It will resolve to a pair of wrapped `Queryable` and collected result set.
///
/// # Panic
///
/// It'll panic if any row isn't convertible to `R` (i.e. programmer error or unknown schema).
/// * In case of programmer error see `FromRow` docs;
/// * In case of unknown schema use [`QueryResult::try_collect`].
pub fn collect_and_drop<R>(self) -> impl MyFuture<(T, Vec<R>)>
where
R: FromRow,
Expand All @@ -251,6 +282,20 @@ where
.and_then(|(this, output)| (this.drop_result(), ok(output)))
}

/// Returns future that collects result set of a query result and drops everything else.
/// It will resolve to a pair of wrapped `Queryable` and collected result set.
///
/// It works the same way as [`QueryResult::collect_and_drop`] but won't panic
/// if row isn't convertible to `R`.
pub fn try_collect_and_drop<R>(self) -> impl MyFuture<(T, Vec<StdResult<R, FromRowError>>)>
where
R: FromRow,
R: Send + 'static,
{
self.try_collect()
.and_then(|(this, output)| (this.drop_result(), ok(output)))
}

/// Returns future that will execute `fun` on every row of current result set.
///
/// It will stop on result set boundary (see `QueryResult::collect` docs).
Expand Down
3 changes: 2 additions & 1 deletion tests/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ fn use_generic_code() {
let fut = pool
.get_conn()
.and_then(move |conn| conn.query("SELECT 1, 2, 3"))
.and_then(get_single_result::<(u8, u8, u8), _, _>);
.and_then(get_single_result::<(u8, u8, u8), _, _>)
.and_then(|out| pool.disconnect().map(move |_| out));

let result = run(fut).unwrap();
assert_eq!(result, (1, 2, 3));
Expand Down

0 comments on commit bfea526

Please sign in to comment.