Skip to content

Commit

Permalink
Rewriting dispatcher and connection (#10)
Browse files Browse the repository at this point in the history
* Fix passing dispatcher queue parameter

* TCP writer in separate task

* Revert "TCP writer in separate task"

This reverts commit 3962868.

* Attempt to simultaneously send and receive

* State in connection separated from TCP streams

* Separate tasks for read and write to TCP stream

* Version with split, not into_split

* Revert "Version with split, not into_split"

This reverts commit 640f48e.

* Mutex instead of BiLock

* Attempt to reintroduce internal queue to writer

* Another bench

* Attempt to decouple writer and internal queue

* Attempt to remove writer task

* Revert "Attempt to remove writer task"

This reverts commit 21381c8.

* Cleaner version without permit and options

* Final changes of new Connection

* Remove unnecessary dependency
  • Loading branch information
Flowneee authored Oct 3, 2023
1 parent 3ce3801 commit 5bd7a65
Show file tree
Hide file tree
Showing 8 changed files with 444 additions and 183 deletions.
12 changes: 9 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [0.0.9] - 2023-09-23
## [0.0.10] - 2023-10-04
### Added
- `dispatcher_internal_queue_size` parameter to builder, allowing to customize size of internal queue between dispatcher and connection.
- `internal_simultaneous_requests_threshold` parameter to builder, which allows customizing the maximum number of simultaneously created requests that the connection can handle effectively.

### Changed
- Rewritten internal logic of the connection to Tarantool, which improved performance: reading from and writing to the socket are now handled in separate tasks.

### Fixed
- Increased size of internal queue between dispatcher and connection, which should significantly increase performance (previously it was degrading rapidly with a lot of parallel requests).
- Increased size of internal channel between dispatcher and connection, which should significantly increase performance (previously it was degrading rapidly with a lot of concurrent requests).


## [0.0.9] - 2023-09-23 (broken, yanked)


## [0.0.8] - 2023-09-05
Expand Down
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "tarantool-rs"
description = "Asyncronous tokio-based client for Tarantool"
version = "0.0.9"
version = "0.0.10"
edition = "2021"
authors = ["Andrey Kononov [email protected]"]
license = "MIT"
Expand All @@ -26,6 +26,7 @@ serde = { version = "1", features = ["derive"] }
sha-1 = "0.10"
thiserror = "1"
tokio = { version = "1", features = ["rt", "net", "io-util", "macros", "time"] }
tokio-stream = "0.1"
tokio-util = { version = "0.7", default-features = false, features = ["codec"] }
tracing = { version = "0.1", features = ["log"] }

Expand All @@ -40,6 +41,7 @@ serde_json = "1"
tokio = { version = "1", features = ["full"] }
tracing-test = { version = "0.2", features = ["no-env-filter"] }
tarantool-test-container = { path = "tarantool-test-container" }
rusty_tarantool = "*"

[[example]]
name = "cli_client"
Expand All @@ -60,3 +62,7 @@ harness = false
[[bench]]
name = "compare"
harness = false

[[bench]]
name = "simple_loop"
harness = false
2 changes: 1 addition & 1 deletion benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub fn bench_tarantool_rs(c: &mut Criterion) {
// Bench logic
// NOTE: on my PC converting to join add slight overhead (1-2 microseconds for 1 future input)
// NOTE: on my PC 50 input load tarantool to 50% on single core
for parallel in [1, 2, 5, 10, 50].into_iter() {
for parallel in [1, 50, 250, 1000].into_iter() {
group.bench_with_input(BenchmarkId::new("ping", parallel), &parallel, |b, p| {
b.to_async(&tokio_rt).iter(|| async {
let make_fut = |_| conn.ping();
Expand Down
48 changes: 48 additions & 0 deletions benches/simple_loop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::time::{Duration, Instant};

use futures::{stream::repeat_with, StreamExt};
use tarantool_rs::{Connection, ExecutorExt};

// Convenience alias: test container specialized with the default Tarantool
// startup arguments, used to spin up a disposable Tarantool instance for the bench.
type TarantoolTestContainer = tarantool_test_container::TarantoolTestContainer<
tarantool_test_container::TarantoolDefaultArgs,
>;

/// Simple throughput benchmark: keeps up to 1000 `ping` requests in flight
/// against a disposable Tarantool container and periodically prints how many
/// iterations completed per second.
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let container = TarantoolTestContainer::default();

    let conn = Connection::builder()
        .internal_simultaneous_requests_threshold(1000)
        .build(format!("127.0.0.1:{}", container.connect_port()))
        .await?;
    // Alternative client kept for manual comparison runs (rusty_tarantool):
    // let conn = rusty_tarantool::tarantool::ClientConfig::new(
    //     format!("127.0.0.1:{}", container.connect_port()),
    //     "guest",
    //     "",
    // )
    // .build();
    // conn.ping().await?;

    let mut counter = 0u64;
    let mut last_measured_counter = 0;
    let mut last_measured_ts = Instant::now();

    let interval_secs = 2;
    let interval = Duration::from_secs(interval_secs);

    // Infinite stream of pings, at most 1000 awaited concurrently.
    let mut stream = repeat_with(|| conn.ping()).buffer_unordered(1000);
    // NOTE: the original `while let _ = stream.next().await` used an
    // irrefutable pattern, so the loop could never observe stream exhaustion
    // and the final `Ok(())` was unreachable. Exit cleanly on `None` instead.
    // Failed pings still count as iterations, matching the original behavior.
    while stream.next().await.is_some() {
        counter += 1;
        if last_measured_ts.elapsed() > interval {
            last_measured_ts = Instant::now();
            let counter_diff = counter - last_measured_counter;
            last_measured_counter = counter;
            println!(
                "Iterations over last {interval_secs} seconds: {counter_diff}, per second: {}",
                counter_diff / interval_secs
            );
        }
    }

    Ok(())
}
27 changes: 16 additions & 11 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub struct ConnectionBuilder {
connect_timeout: Option<Duration>,
reconnect_interval: Option<ReconnectInterval>,
sql_statement_cache_capacity: usize,
dispatcher_internal_queue_size: usize,
internal_simultaneous_requests_threshold: usize,
}

impl Default for ConnectionBuilder {
Expand All @@ -81,7 +81,7 @@ impl Default for ConnectionBuilder {
connect_timeout: None,
reconnect_interval: Some(ReconnectInterval::default()),
sql_statement_cache_capacity: DEFAULT_SQL_STATEMENT_CACHE_CAPACITY,
dispatcher_internal_queue_size: DEFAULT_DISPATCHER_INTERNAL_QUEUE_SIZE,
internal_simultaneous_requests_threshold: DEFAULT_DISPATCHER_INTERNAL_QUEUE_SIZE,
}
}
}
Expand All @@ -92,17 +92,18 @@ impl ConnectionBuilder {
where
A: ToSocketAddrs + Display + Clone + Send + Sync + 'static,
{
let (dispatcher, disaptcher_sender) = Dispatcher::new(
let (dispatcher_fut, disaptcher_sender) = Dispatcher::prepare(
addr,
self.user.as_deref(),
self.password.as_deref(),
self.timeout,
self.reconnect_interval.clone(),
self.internal_simultaneous_requests_threshold,
)
.await?;

// TODO: support setting custom executor
tokio::spawn(dispatcher.run());
tokio::spawn(dispatcher_fut);
let conn = Connection::new(
disaptcher_sender,
self.timeout,
Expand Down Expand Up @@ -194,16 +195,20 @@ impl ConnectionBuilder {
self
}

/// Sets size of the internal queue between connection and dispatcher.
/// Prepare `Connection` to process `value` number of simultaneously created requests.
///
/// This queue contains all requests, made from [`Connection`]s/[`Stream`]s/etc.
/// Increasing its size can help if you have a lot of requests, made concurrently
/// and frequently, however this will increase memory consumption slightly.
/// It is not a hard limit, but making more simultaneous requests than this value
/// will result in degraded performance, so try increasing this value
/// if you are unsatisfied with performance.
///
/// Internally the connection has multiple bounded channels, and this parameter mostly
/// affects the size of these channels. Increasing this value can help if you have a lot
/// of simultaneously created requests, however it will also increase memory consumption.
///
/// By default set to 500, which should be reasonable compromise between memory
/// (about 50 KB) and performance.
pub fn dispatcher_internal_queue_size(&mut self, size: usize) -> &mut Self {
self.dispatcher_internal_queue_size = size;
/// (about 100 KB) and performance.
pub fn internal_simultaneous_requests_threshold(&mut self, value: usize) -> &mut Self {
self.internal_simultaneous_requests_threshold = value;
self
}
}
73 changes: 54 additions & 19 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rmp::{
encode::{RmpWriteErr, ValueWriteError},
};
use rmpv::Value;
use tokio::time::error::Elapsed;
use tokio::{task::JoinError, time::error::Elapsed};

/// Error returned by Tarantool in response to a request.
#[derive(Clone, Debug, thiserror::Error)]
Expand Down Expand Up @@ -66,8 +66,8 @@ pub enum Error {
SpaceMissingPrimaryIndex,

/// Underlying TCP connection closed.
#[error("TCP connection error")]
ConnectionError(#[from] Arc<tokio::io::Error>),
#[error("TCP connection IO error")]
Io(#[from] Arc<tokio::io::Error>),
/// Underlying TCP connection was closed.
#[error("TCP connection closed")]
ConnectionClosed,
Expand All @@ -79,15 +79,14 @@ pub enum Error {

impl From<tokio::io::Error> for Error {
fn from(v: tokio::io::Error) -> Self {
Self::ConnectionError(Arc::new(v))
Self::Io(Arc::new(v))
}
}

impl From<CodecDecodeError> for Error {
fn from(value: CodecDecodeError) -> Self {
match value {
CodecDecodeError::Io(x) => x.into(),
CodecDecodeError::Closed => Self::ConnectionClosed,
CodecDecodeError::Decode(x) => x.into(),
}
}
Expand All @@ -102,6 +101,17 @@ impl From<CodecEncodeError> for Error {
}
}

impl From<ConnectionError> for Error {
fn from(value: ConnectionError) -> Self {
match value {
ConnectionError::Io(x) => x.into(),
ConnectionError::ConnectionClosed => Self::ConnectionClosed,
ConnectionError::Decode(x) => x.into(),
err @ ConnectionError::JoinError(_) => Self::Other(err.into()),
}
}
}

impl From<Elapsed> for Error {
fn from(_: Elapsed) -> Self {
Self::Timeout
Expand Down Expand Up @@ -296,28 +306,53 @@ impl DecodingErrorLocation {
}

/// Helper type to return errors from decoder.
#[derive(Clone)]
#[derive(Debug, thiserror::Error)]
pub(crate) enum CodecDecodeError {
#[error(transparent)]
Io(#[from] tokio::io::Error),
#[error(transparent)]
Decode(#[from] DecodingError),
}

/// Helper type to return errors from encoder.
#[derive(Debug, thiserror::Error)]
pub(crate) enum CodecEncodeError {
#[error(transparent)]
Io(#[from] tokio::io::Error),
#[error(transparent)]
Encode(#[from] EncodingError),
}

/// Error type, returned to client from connection.
#[derive(Clone, Debug, thiserror::Error)]
pub(crate) enum ConnectionError {
#[error(transparent)]
Io(Arc<tokio::io::Error>),
Closed,
Decode(DecodingError),
#[error("Connection closed")]
ConnectionClosed,
#[error(transparent)]
Decode(#[from] DecodingError),
#[error("Tokio JoinHandle error: {0:?}")]
JoinError(#[source] Arc<JoinError>),
}

impl From<tokio::io::Error> for CodecDecodeError {
fn from(v: tokio::io::Error) -> Self {
Self::Io(Arc::new(v))
impl From<tokio::io::Error> for ConnectionError {
fn from(value: tokio::io::Error) -> Self {
Self::Io(Arc::new(value))
}
}

/// Helper type to return errors from encoder.
#[derive(Debug)]
pub(crate) enum CodecEncodeError {
Io(tokio::io::Error),
Encode(EncodingError),
impl From<CodecDecodeError> for ConnectionError {
fn from(value: CodecDecodeError) -> Self {
match value {
CodecDecodeError::Io(x) => x.into(),
CodecDecodeError::Decode(x) => x.into(),
}
}
}

impl From<tokio::io::Error> for CodecEncodeError {
fn from(v: tokio::io::Error) -> Self {
Self::Io(v)
impl From<JoinError> for ConnectionError {
fn from(value: JoinError) -> Self {
Self::JoinError(Arc::new(value))
}
}
Loading

0 comments on commit 5bd7a65

Please sign in to comment.