From 7a978e629f891b4d8d12312270aaa2a9131ac163 Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Sat, 13 Jul 2019 16:42:13 +0300 Subject: [PATCH] Add support of a custom executor --- src/conn/mod.rs | 236 +++++++++++++---------- src/conn/pool/futures/disconnect_pool.rs | 8 +- src/conn/pool/futures/get_conn.rs | 18 +- src/conn/pool/mod.rs | 179 +++++++++++------ src/error.rs | 17 ++ src/lib.rs | 17 ++ src/queryable/mod.rs | 2 +- 7 files changed, 306 insertions(+), 171 deletions(-) diff --git a/src/conn/mod.rs b/src/conn/mod.rs index 0630df2f..89579383 100644 --- a/src/conn/mod.rs +++ b/src/conn/mod.rs @@ -36,7 +36,7 @@ pub mod pool; pub mod stmt_cache; /// Helper function that asynchronously disconnects connection on the default tokio executor. -fn disconnect(mut conn: Conn) { +fn disconnect(mut conn: Conn) { use tokio::executor::{DefaultExecutor, Executor}; let mut executor = DefaultExecutor::current(); @@ -48,13 +48,13 @@ fn disconnect(mut conn: Conn) { if !disconnected { // Server will report broken connection if spawn fails. let _ = executor.spawn(Box::new( - conn.cleanup().and_then(Conn::disconnect).map_err(drop), + conn.cleanup().and_then(Conn::::disconnect).map_err(drop), )); } } /// Mysql connection -struct ConnInner { +struct ConnInner { stream: Option, id: u32, version: (u16, u16, u16), @@ -67,7 +67,7 @@ struct ConnInner { last_insert_id: u64, affected_rows: u64, warnings: u16, - pool: Option, + pool: Option>, has_result: Option<(Arc>, Option)>, in_transaction: bool, opts: Opts, @@ -81,7 +81,7 @@ struct ConnInner { disconnected: bool, } -impl fmt::Debug for ConnInner { +impl fmt::Debug for ConnInner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Conn") .field("connection id", &self.id) @@ -94,9 +94,9 @@ impl fmt::Debug for ConnInner { } } -impl ConnInner { +impl ConnInner { /// Constructs an empty connection. - fn empty(opts: Opts) -> ConnInner { + fn empty(opts: Opts) -> ConnInner { ConnInner { last_command: consts::Command::COM_PING, capabilities: opts.get_capabilities(), @@ -125,53 +125,110 @@ impl ConnInner { } } -#[derive(Debug)] -pub struct Conn { - inner: Box, +pub struct Conn { + executor: T, + inner: Box>, +} + +impl fmt::Debug for Conn { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self.inner) + } } impl Conn { - /// Returns the ID generated by a query (usually `INSERT`) on a table with a column having the - /// `AUTO_INCREMENT` attribute. Returns `None` if there was no previous query on the connection - /// or if the query did not update an AUTO_INCREMENT value. - pub fn last_insert_id(&self) -> Option { - self.get_last_insert_id() + /// Returns future that resolves to `Conn`. + pub fn new>(opts: T) -> impl MyFuture { + let executor = ::tokio::executor::DefaultExecutor::current(); + Conn::with_executor(executor, opts) } - /// Returns the number of rows affected by the last `INSERT`, `UPDATE`, `REPLACE` or `DELETE` - /// query. - pub fn affected_rows(&self) -> u64 { - self.get_affected_rows() + /// Returns future that resolves to `Conn`. + pub fn from_url>(url: T) -> impl MyFuture { + Opts::from_str(url.as_ref()) + .map_err(Error::from) + .into_future() + .and_then(Conn::new) } +} - fn close(mut self) -> impl MyFuture<()> { - self.inner.disconnected = true; - self.cleanup().and_then(Conn::disconnect) +impl Conn { + /// Returns future that resolves to `Conn`. + pub fn from_url_with_executor>(executor: E, url: T) -> impl MyFuture { + Opts::from_str(url.as_ref()) + .map_err(Error::from) + .into_future() + .and_then(|opts| Conn::with_executor(executor, opts)) } - fn is_secure(&self) -> bool { - if let Some(ref stream) = self.inner.stream { - stream.is_secure() + /// Returns future that resolves to `Conn`. + pub fn with_executor>(executor: E, opts: T) -> impl MyFuture { + let opts = opts.into(); + let mut conn = Conn::empty(executor, opts.clone()); + + let stream = if let Some(path) = opts.get_socket() { + A(Stream::connect_socket(path.to_owned())) } else { - false - } + B(Stream::connect_tcp(( + opts.get_ip_or_hostname(), + opts.get_tcp_port(), + ))) + }; + + stream + .map(move |stream| { + conn.inner.stream = Some(stream); + conn + }) + .and_then(Conn::setup_stream) + .and_then(Conn::handle_handshake) + .and_then(Conn::switch_to_ssl_if_needed) + .and_then(Conn::do_handshake_response) + .and_then(Conn::continue_auth) + .and_then(Conn::read_socket) + .and_then(Conn::reconnect_via_socket_if_needed) + .and_then(Conn::read_max_allowed_packet) + .and_then(Conn::read_wait_timeout) + .and_then(Conn::run_init_commands) } /// Hacky way to move connection through &mut. `self` becomes unusable. - fn take(&mut self) -> Conn { - let inner = mem::replace(&mut *self.inner, ConnInner::empty(Default::default())); + fn take(&mut self) -> Self { + let executor = self.executor.clone(); + let inner = mem::replace(&mut *self.inner, ConnInner::::empty(Default::default())); Conn { + executor, inner: Box::new(inner), } } - fn empty(opts: Opts) -> Self { + /// Returns true if time since last io exceeds wait_timeout (or conn_ttl if specified in opts). + fn expired(&self) -> bool { + let idle_duration = SteadyTime::now() - self.inner.last_io; + let ttl = self + .inner + .opts + .get_conn_ttl() + .unwrap_or(self.inner.wait_timeout); + idle_duration.num_milliseconds() > i64::from(ttl) * 1000 + } + + fn is_secure(&self) -> bool { + if let Some(ref stream) = self.inner.stream { + stream.is_secure() + } else { + false + } + } + + fn empty(executor: E, opts: Opts) -> Self { Self { + executor, inner: Box::new(ConnInner::empty(opts)), } } - fn setup_stream(mut self) -> Result { + fn setup_stream(mut self) -> Result { if let Some(stream) = self.inner.stream.take() { stream.set_keepalive_ms(self.inner.opts.get_tcp_keepalive())?; stream.set_tcp_nodelay(self.inner.opts.get_tcp_nodelay())?; @@ -182,7 +239,25 @@ impl Conn { } } - fn handle_handshake(self) -> impl MyFuture { + /// Returns the ID generated by a query (usually `INSERT`) on a table with a column having the + /// `AUTO_INCREMENT` attribute. Returns `None` if there was no previous query on the connection + /// or if the query did not update an AUTO_INCREMENT value. + pub fn last_insert_id(&self) -> Option { + self.get_last_insert_id() + } + + /// Returns the number of rows affected by the last `INSERT`, `UPDATE`, `REPLACE` or `DELETE` + /// query. + pub fn affected_rows(&self) -> u64 { + self.get_affected_rows() + } + + fn close(mut self) -> impl MyFuture<()> { + self.inner.disconnected = true; + self.cleanup().and_then(Conn::disconnect) + } + + fn handle_handshake(self) -> impl MyFuture { self.read_packet().and_then(move |(mut conn, packet)| { parse_handshake_packet(&*packet.0) .map_err(Error::from) @@ -212,7 +287,7 @@ impl Conn { }) } - fn switch_to_ssl_if_needed(self) -> impl MyFuture { + fn switch_to_ssl_if_needed(self) -> impl MyFuture { if self .inner .opts @@ -238,7 +313,7 @@ impl Conn { } } - fn do_handshake_response(self) -> impl MyFuture { + fn do_handshake_response(self) -> impl MyFuture { let auth_data = self .inner .auth_plugin @@ -259,7 +334,7 @@ impl Conn { fn perform_auth_switch( mut self, auth_switch_request: AuthSwitchRequest<'_>, - ) -> BoxFuture { + ) -> BoxFuture { if !self.inner.auth_switched { self.inner.auth_switched = true; self.inner.nonce = auth_switch_request.plugin_data().into(); @@ -277,7 +352,7 @@ impl Conn { } } - fn continue_auth(self) -> impl MyFuture { + fn continue_auth(self) -> impl MyFuture { match self.inner.auth_plugin { AuthPlugin::MysqlNativePassword => A(self.continue_mysql_native_password_auth()), AuthPlugin::CachingSha2Password => B(self.continue_caching_sha2_password_auth()), @@ -285,7 +360,7 @@ impl Conn { } } - fn continue_caching_sha2_password_auth(self) -> impl MyFuture { + fn continue_caching_sha2_password_auth(self) -> impl MyFuture { self.read_packet() .and_then(|(conn, packet)| match packet.as_ref().get(0) { Some(0x01) => match packet.as_ref().get(1) { @@ -340,7 +415,7 @@ impl Conn { }) } - fn continue_mysql_native_password_auth(self) -> impl MyFuture { + fn continue_mysql_native_password_auth(self) -> impl MyFuture { self.read_packet() .and_then(|(this, packet)| match packet.0.get(0) { Some(0x00) => A(ok(this)), @@ -360,11 +435,11 @@ impl Conn { }) } - fn drop_packet(self) -> impl MyFuture { + fn drop_packet(self) -> impl MyFuture { self.read_packet().map(|(conn, _)| conn) } - fn run_init_commands(self) -> impl MyFuture { + fn run_init_commands(self) -> impl MyFuture { let init = self .inner .opts @@ -375,7 +450,7 @@ impl Conn { loop_fn( (init, self), - |(mut init, conn): (Vec, Conn)| match init.pop() { + |(mut init, conn): (Vec, Conn)| match init.pop() { None => A(ok(Loop::Break(conn))), Some(query) => { let fut = conn @@ -387,60 +462,24 @@ impl Conn { ) } - /// Returns future that resolves to `Conn`. - pub fn new>(opts: T) -> impl MyFuture { - let opts = opts.into(); - let mut conn = Conn::empty(opts.clone()); - - let stream = if let Some(path) = opts.get_socket() { - A(Stream::connect_socket(path.to_owned())) - } else { - B(Stream::connect_tcp(( - opts.get_ip_or_hostname(), - opts.get_tcp_port(), - ))) - }; - - stream - .map(move |stream| { - conn.inner.stream = Some(stream); - conn - }) - .and_then(Conn::setup_stream) - .and_then(Conn::handle_handshake) - .and_then(Conn::switch_to_ssl_if_needed) - .and_then(Conn::do_handshake_response) - .and_then(Conn::continue_auth) - .and_then(Conn::read_socket) - .and_then(Conn::reconnect_via_socket_if_needed) - .and_then(Conn::read_max_allowed_packet) - .and_then(Conn::read_wait_timeout) - .and_then(Conn::run_init_commands) - } - - /// Returns future that resolves to `Conn`. - pub fn from_url>(url: T) -> impl MyFuture { - Opts::from_str(url.as_ref()) - .map_err(Error::from) - .into_future() - .and_then(Conn::new) - } - /// Will try to connect via socket using socket address in `self.inner.socket`. /// /// Returns new connection on success or self on error. /// /// Won't try to reconnect if socket connection is already enforced in `Opts`. - fn reconnect_via_socket_if_needed(self) -> Box> { + fn reconnect_via_socket_if_needed(self) -> Box> { if let Some(socket) = self.inner.socket.as_ref() { let opts = self.inner.opts.clone(); if opts.get_socket().is_none() { let mut builder = OptsBuilder::from_opts(opts); builder.socket(Some(&**socket)); - let fut = Conn::new(builder).then(|result| match result { - Ok(conn) => Ok(conn), - Err(_) => Ok(self), - }); + let fut = + Conn::with_executor(self.executor.clone(), builder).then( + |result| match result { + Ok(conn) => Ok(conn), + Err(_) => Ok(self), + }, + ); return Box::new(fut); } } @@ -479,19 +518,8 @@ impl Conn { }) } - /// Returns true if time since last io exceeds wait_timeout (or conn_ttl if specified in opts). - fn expired(&self) -> bool { - let idle_duration = SteadyTime::now() - self.inner.last_io; - let ttl = self - .inner - .opts - .get_conn_ttl() - .unwrap_or(self.inner.wait_timeout); - idle_duration.num_milliseconds() > i64::from(ttl) * 1000 - } - /// Returns future that resolves to a `Conn` with `COM_RESET_CONNECTION` executed on it. - pub fn reset(self) -> impl MyFuture { + pub fn reset(self) -> impl MyFuture { let pool = self.inner.pool.clone(); let fut = if self.inner.version > (5, 7, 2) { let fut = self @@ -500,7 +528,13 @@ impl Conn { .map(|(conn, _)| conn); (ok(pool), A(fut)) } else { - (ok(pool), B(Conn::new(self.inner.opts.clone()))) + ( + ok(pool), + B(Conn::with_executor( + self.executor.clone(), + self.inner.opts.clone(), + )), + ) }; fut.into_future().map(|(pool, mut conn)| { conn.inner.stmt_cache.clear(); @@ -515,7 +549,7 @@ impl Conn { self.drop_query("ROLLBACK") } - fn drop_result(mut self) -> impl MyFuture { + fn drop_result(mut self) -> impl MyFuture { match self.inner.has_result.take() { Some((columns, None)) => A(B(query_result::assemble::<_, TextProtocol>( self, @@ -533,7 +567,7 @@ impl Conn { } } - fn cleanup(self) -> BoxFuture { + fn cleanup(self) -> BoxFuture { if self.inner.has_result.is_some() { Box::new(self.drop_result().and_then(Self::cleanup)) } else if self.inner.in_transaction { @@ -544,7 +578,7 @@ impl Conn { } } -impl ConnectionLike for Conn { +impl ConnectionLike for Conn { fn take_stream(mut self) -> (Streamless, Stream) { let stream = self.inner.stream.take().expect("Logic error: stream taken"); (Streamless::new(self), stream) diff --git a/src/conn/pool/futures/disconnect_pool.rs b/src/conn/pool/futures/disconnect_pool.rs index 0be7a211..c79b9316 100644 --- a/src/conn/pool/futures/disconnect_pool.rs +++ b/src/conn/pool/futures/disconnect_pool.rs @@ -19,17 +19,17 @@ use std::sync::{atomic, Arc}; /// /// Active connections taken from this pool should be disconnected manually. /// Also all pending and new `GetConn`'s will resolve to error. -pub struct DisconnectPool { - pool_inner: Arc, +pub struct DisconnectPool { + pool_inner: Arc>, } -pub fn new(pool: Pool) -> DisconnectPool { +pub fn new(pool: Pool) -> DisconnectPool { DisconnectPool { pool_inner: pool.inner, } } -impl Future for DisconnectPool { +impl Future for DisconnectPool { type Item = (); type Error = Error; diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index d268c9a5..c04d739d 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -14,29 +14,29 @@ use crate::{ MyFuture, }; -pub(crate) enum GetConnInner { +pub(crate) enum GetConnInner { New, - Done(Option), + Done(Option>), // TODO: one day this should be an existential // TODO: impl Drop? - Connecting(Box>), + Connecting(Box>>), } /// This future will take connection from a pool and resolve to `Conn`. -pub struct GetConn { - pub(crate) pool: Option, - pub(crate) inner: GetConnInner, +pub struct GetConn { + pub(crate) pool: Option>, + pub(crate) inner: GetConnInner, } -pub fn new(pool: &Pool) -> GetConn { +pub fn new(pool: &Pool) -> GetConn { GetConn { pool: Some(pool.clone()), inner: GetConnInner::New, } } -impl Future for GetConn { - type Item = Conn; +impl Future for GetConn { + type Item = Conn; type Error = Error; fn poll(&mut self) -> Poll { diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index f2b92496..e4fe4fd7 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -7,11 +7,8 @@ // modified, or distributed except according to those terms. use ::futures::stream::futures_unordered::FuturesUnordered; -use ::futures::{ - task::{self, Task}, - try_ready, Async, Future, Poll, Stream, -}; - +use ::futures::task::{self, Task}; +use ::futures::{try_ready, Async, Future, Poll, Stream}; use tokio_sync::mpsc; use std::{ @@ -34,19 +31,19 @@ use crate::{ // this is a really unfortunate name for a module pub mod futures; -struct Recycler { - inner: Arc, +pub struct Recycler { + inner: Arc>, discard: FuturesUnordered>, discarded: usize, - cleaning: FuturesUnordered>, + cleaning: FuturesUnordered>>, // Option so that we have a way to send a "I didn't make a Conn after all" signal - dropped: mpsc::UnboundedReceiver>, + dropped: mpsc::UnboundedReceiver>>, min: usize, eof: bool, } -impl Future for Recycler { +impl Future for Recycler { type Item = (); type Error = (); @@ -184,19 +181,19 @@ impl Future for Recycler { } } -struct Inner { +struct Inner { close: atomic::AtomicBool, closed: atomic::AtomicBool, - idle: crossbeam::queue::ArrayQueue, + idle: crossbeam::queue::ArrayQueue>, wake: crossbeam::queue::SegQueue, exist: atomic::AtomicUsize, extra_wakeups: atomic::AtomicUsize, // only used to spawn the recycler the first time we're in async context - maker: Mutex>>>, + maker: Mutex>>>>, } -impl Inner { +impl Inner { fn wake(&self, mut readied: usize) { if readied == 0 { return; @@ -233,14 +230,15 @@ impl Inner { #[derive(Clone)] /// Asynchronous pool of MySql connections. -pub struct Pool { +pub struct Pool { opts: Opts, - inner: Arc, + inner: Arc>, + executor: T, pool_constraints: PoolConstraints, - drop: mpsc::UnboundedSender>, + drop: mpsc::UnboundedSender>>, } -impl fmt::Debug for Pool { +impl fmt::Debug for Pool { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Pool") .field("opts", &self.opts) @@ -252,11 +250,51 @@ impl fmt::Debug for Pool { impl Pool { /// Creates new pool of connections. pub fn new>(opts: O) -> Pool { + Pool::with_executor(::tokio::executor::DefaultExecutor::current(), opts) + } + + /// Creates new pool of connections. + pub fn from_url>(url: T) -> Result { + let opts = Opts::from_str(url.as_ref())?; + Ok(Pool::new(opts)) + } +} + +impl Pool { + /// A way to return connection taken from a pool. + fn return_conn(&mut self, conn: Conn) { + // NOTE: we're not in async context here, so we can't block or return NotReady + // any and all cleanup work _has_ to be done in the spawned recycler + + // fast-path for when the connection is immediately ready to be reused + if conn.inner.stream.is_some() + && !conn.inner.disconnected + && !conn.expired() + && !conn.inner.in_transaction + && conn.inner.has_result.is_none() + && !self.inner.close.load(atomic::Ordering::Acquire) + && self.inner.idle.len() < self.pool_constraints.min() + { + self.inner + .idle + .push(conn) + .expect("more connections than max"); + self.inner.wake(1); + } else { + self.drop + .try_send(Some(conn)) + .expect("recycler is active as long as any Pool is"); + } + } + + /// Creates new pool of connections. + pub fn with_executor>(executor: E, opts: O) -> Pool { let opts = opts.into(); let pool_constraints = opts.get_pool_constraints().clone(); let (tx, rx) = mpsc::unbounded_channel(); Pool { opts, + executor, inner: Arc::new(Inner { close: false.into(), closed: false.into(), @@ -272,13 +310,13 @@ impl Pool { } /// Creates new pool of connections. - pub fn from_url>(url: T) -> Result { + pub fn from_url_with_executor>(executor: E, url: T) -> Result> { let opts = Opts::from_str(url.as_ref())?; - Ok(Pool::new(opts)) + Ok(Pool::with_executor(executor, opts)) } /// Returns future that resolves to `Conn`. - pub fn get_conn(&self) -> GetConn { + pub fn get_conn(&self) -> GetConn { new_get_conn(self) } @@ -286,7 +324,7 @@ impl Pool { pub fn start_transaction( &self, options: TransactionOptions, - ) -> impl MyFuture> { + ) -> impl MyFuture>> { self.get_conn() .and_then(|conn| Queryable::start_transaction(conn, options)) } @@ -295,7 +333,7 @@ impl Pool { /// /// Active connections taken from this pool should be disconnected manually. /// Also all pending and new `GetConn`'s will resolve to error. - pub fn disconnect(mut self) -> DisconnectPool { + pub fn disconnect(mut self) -> DisconnectPool { let was_closed = self.inner.close.swap(true, atomic::Ordering::AcqRel); if !was_closed { // make sure we wake up the Recycler. @@ -307,38 +345,12 @@ impl Pool { new_disconnect_pool(self) } - /// A way to return connection taken from a pool. - fn return_conn(&mut self, conn: Conn) { - // NOTE: we're not in async context here, so we can't block or return NotReady - // any and all cleanup work _has_ to be done in the spawned recycler - - // fast-path for when the connection is immediately ready to be reused - if conn.inner.stream.is_some() - && !conn.inner.disconnected - && !conn.expired() - && !conn.inner.in_transaction - && conn.inner.has_result.is_none() - && !self.inner.close.load(atomic::Ordering::Acquire) - && self.inner.idle.len() < self.pool_constraints.min() - { - self.inner - .idle - .push(conn) - .expect("more connections than max"); - self.inner.wake(1); - } else { - self.drop - .try_send(Some(conn)) - .expect("recycler is active as long as any Pool is"); - } - } - /// Poll the pool for an available connection. - fn poll_new_conn(&mut self) -> Result> { + fn poll_new_conn(&mut self) -> Result>> { self.poll_new_conn_inner(false) } - fn poll_new_conn_inner(&mut self, retrying: bool) -> Result> { + fn poll_new_conn_inner(&mut self, retrying: bool) -> Result>> { if self.inner.close.load(atomic::Ordering::Acquire) { return Err(Error::Driver(DriverError::PoolDisconnected)); } @@ -354,7 +366,7 @@ impl Pool { return Ok(Async::Ready(GetConn { pool: Some(self.clone()), - inner: GetConnInner::Done(Some(conn)), + inner: GetConnInner::::Done(Some(conn)), })); } } @@ -371,7 +383,7 @@ impl Pool { let mut lock = self.inner.maker.lock().unwrap(); if let Some(dropped) = lock.take() { // we're the first connection! - tokio::spawn(Recycler { + self.executor.execute(Box::new(Recycler { inner: self.inner.clone(), discard: FuturesUnordered::new(), discarded: 0, @@ -379,7 +391,7 @@ impl Pool { dropped, min: self.pool_constraints.min(), eof: false, - }); + }))?; } } @@ -407,7 +419,10 @@ impl Pool { return Ok(Async::Ready(GetConn { pool: Some(self.clone()), - inner: GetConnInner::Connecting(Box::new(Conn::new(self.opts.clone()))), + inner: GetConnInner::::Connecting(Box::new(Conn::with_executor( + self.executor.clone(), + self.opts.clone(), + ))), })); } @@ -489,7 +504,7 @@ impl Pool { } } -impl Drop for Conn { +impl Drop for Conn { fn drop(&mut self) { if let Some(mut pool) = self.inner.pool.take() { pool.return_conn(self.take()); @@ -501,6 +516,7 @@ impl Drop for Conn { #[cfg(test)] mod test { + use futures::future::Executor; use futures::{collect, future, Future}; use std::sync::atomic; @@ -706,6 +722,57 @@ mod test { .unwrap(); } + #[test] + fn should_run_on_current_thread_runtime() { + let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap(); + let pool = Pool::new(&**DATABASE_URL); + for _ in 0..10 { + use futures::{Sink, Stream}; + let (tx, rx) = futures::sync::mpsc::unbounded(); + for i in 0..10_000 { + let pool = pool.clone(); + let tx = tx.clone(); + runtime.spawn(futures::future::lazy(move || { + pool.get_conn() + .map_err(|e| unreachable!("{:?}", e)) + .and_then(move |_| tx.send(i).map_err(|e| unreachable!("{:?}", e))) + .map(|_| ()) + })); + } + drop(tx); + runtime.block_on(rx.fold(0, |_, _i| Ok(0))).unwrap(); + } + drop(pool); + runtime.run().unwrap(); + } + + #[test] + fn should_run_on_current_thread_executor() { + let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap(); + let handle = runtime.handle(); + let pool = Pool::with_executor(handle.clone(), &**DATABASE_URL); + for _ in 0..10 { + use futures::{Sink, Stream}; + let (tx, rx) = futures::sync::mpsc::unbounded(); + for i in 0..10 { + let pool = pool.clone(); + let tx = tx.clone(); + handle + .execute(futures::future::lazy(move || { + pool.get_conn() + .map_err(|e| unreachable!("{:?}", e)) + .and_then(move |_| tx.send(i).map_err(|e| unreachable!("{:?}", e))) + .map(|_| ()) + })) + .unwrap(); + } + drop(tx); + runtime.block_on(rx.fold(0, |_, _i| Ok(0))).unwrap(); + } + drop(pool); + runtime.run().unwrap(); + } + #[test] #[ignore] fn should_not_panic_if_dropped_without_tokio_runtime() { diff --git a/src/error.rs b/src/error.rs index e9ea3d8f..a59f57da 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,6 +9,7 @@ pub use url::ParseError; use failure::Fail; +use futures::future::ExecuteError; use mysql_common::{ named_params::MixedParamsError, packets::ErrPacket, params::MissingNamedParameterError, row::Row, value::Value, @@ -40,6 +41,9 @@ pub enum Error { #[fail(display = "URL error: `{}'", _0)] Url(#[cause] UrlError), + + #[fail(display = "Connection pool error: `{}'", _0)] + Pool(#[cause] PoolError), } /// This type represents MySql server error. @@ -51,6 +55,13 @@ pub struct ServerError { pub state: String, } +/// This type enumerates connection pool errors. +#[derive(Debug, Fail)] +pub enum PoolError { + #[fail(display = "Can't spawn Future on current executor")] + SpawnError, +} + /// This type enumerates connection URL errors. #[derive(Debug, Fail)] pub enum UrlError { @@ -243,3 +254,9 @@ impl From for Error { Error::Url(err.into()) } } + +impl From> for Error { + fn from(_err: ExecuteError) -> Self { + Error::Pool(PoolError::SpawnError) + } +} diff --git a/src/lib.rs b/src/lib.rs index 950fc02f..bd53ae21 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -146,6 +146,23 @@ impl MyFuture for U where { } +/// Alias for supported executor. +pub trait MyExecutor: + ::futures::future::Executor + Send + 'static>> + + Send + + Clone + + 'static +{ +} +impl MyExecutor for T +where + T: ::futures::future::Executor< + Box + Send + 'static>, + >, + T: Send + Clone + 'static, +{ +} + #[doc(inline)] pub use self::conn::Conn; diff --git a/src/queryable/mod.rs b/src/queryable/mod.rs index f512cc3d..0fe6c6d8 100644 --- a/src/queryable/mod.rs +++ b/src/queryable/mod.rs @@ -208,5 +208,5 @@ where } } -impl Queryable for Conn {} +impl Queryable for Conn {} impl Queryable for Transaction {}