Skip to content

Commit

Permalink
Background reconnect. API improvements. Release v0.0.2
Browse files Browse the repository at this point in the history
  • Loading branch information
Flowneee committed May 17, 2023
1 parent 6c648c4 commit d1123aa
Show file tree
Hide file tree
Showing 14 changed files with 344 additions and 108 deletions.
10 changes: 8 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [Unreleased]
## [0.2.0] - 2023-05-18
### Added
- `indices` method to `SpaceMetadata` for accessing space's indices;
- `get_by_name` and `get_by_id` methods to `UniqueIdNameMap`.
- `get_by_name` and `get_by_id` methods to `UniqueIdNameMap`;
- reconnection in background, if current conection died;
- optional timeout on connection.

### Changed
- `ConnectionBuilder` most methods now accept new values as `impl Into<Option<...>>`;
- `TransactionBuilder` methods now return `&mut Self`.


## [0.0.1] - 2023-05-15
Expand Down
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "tarantool-rs"
description = "Asyncronous tokio-based client for Tarantool"
version = "0.0.1"
version = "0.0.2"
edition = "2021"
authors = ["Andrey Kononov [email protected]"]
license = "MIT"
Expand All @@ -13,6 +13,7 @@ repository = "https://github.com/Flowneee/tarantool-rs"
[dependencies]
anyhow = "1"
async-trait = "0.1"
backoff = "0.4"
base64 = "0.13"
bytes = "1"
futures = "0.3"
Expand All @@ -22,7 +23,7 @@ rmpv = { version = "1", features = ["with-serde"] }
serde = { version = "1", features = ["derive"] }
sha-1 = "0.10"
thiserror = "1"
tokio = { version = "1", features = ["rt", "net", "io-util", "macros"] }
tokio = { version = "1", features = ["rt", "net", "io-util", "macros", "time"] }
tokio-util = { version = "0.7", default-features = false, features = ["codec"] }
tracing = { version = "0.1", features = ["log"] }

Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ For examples of how to use this crate check `examples/` folder.
* [x] remote function calling
* [x] CRUD operations
* [x] transaction control (begin/commit/rollback)
* [x] reconnection in background
* [ ] SQL requests
* [ ] chunked responses
* [ ] watchers and events
* [ ] reconnection in background
* [ ] connection pooling
* [ ] automatic schema fetching and reloading
* [ ] graceful shutdown protocol support
* [ ] pre Tarantool 2.10 versions support
* [ ] customizable connection features (streams/watchers/mvcc)
* [ ] ...


4 changes: 1 addition & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
services:
tarantool:
image: tarantool/tarantool
volumes:
- ./tests:/opt/tarantool
ports:
- "3301:3301"
environment:
TT_MEMTX_USE_MVCC_ENGINE: true
command: ["tarantool", "/opt/tarantool/test_data.lua"]
command: ["tarantool"]
6 changes: 5 additions & 1 deletion examples/cli_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ struct Args {

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let _ = pretty_env_logger::init();

let args = Args::parse();

let conn = Connection::builder().build(&args.tarantool_address).await?;
let conn = Connection::builder()
.build(args.tarantool_address.clone())
.await?;
println!("connected to Tarantool instance {}", args.tarantool_address);

let mut rl = DefaultEditor::new()?;
Expand Down
169 changes: 169 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
use std::{cmp::max, fmt::Display, time::Duration};

use tokio::net::ToSocketAddrs;
use tracing::debug;

use crate::{
client::Connection,
codec::{consts::TransactionIsolationLevel, request::Id},
errors::Error,
transport::Dispatcher,
};

/// Interval parameters for background reconnection.
#[derive(Clone, Debug, PartialEq)]
pub enum ReconnectInterval {
Fixed(Duration),
ExponentialBackoff {
min: Duration,
max: Duration,
randomization_factor: f64,
multiplier: f64,
},
}

impl Default for ReconnectInterval {
fn default() -> Self {
Self::exponential_backoff(Duration::from_millis(1), Duration::from_secs(1), 0.5, 5.0)
}
}

impl ReconnectInterval {
/// Fixed interval between reconnection attempts.
pub fn fixed(interval: Duration) -> Self {
Self::Fixed(interval)
}

/// Interval between reconnection attempts calculated as
/// exponentially growing period.
///
/// For details on this values check [`backoff::ExponentialBackoff`] docs.
pub fn exponential_backoff(
min_interval: Duration,
max_interval: Duration,
randomization_factor: f64,
multiplier: f64,
) -> Self {
Self::ExponentialBackoff {
min: max(min_interval, Duration::from_micros(1)),
max: max_interval,
randomization_factor,
multiplier,
}
}
}

/// Build connection to Tarantool.
#[derive(Debug)]
pub struct ConnectionBuilder {
user: Option<String>,
password: Option<String>,
transaction_timeout: Option<Duration>,
transaction_isolation_level: TransactionIsolationLevel,
connect_timeout: Option<Duration>,
reconnect_interval: Option<ReconnectInterval>,
}

impl Default for ConnectionBuilder {
fn default() -> Self {
Self {
user: Default::default(),
password: Default::default(),
transaction_timeout: Default::default(),
transaction_isolation_level: Default::default(),
connect_timeout: Default::default(),
reconnect_interval: Some(ReconnectInterval::default()),
}
}
}

impl ConnectionBuilder {
/// Create connection to Tarantool using provided address.
pub async fn build<A>(&self, addr: A) -> Result<Connection, Error>
where
A: ToSocketAddrs + Display + Clone + Send + Sync + 'static,
{
let (dispatcher, disaptcher_sender) = Dispatcher::new(
addr,
self.user.as_deref(),
self.password.as_deref(),
self.connect_timeout,
self.reconnect_interval.clone(),
)
.await?;

// TODO: support setting custom executor
tokio::spawn(dispatcher.run());
let conn = Connection::new(
disaptcher_sender,
self.transaction_timeout,
self.transaction_isolation_level,
);

// TODO: add option to disable pre 2.10 features (ID request, streams, watchers)
let features = Id::default();
debug!(
"Setting supported features: VERSION - {}, STREAMS - {}, TRANSACTIONS - {}, ERROR_EXTENSION - {}, WATCHERS = {}",
features.protocol_version,
features.streams,
features.transactions,
features.error_extension,
features.watchers
);
conn.id(features).await?;

Ok(conn)
}

/// Sets user login and, optionally, password, used for this connection.
///
/// AUTH message sent upon connecting to server.
pub fn auth<'a>(&mut self, user: &str, password: impl Into<Option<&'a str>>) -> &mut Self {
self.user = Some(user.into());
self.password = password.into().map(Into::into);
self
}

/// Sets default timeout for transactions.
///
/// By default disabled.
pub fn transaction_timeout(
&mut self,
transaction_timeout: impl Into<Option<Duration>>,
) -> &mut Self {
self.transaction_timeout = transaction_timeout.into();
self
}

/// Sets default transaction isolation level.
///
/// By default `TransactionIsolationLevel::Default` (i.e. use box.cfg default value).
pub fn transaction_isolation_level(
&mut self,
transaction_isolation_level: TransactionIsolationLevel,
) -> &mut Self {
self.transaction_isolation_level = transaction_isolation_level;
self
}

/// Sets timeout for connect.
///
/// By default disabled.
pub fn connect_timeout(&mut self, connect_timeout: impl Into<Option<Duration>>) -> &mut Self {
self.connect_timeout = connect_timeout.into();
self
}

/// Sets interval between reconnection attempts.
///
/// If disabled, next attempt wil lbe started as soon as last one finished.
///
/// By default set to `ReconnectInterval::exponential_backoff(Duration::from_millis(1), Duration::from_secs(1), 0.5, 5.0)`.
pub fn reconnect_interval(
&mut self,
reconnect_interval: impl Into<Option<ReconnectInterval>>,
) -> &mut Self {
self.reconnect_interval = reconnect_interval.into();
self
}
}
81 changes: 0 additions & 81 deletions src/client/builder.rs

This file was deleted.

5 changes: 2 additions & 3 deletions src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ use futures::{Future, TryFutureExt};
use rmpv::Value;
use tracing::debug;

use super::{
connection_like::ConnectionLike, ConnectionBuilder, Stream, Transaction, TransactionBuilder,
};
use super::{connection_like::ConnectionLike, Stream, Transaction, TransactionBuilder};
use crate::{
builder::ConnectionBuilder,
codec::{
consts::TransactionIsolationLevel,
request::{Id, Request, RequestBody},
Expand Down
2 changes: 0 additions & 2 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub use self::{
builder::ConnectionBuilder,
connection::Connection,
connection_like::ConnectionLike,
stream::Stream,
Expand All @@ -8,7 +7,6 @@ pub use self::{

pub mod schema;

mod builder;
mod connection;
mod connection_like;
mod stream;
Expand Down
8 changes: 5 additions & 3 deletions src/client/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,14 @@ impl TransactionBuilder {
}
}

pub fn timeout(&mut self, timeout: Option<Duration>) {
self.timeout_secs = timeout.as_ref().map(Duration::as_secs_f64);
pub fn timeout(&mut self, timeout: impl Into<Option<Duration>>) -> &mut Self {
self.timeout_secs = timeout.into().as_ref().map(Duration::as_secs_f64);
self
}

pub fn isolation_level(&mut self, isolation_level: TransactionIsolationLevel) {
pub fn isolation_level(&mut self, isolation_level: TransactionIsolationLevel) -> &mut Self {
self.isolation_level = isolation_level;
self
}

pub async fn begin(&self) -> Result<Transaction, Error> {
Expand Down
Loading

0 comments on commit d1123aa

Please sign in to comment.