Skip to content

Commit

Permalink
Add performance counters gated by metrics feature
Browse files Browse the repository at this point in the history
  • Loading branch information
cloneable committed Sep 14, 2023
1 parent c5f620e commit 2d95f84
Show file tree
Hide file tree
Showing 16 changed files with 285 additions and 14 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ once_cell = "1.7.2"
pem = "3.0"
percent-encoding = "2.1.0"
pin-project = "1.0.2"
serde = "1"
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
socket2 = "0.5.2"
thiserror = "1.0.4"
Expand Down Expand Up @@ -78,6 +78,7 @@ rand = "0.8.0"

[features]
default = [
"metrics",
"flate2/zlib",
"mysql_common/bigdecimal",
"mysql_common/rust_decimal",
Expand All @@ -95,6 +96,7 @@ default-rustls = [
"derive",
"rustls-tls",
]
metrics = []
minimal = ["flate2/zlib"]
native-tls-tls = ["native-tls", "tokio-native-tls"]
rustls-tls = [
Expand Down
27 changes: 22 additions & 5 deletions src/buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// option. All files in the project carrying such notice may not be copied,
// modified, or distributed except according to those terms.

use crate::metrics::BufferPoolMetrics;
use crossbeam::queue::ArrayQueue;
use std::{mem::replace, ops::Deref, sync::Arc};

Expand All @@ -14,6 +15,7 @@ pub struct BufferPool {
buffer_size_cap: usize,
buffer_init_cap: usize,
pool: ArrayQueue<Vec<u8>>,
metrics: BufferPoolMetrics,
}

impl BufferPool {
Expand All @@ -37,14 +39,21 @@ impl BufferPool {
pool: ArrayQueue::new(pool_cap),
buffer_size_cap,
buffer_init_cap,
metrics: Default::default(),
}
}

pub fn get(self: &Arc<Self>) -> PooledBuf {
let buf = self
.pool
.pop()
.unwrap_or_else(|| Vec::with_capacity(self.buffer_init_cap));
let buf = match self.pool.pop() {
Some(buf) => {
self.metrics.reuses.incr();
buf
}
None => {
self.metrics.creations.incr();
Vec::with_capacity(self.buffer_init_cap)
}
};
debug_assert_eq!(buf.len(), 0);
PooledBuf(buf, self.clone())
}
Expand All @@ -64,7 +73,15 @@ impl BufferPool {
buf.shrink_to(self.buffer_size_cap);

// ArrayQueue will make sure to drop the buffer if capacity is exceeded
let _ = self.pool.push(buf);
match self.pool.push(buf) {
Ok(()) => self.metrics.returns.incr(),
Err(_buf) => self.metrics.discards.incr(),
};
}

#[cfg(feature = "metrics")]
pub(crate) fn snapshot_metrics(&self) -> BufferPoolMetrics {
self.metrics.clone()
}
}

Expand Down
30 changes: 29 additions & 1 deletion src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::{
consts::{CapabilityFlags, Command, StatusFlags},
error::*,
io::Stream,
metrics::ConnMetrics,
opts::Opts,
queryable::{
query_result::{QueryResult, ResultSetMeta},
Expand All @@ -59,6 +60,8 @@ const DEFAULT_WAIT_TIMEOUT: usize = 28800;

/// Helper that asynchronously disconnects the givent connection on the default tokio executor.
fn disconnect(mut conn: Conn) {
conn.metrics().disconnects.incr();

let disconnected = conn.inner.disconnected;

// Mark conn as disconnected.
Expand Down Expand Up @@ -119,6 +122,7 @@ struct ConnInner {
/// One-time connection-level infile handler.
infile_handler:
Option<Pin<Box<dyn Future<Output = crate::Result<InfileData>> + Send + Sync + 'static>>>,
conn_metrics: Arc<ConnMetrics>,
}

impl fmt::Debug for ConnInner {
Expand All @@ -140,6 +144,7 @@ impl fmt::Debug for ConnInner {
impl ConnInner {
/// Constructs an empty connection.
fn empty(opts: Opts) -> ConnInner {
let conn_metrics: Arc<ConnMetrics> = Default::default();
ConnInner {
capabilities: opts.get_capabilities(),
status: StatusFlags::empty(),
Expand All @@ -154,7 +159,7 @@ impl ConnInner {
tx_status: TxStatus::None,
last_io: Instant::now(),
wait_timeout: Duration::from_secs(0),
stmt_cache: StmtCache::new(opts.stmt_cache_size()),
stmt_cache: StmtCache::new(opts.stmt_cache_size(), conn_metrics.clone()),
socket: opts.socket().map(Into::into),
opts,
nonce: Vec::default(),
Expand All @@ -164,6 +169,7 @@ impl ConnInner {
server_key: None,
infile_handler: None,
reset_upon_returning_to_a_pool: false,
conn_metrics,
}
}

Expand All @@ -175,6 +181,18 @@ impl ConnInner {
.as_mut()
.ok_or_else(|| DriverError::ConnectionClosed.into())
}

fn set_pool(&mut self, pool: Option<Pool>) {
let conn_metrics = if let Some(ref pool) = pool {
Arc::clone(&pool.inner.metrics.conn)
} else {
Default::default()
};
self.conn_metrics = Arc::clone(&conn_metrics);
self.stmt_cache.conn_metrics = conn_metrics;

self.pool = pool;
}
}

/// MySql server connection.
Expand Down Expand Up @@ -926,6 +944,8 @@ impl Conn {
conn.run_init_commands().await?;
conn.run_setup_commands().await?;

conn.metrics().connects.incr();

Ok(conn)
}
.boxed()
Expand Down Expand Up @@ -1162,6 +1182,10 @@ impl Conn {
self.inner.stmt_cache.clear();
self.inner.infile_handler = None;
self.run_setup_commands().await?;
// self.inner.set_pool(pool);

// TODO: clear some metrics?

Ok(())
}

Expand Down Expand Up @@ -1276,6 +1300,10 @@ impl Conn {

Ok(BinlogStream::new(self))
}

pub(crate) fn metrics(&self) -> &ConnMetrics {
&self.inner.conn_metrics
}
}

#[cfg(test)]
Expand Down
6 changes: 4 additions & 2 deletions src/conn/pool/futures/get_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,10 @@ impl Future for GetConn {

return match result {
Ok(mut c) => {
c.inner.pool = Some(pool);
c.inner.set_pool(Some(pool));
c.inner.reset_upon_returning_to_a_pool =
self.reset_upon_returning_to_a_pool;
c.metrics().connects.incr();
Poll::Ready(Ok(c))
}
Err(e) => {
Expand All @@ -160,7 +161,8 @@ impl Future for GetConn {
self.inner = GetConnInner::Done;

let pool = self.pool_take();
c.inner.pool = Some(pool);
pool.inner.metrics.reuses.incr();
c.inner.set_pool(Some(pool));
c.inner.reset_upon_returning_to_a_pool =
self.reset_upon_returning_to_a_pool;
return Poll::Ready(Ok(c));
Expand Down
30 changes: 29 additions & 1 deletion src/conn/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
use crate::{
conn::{pool::futures::*, Conn},
error::*,
metrics::PoolMetrics,
opts::{Opts, PoolOpts},
queryable::transaction::{Transaction, TxOpts},
};
Expand Down Expand Up @@ -121,6 +122,10 @@ impl Waitlist {
self.queue.remove(&tmp);
}

fn len(&self) -> usize {
self.queue.len()
}

fn is_empty(&self) -> bool {
self.queue.is_empty()
}
Expand Down Expand Up @@ -177,6 +182,7 @@ pub struct Inner {
close: atomic::AtomicBool,
closed: atomic::AtomicBool,
exchange: Mutex<Exchange>,
pub(crate) metrics: PoolMetrics,
}

/// Asynchronous pool of MySql connections.
Expand All @@ -190,7 +196,7 @@ pub struct Inner {
#[derive(Debug, Clone)]
pub struct Pool {
opts: Opts,
inner: Arc<Inner>,
pub(super) inner: Arc<Inner>,
drop: mpsc::UnboundedSender<Option<Conn>>,
}

Expand Down Expand Up @@ -219,6 +225,7 @@ impl Pool {
exist: 0,
recycler: Some((rx, pool_opts)),
}),
metrics: Default::default(),
}),
drop: tx,
}
Expand All @@ -232,6 +239,7 @@ impl Pool {

/// Async function that resolves to `Conn`.
pub fn get_conn(&self) -> GetConn {
self.inner.metrics.gets.incr();
let reset_connection = self.opts.pool_opts().reset_connection();
GetConn::new(self, reset_connection)
}
Expand All @@ -250,6 +258,11 @@ impl Pool {
DisconnectPool::new(self)
}

#[cfg(feature = "metrics")]
pub fn snapshot_metrics(&self) -> PoolMetrics {
self.inner.metrics.clone()
}

/// A way to return connection taken from a pool.
fn return_conn(&mut self, conn: Conn) {
// NOTE: we're not in async context here, so we can't block or return NotReady
Expand All @@ -258,6 +271,8 @@ impl Pool {
}

fn send_to_recycler(&self, conn: Conn) {
self.inner.metrics.recycler.recycles.incr();

if let Err(conn) = self.drop.send(Some(conn)) {
let conn = conn.0.unwrap();

Expand Down Expand Up @@ -354,6 +369,19 @@ impl Pool {
let mut exchange = self.inner.exchange.lock().unwrap();
exchange.waiting.remove(queue_id);
}

/// Returns the number of
/// - open connections,
/// - idling connections in the pool and
/// - tasks waiting for a connection.
pub fn queue_stats(&self) -> (usize, usize, usize) {
let exchange = self.inner.exchange.lock().unwrap();
(
exchange.exist,
exchange.available.len(),
exchange.waiting.len(),
)
}
}

impl Drop for Conn {
Expand Down
5 changes: 5 additions & 0 deletions src/conn/pool/recycler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ impl Future for Recycler {
let mut exchange = $self.inner.exchange.lock().unwrap();
if $pool_is_closed || exchange.available.len() >= $self.pool_opts.active_bound() {
drop(exchange);
$self.inner.metrics.recycler.discards.incr();
$self.discard.push($conn.close_conn().boxed());
} else {
$self.inner.metrics.recycler.recycled_returnals.incr();
exchange.available.push_back($conn.into());
if let Some(w) = exchange.waiting.pop() {
w.wake();
Expand All @@ -80,11 +82,14 @@ impl Future for Recycler {
macro_rules! conn_decision {
($self:ident, $conn:ident) => {
if $conn.inner.stream.is_none() || $conn.inner.disconnected {
$self.inner.metrics.recycler.discards.incr();
// drop unestablished connection
$self.discard.push(futures_util::future::ok(()).boxed());
} else if $conn.inner.tx_status != TxStatus::None || $conn.has_pending_result() {
$self.inner.metrics.recycler.cleans.incr();
$self.cleaning.push($conn.cleanup_for_pool().boxed());
} else if $conn.expired() || close {
$self.inner.metrics.recycler.discards.incr();
$self.discard.push($conn.close_conn().boxed());
} else if $conn.inner.reset_upon_returning_to_a_pool {
$self.reset.push($conn.reset_for_pool().boxed());
Expand Down
2 changes: 2 additions & 0 deletions src/conn/routines/change_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub struct ChangeUser;

impl Routine<()> for ChangeUser {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
conn.metrics().routines.change_user.incr();

#[cfg(feature = "tracing")]
let span = debug_span!(
"mysql_async::change_user",
Expand Down
2 changes: 2 additions & 0 deletions src/conn/routines/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ impl<'a> ExecRoutine<'a> {

impl Routine<()> for ExecRoutine<'_> {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
conn.metrics().routines.execs.incr();

#[cfg(feature = "tracing")]
let span = info_span!(
"mysql_async::exec",
Expand Down
2 changes: 2 additions & 0 deletions src/conn/routines/next_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ where
P: Protocol,
{
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
conn.metrics().routines.next_sets.incr();

#[cfg(feature = "tracing")]
let span = debug_span!(
"mysql_async::next_set",
Expand Down
2 changes: 2 additions & 0 deletions src/conn/routines/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub struct PingRoutine;

impl Routine<()> for PingRoutine {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
conn.metrics().routines.pings.incr();

#[cfg(feature = "tracing")]
let span = debug_span!("mysql_async::ping", mysql_async.connection.id = conn.id());

Expand Down
2 changes: 2 additions & 0 deletions src/conn/routines/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ impl PrepareRoutine {

impl Routine<Arc<StmtInner>> for PrepareRoutine {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<Arc<StmtInner>>> {
conn.metrics().routines.prepares.incr();

#[cfg(feature = "tracing")]
let span = info_span!(
"mysql_async::prepare",
Expand Down
2 changes: 2 additions & 0 deletions src/conn/routines/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ impl<'a, L: TracingLevel> QueryRoutine<'a, L> {

impl<L: TracingLevel> Routine<()> for QueryRoutine<'_, L> {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
conn.metrics().routines.queries.incr();

#[cfg(feature = "tracing")]
let span = create_span!(
L::LEVEL,
Expand Down
2 changes: 2 additions & 0 deletions src/conn/routines/reset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub struct ResetRoutine;

impl Routine<()> for ResetRoutine {
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
conn.metrics().routines.resets.incr();

#[cfg(feature = "tracing")]
let span = debug_span!("mysql_async::reset", mysql_async.connection.id = conn.id());

Expand Down
Loading

0 comments on commit 2d95f84

Please sign in to comment.