diff --git a/CHANGELOG.md b/CHANGELOG.md index aa1d3a7..7f76206 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,10 +5,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [0.2.0] - 2023-05-18 ### Added - `indices` method to `SpaceMetadata` for accessing space's indices; - - `get_by_name` and `get_by_id` methods to `UniqueIdNameMap`. + - `get_by_name` and `get_by_id` methods to `UniqueIdNameMap`; + - reconnection in background, if current conection died; + - optional timeout on connection. + +### Changed + - `ConnectionBuilder` most methods now accept new values as `impl Into>`; + - `TransactionBuilder` methods now return `&mut Self`. ## [0.0.1] - 2023-05-15 diff --git a/Cargo.toml b/Cargo.toml index cd3703c..3f76a75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "tarantool-rs" description = "Asyncronous tokio-based client for Tarantool" -version = "0.0.1" +version = "0.0.2" edition = "2021" authors = ["Andrey Kononov flowneee3@gmail.com"] license = "MIT" @@ -13,6 +13,7 @@ repository = "https://github.com/Flowneee/tarantool-rs" [dependencies] anyhow = "1" async-trait = "0.1" +backoff = "0.4" base64 = "0.13" bytes = "1" futures = "0.3" @@ -22,7 +23,7 @@ rmpv = { version = "1", features = ["with-serde"] } serde = { version = "1", features = ["derive"] } sha-1 = "0.10" thiserror = "1" -tokio = { version = "1", features = ["rt", "net", "io-util", "macros"] } +tokio = { version = "1", features = ["rt", "net", "io-util", "macros", "time"] } tokio-util = { version = "0.7", default-features = false, features = ["codec"] } tracing = { version = "0.1", features = ["log"] } diff --git a/README.md b/README.md index 26c0b72..2f63867 100644 --- a/README.md +++ b/README.md @@ -15,12 +15,15 @@ For examples of how to use this crate check `examples/` folder. * [x] remote function calling * [x] CRUD operations * [x] transaction control (begin/commit/rollback) +* [x] reconnection in background * [ ] SQL requests * [ ] chunked responses * [ ] watchers and events -* [ ] reconnection in background * [ ] connection pooling * [ ] automatic schema fetching and reloading +* [ ] graceful shutdown protocol support +* [ ] pre Tarantool 2.10 versions support +* [ ] customizable connection features (streams/watchers/mvcc) * [ ] ... diff --git a/docker-compose.yml b/docker-compose.yml index f4fc468..9891b32 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,10 +1,8 @@ services: tarantool: image: tarantool/tarantool - volumes: - - ./tests:/opt/tarantool ports: - "3301:3301" environment: TT_MEMTX_USE_MVCC_ENGINE: true - command: ["tarantool", "/opt/tarantool/test_data.lua"] + command: ["tarantool"] diff --git a/examples/cli_client.rs b/examples/cli_client.rs index 0e28a49..bc8c236 100644 --- a/examples/cli_client.rs +++ b/examples/cli_client.rs @@ -12,9 +12,13 @@ struct Args { #[tokio::main] async fn main() -> Result<(), anyhow::Error> { + let _ = pretty_env_logger::init(); + let args = Args::parse(); - let conn = Connection::builder().build(&args.tarantool_address).await?; + let conn = Connection::builder() + .build(args.tarantool_address.clone()) + .await?; println!("connected to Tarantool instance {}", args.tarantool_address); let mut rl = DefaultEditor::new()?; diff --git a/src/builder.rs b/src/builder.rs new file mode 100644 index 0000000..248bc76 --- /dev/null +++ b/src/builder.rs @@ -0,0 +1,169 @@ +use std::{cmp::max, fmt::Display, time::Duration}; + +use tokio::net::ToSocketAddrs; +use tracing::debug; + +use crate::{ + client::Connection, + codec::{consts::TransactionIsolationLevel, request::Id}, + errors::Error, + transport::Dispatcher, +}; + +/// Interval parameters for background reconnection. +#[derive(Clone, Debug, PartialEq)] +pub enum ReconnectInterval { + Fixed(Duration), + ExponentialBackoff { + min: Duration, + max: Duration, + randomization_factor: f64, + multiplier: f64, + }, +} + +impl Default for ReconnectInterval { + fn default() -> Self { + Self::exponential_backoff(Duration::from_millis(1), Duration::from_secs(1), 0.5, 5.0) + } +} + +impl ReconnectInterval { + /// Fixed interval between reconnection attempts. + pub fn fixed(interval: Duration) -> Self { + Self::Fixed(interval) + } + + /// Interval between reconnection attempts calculated as + /// exponentially growing period. + /// + /// For details on this values check [`backoff::ExponentialBackoff`] docs. + pub fn exponential_backoff( + min_interval: Duration, + max_interval: Duration, + randomization_factor: f64, + multiplier: f64, + ) -> Self { + Self::ExponentialBackoff { + min: max(min_interval, Duration::from_micros(1)), + max: max_interval, + randomization_factor, + multiplier, + } + } +} + +/// Build connection to Tarantool. +#[derive(Debug)] +pub struct ConnectionBuilder { + user: Option, + password: Option, + transaction_timeout: Option, + transaction_isolation_level: TransactionIsolationLevel, + connect_timeout: Option, + reconnect_interval: Option, +} + +impl Default for ConnectionBuilder { + fn default() -> Self { + Self { + user: Default::default(), + password: Default::default(), + transaction_timeout: Default::default(), + transaction_isolation_level: Default::default(), + connect_timeout: Default::default(), + reconnect_interval: Some(ReconnectInterval::default()), + } + } +} + +impl ConnectionBuilder { + /// Create connection to Tarantool using provided address. + pub async fn build(&self, addr: A) -> Result + where + A: ToSocketAddrs + Display + Clone + Send + Sync + 'static, + { + let (dispatcher, disaptcher_sender) = Dispatcher::new( + addr, + self.user.as_deref(), + self.password.as_deref(), + self.connect_timeout, + self.reconnect_interval.clone(), + ) + .await?; + + // TODO: support setting custom executor + tokio::spawn(dispatcher.run()); + let conn = Connection::new( + disaptcher_sender, + self.transaction_timeout, + self.transaction_isolation_level, + ); + + // TODO: add option to disable pre 2.10 features (ID request, streams, watchers) + let features = Id::default(); + debug!( + "Setting supported features: VERSION - {}, STREAMS - {}, TRANSACTIONS - {}, ERROR_EXTENSION - {}, WATCHERS = {}", + features.protocol_version, + features.streams, + features.transactions, + features.error_extension, + features.watchers + ); + conn.id(features).await?; + + Ok(conn) + } + + /// Sets user login and, optionally, password, used for this connection. + /// + /// AUTH message sent upon connecting to server. + pub fn auth<'a>(&mut self, user: &str, password: impl Into>) -> &mut Self { + self.user = Some(user.into()); + self.password = password.into().map(Into::into); + self + } + + /// Sets default timeout for transactions. + /// + /// By default disabled. + pub fn transaction_timeout( + &mut self, + transaction_timeout: impl Into>, + ) -> &mut Self { + self.transaction_timeout = transaction_timeout.into(); + self + } + + /// Sets default transaction isolation level. + /// + /// By default `TransactionIsolationLevel::Default` (i.e. use box.cfg default value). + pub fn transaction_isolation_level( + &mut self, + transaction_isolation_level: TransactionIsolationLevel, + ) -> &mut Self { + self.transaction_isolation_level = transaction_isolation_level; + self + } + + /// Sets timeout for connect. + /// + /// By default disabled. + pub fn connect_timeout(&mut self, connect_timeout: impl Into>) -> &mut Self { + self.connect_timeout = connect_timeout.into(); + self + } + + /// Sets interval between reconnection attempts. + /// + /// If disabled, next attempt wil lbe started as soon as last one finished. + /// + /// By default set to `ReconnectInterval::exponential_backoff(Duration::from_millis(1), Duration::from_secs(1), 0.5, 5.0)`. + pub fn reconnect_interval( + &mut self, + reconnect_interval: impl Into>, + ) -> &mut Self { + self.reconnect_interval = reconnect_interval.into(); + self + } +} diff --git a/src/client/builder.rs b/src/client/builder.rs deleted file mode 100644 index 151d78c..0000000 --- a/src/client/builder.rs +++ /dev/null @@ -1,81 +0,0 @@ -use std::{fmt::Display, time::Duration}; - -use tokio::net::ToSocketAddrs; -use tracing::debug; - -use super::connection::Connection; -use crate::{ - codec::{consts::TransactionIsolationLevel, request::Id}, - errors::Error, - transport::Dispatcher, -}; - -/// Build connection to Tarantool. -#[derive(Default)] -pub struct ConnectionBuilder { - user: Option, - password: Option, - transaction_timeout: Option, - transaction_isolation_level: TransactionIsolationLevel, -} - -impl ConnectionBuilder { - /// Create connection to Tarantool using provided address. - pub async fn build(&self, addr: A) -> Result - where - A: ToSocketAddrs + Display, - { - let (dispatcher, disaptcher_sender) = - Dispatcher::new(addr, self.user.as_deref(), self.password.as_deref()).await?; - - // TODO: support setting custom executor - tokio::spawn(dispatcher.run()); - let conn = Connection::new( - disaptcher_sender, - self.transaction_timeout, - self.transaction_isolation_level, - ); - - // TODO: add option to disable pre 2.10 features (ID request, streams, watchers) - let features = Id::default(); - debug!( - "Setting supported features: VERSION - {}, STREAMS - {}, TRANSACTIONS - {}, ERROR_EXTENSION - {}, WATCHERS = {}", - features.protocol_version, - features.streams, - features.transactions, - features.error_extension, - features.watchers - ); - conn.id(features).await?; - - Ok(conn) - } - - /// Sets user login and, optionally, password, used for this connection. - /// - /// AUTH message sent upon connecting to server. - pub fn auth(&mut self, user: &str, password: Option<&str>) -> &mut Self { - self.user = Some(user.into()); - self.password = password.map(Into::into); - self - } - - /// Sets default timeout in transactions. - /// - /// By default disabled. - pub fn transaction_timeout(&mut self, transaction_timeout: Option) -> &mut Self { - self.transaction_timeout = transaction_timeout; - self - } - - /// Sets default transaction isolation level. - /// - /// By default `TransactionIsolationLevel::Default` (i.e. use box.cfg default value). - pub fn transaction_isolation_level( - &mut self, - transaction_isolation_level: TransactionIsolationLevel, - ) -> &mut Self { - self.transaction_isolation_level = transaction_isolation_level; - self - } -} diff --git a/src/client/connection.rs b/src/client/connection.rs index bc8aee5..c548427 100644 --- a/src/client/connection.rs +++ b/src/client/connection.rs @@ -11,10 +11,9 @@ use futures::{Future, TryFutureExt}; use rmpv::Value; use tracing::debug; -use super::{ - connection_like::ConnectionLike, ConnectionBuilder, Stream, Transaction, TransactionBuilder, -}; +use super::{connection_like::ConnectionLike, Stream, Transaction, TransactionBuilder}; use crate::{ + builder::ConnectionBuilder, codec::{ consts::TransactionIsolationLevel, request::{Id, Request, RequestBody}, diff --git a/src/client/mod.rs b/src/client/mod.rs index 0d23684..ef40975 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,5 +1,4 @@ pub use self::{ - builder::ConnectionBuilder, connection::Connection, connection_like::ConnectionLike, stream::Stream, @@ -8,7 +7,6 @@ pub use self::{ pub mod schema; -mod builder; mod connection; mod connection_like; mod stream; diff --git a/src/client/transaction.rs b/src/client/transaction.rs index 2c48996..cc071ae 100644 --- a/src/client/transaction.rs +++ b/src/client/transaction.rs @@ -127,12 +127,14 @@ impl TransactionBuilder { } } - pub fn timeout(&mut self, timeout: Option) { - self.timeout_secs = timeout.as_ref().map(Duration::as_secs_f64); + pub fn timeout(&mut self, timeout: impl Into>) -> &mut Self { + self.timeout_secs = timeout.into().as_ref().map(Duration::as_secs_f64); + self } - pub fn isolation_level(&mut self, isolation_level: TransactionIsolationLevel) { + pub fn isolation_level(&mut self, isolation_level: TransactionIsolationLevel) -> &mut Self { self.isolation_level = isolation_level; + self } pub async fn begin(&self) -> Result { diff --git a/src/errors.rs b/src/errors.rs index 06ce206..84a059b 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -29,6 +29,14 @@ pub enum Error { /// Error, returned in response from Tarantool instance. #[error("Error response: {0}")] Response(#[from] ErrorResponse), + + /// Timeout. + #[error("Timeout")] + Timeout, + /// Timeout while establishing connection. + #[error("Connect timeout")] + ConnectTimeout, + /// Authorization error. #[error("Authorization error: {} (code {})" ,.0.description, .0.code)] Auth(#[source] ErrorResponse), diff --git a/src/lib.rs b/src/lib.rs index 6e5706c..48fd978 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ // * [ ] background schema fetching, reloading and invalidating // * [ ] triggers on connection events (connect/disconnect/schema reloading) // * [ ] SQL +// * [ ] graceful shutdown protocol // // Other // @@ -20,6 +21,7 @@ pub use rmpv::Value; pub use self::{ + builder::{ConnectionBuilder, ReconnectInterval}, client::*, codec::consts::{IteratorType, TransactionIsolationLevel}, errors::Error, @@ -28,6 +30,7 @@ pub use self::{ pub mod errors; pub mod utils; +mod builder; mod client; mod codec; mod transport; diff --git a/src/transport/connection.rs b/src/transport/connection.rs index fb73e41..771761c 100644 --- a/src/transport/connection.rs +++ b/src/transport/connection.rs @@ -2,6 +2,7 @@ use std::{ collections::HashMap, fmt::Display, sync::atomic::{AtomicU32, Ordering}, + time::Duration, }; use futures::{SinkExt, TryStreamExt}; @@ -31,7 +32,7 @@ pub(crate) struct Connection { // TODO: cancel impl Connection { - pub(crate) async fn new( + async fn new_inner( addr: A, user: Option<&str>, password: Option<&str>, @@ -62,6 +63,24 @@ impl Connection { Ok(this) } + pub(super) async fn new( + addr: A, + user: Option<&str>, + password: Option<&str>, + timeout: Option, + ) -> Result + where + A: ToSocketAddrs + Display, + { + match timeout { + Some(dur) => tokio::time::timeout(dur, Self::new_inner(addr, user, password)) + .await + .map_err(|_| Error::ConnectTimeout) + .and_then(|x| x), + None => Self::new_inner(addr, user, password).await, + } + } + async fn auth(&mut self, user: &str, password: Option<&str>, salt: &[u8]) -> Result<(), Error> { let mut request = Request::new(Auth::new(user, password, salt), None).unwrap(); *request.sync_mut() = self.next_sync(); diff --git a/src/transport/dispatcher.rs b/src/transport/dispatcher.rs index 5968221..b634056 100644 --- a/src/transport/dispatcher.rs +++ b/src/transport/dispatcher.rs @@ -1,16 +1,17 @@ -use std::fmt::Display; +use std::{fmt::Display, future::Future, pin::Pin, time::Duration}; +use backoff::{backoff::Backoff, ExponentialBackoff, ExponentialBackoffBuilder}; use futures::TryFutureExt; use tokio::{ net::ToSocketAddrs, sync::{mpsc, oneshot}, }; -use tracing::debug; +use tracing::{debug, error}; use super::connection::Connection; use crate::{ codec::{request::Request, response::Response}, - Error, + Error, ReconnectInterval, }; // Arc here is necessary to send same error to all waiting in-flights @@ -36,12 +37,16 @@ impl DispatcherSender { } } +type ConnectDynFuture = dyn Future> + Send; + /// Dispatching messages from client to connection. /// /// Currently no-op, in future it should handle reconnects, schema reloading, pooling. pub(crate) struct Dispatcher { rx: mpsc::Receiver, conn: Connection, + conn_factory: Box Pin> + Send + Sync>, + reconnect_interval: Option, } impl Dispatcher { @@ -49,20 +54,72 @@ impl Dispatcher { addr: A, user: Option<&str>, password: Option<&str>, + connect_timeout: Option, + reconnect_interval: Option, ) -> Result<(Self, DispatcherSender), Error> where - A: ToSocketAddrs + Display, + A: ToSocketAddrs + Display + Clone + Send + Sync + 'static, { - let conn = Connection::new(addr, user, password).await?; + let user: Option = user.map(Into::into); + let password: Option = password.map(Into::into); + let conn_factory = Box::new(move || { + let addr = addr.clone(); + let user = user.clone(); + let password = password.clone(); + let connect_timeout = connect_timeout; + Box::pin(async move { + Connection::new(addr, user.as_deref(), password.as_deref(), connect_timeout).await + }) as Pin> + }); + + let conn = conn_factory().await?; // TODO: test whether increased size can help with performance let (tx, rx) = mpsc::channel(1); - Ok((Self { rx, conn }, DispatcherSender { tx })) + Ok(( + Self { + rx, + conn, + conn_factory, + reconnect_interval, + }, + DispatcherSender { tx }, + )) + } + + async fn reconnect(&mut self) { + let mut reconn_int_state = self + .reconnect_interval + .as_ref() + .map(ReconnectIntervalState::from); + loop { + match (self.conn_factory)().await { + Ok(conn) => { + self.conn = conn; + return; + } + Err(err) => { + error!("Failed to reconnect to Tarantool: {:#}", err); + if let Some(ref mut x) = reconn_int_state { + tokio::time::sleep(x.next_timeout()).await; + } + } + } + } } pub(crate) async fn run(mut self) { debug!("Starting dispatcher"); + loop { + if self.run_conn().await { + return; + } + self.reconnect().await; + } + } + + pub(crate) async fn run_conn(&mut self) -> bool { let err = loop { tokio::select! { next = self.conn.handle_next_response() => { @@ -72,16 +129,66 @@ impl Dispatcher { } next = self.rx.recv() => { if let Some((request, tx)) = next { - if let Err(err) = self.conn.send_request(request, tx).await { - break err.into(); + // Check whether tx is closed in case someone cancelled request + // while it was in queue + if !tx.is_closed() { + if let Err(err) = self.conn.send_request(request, tx).await { + break err.into(); + } } } else { debug!("All senders dropped"); - return + return true } } } }; self.conn.finish_with_error(err); + false + } +} + +/// Get interval before next reconnect attempt. +#[derive(Debug)] +enum ReconnectIntervalState { + Fixed(Duration), + ExponentialBackoff { + state: ExponentialBackoff, + max: Duration, + }, +} + +impl ReconnectIntervalState { + fn next_timeout(&mut self) -> Duration { + match self { + ReconnectIntervalState::Fixed(x) => *x, + + ReconnectIntervalState::ExponentialBackoff { ref mut state, max } => { + dbg!(state).next_backoff().unwrap_or(*max) + } + } + } +} + +impl From<&ReconnectInterval> for ReconnectIntervalState { + fn from(value: &ReconnectInterval) -> Self { + match value { + ReconnectInterval::Fixed(x) => Self::Fixed(*x), + ReconnectInterval::ExponentialBackoff { + min, + max, + randomization_factor, + multiplier, + } => { + let state = ExponentialBackoffBuilder::new() + .with_initial_interval(*min) + .with_max_interval(*max) + .with_randomization_factor(*randomization_factor) + .with_multiplier(*multiplier) + .with_max_elapsed_time(None) + .build(); + Self::ExponentialBackoff { state, max: *max } + } + } } }