From e551ed2a0c944df3e8bd517669431f4bdf532a92 Mon Sep 17 00:00:00 2001 From: Kelly McLaughlin Date: Tue, 17 Dec 2019 14:49:12 -0700 Subject: [PATCH 1/2] Cueball should terminate decoherence check on pool stop Switch from using the periodic crate to the timer crate to manage the decoherence work. This change makes it so that when the connection pool is explicitly stopped or the connection pool goes out of scope then the threads managing the periodic decoherence work also are terminated. --- Cargo.toml | 4 +- src/connection_pool.rs | 92 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 81 insertions(+), 15 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5f11daa..15f5f8e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,13 +10,13 @@ edition = "2018" [dependencies] base64 = "0.10.1" -chrono = "0.4.7" -periodic = "0.1.1" +chrono = "0.4.9" derive_more = "0.14.0" rand = "0.7.0" sha1 = "0.6.0" slog = "2" slog-stdlog = "3" +timer = "0.2.0" [dev-dependencies] slog-term = "2.4.0" diff --git a/src/connection_pool.rs b/src/connection_pool.rs index 6f6ff9e..fa55447 100644 --- a/src/connection_pool.rs +++ b/src/connection_pool.rs @@ -4,16 +4,19 @@ pub mod types; use std::cmp::Ordering; use std::collections::HashMap; +use std::fmt::Result as FmtResult; +use std::fmt::{Debug, Formatter}; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Barrier}; -use std::time::Duration; use std::{thread, time}; +use chrono::Duration; use slog::{debug, error, info, o, warn, Drain, Logger}; +use timer::Guard; use crate::backend::{Backend, BackendKey}; use crate::connection::Connection; @@ -35,7 +38,6 @@ const DEFAULT_REBALANCE_ACTION_DELAY: u64 = 100; const DEFAULT_DECOHERENCE_INTERVAL: u64 = 300; /// A pool of connections to a multi-node service -#[derive(Debug)] pub struct ConnectionPool { protected_data: ProtectedData, resolver_thread: Option>, @@ -49,10 +51,65 @@ pub struct ConnectionPool { decoherence_interval: Option, log: Logger, state: ConnectionPoolState, + decoherence_timer: Option, + decoherence_timer_guard: Guard, _resolver: PhantomData, _connection_function: PhantomData, } +impl Debug for ConnectionPool { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + match *self { + ConnectionPool { + ref protected_data, + ref resolver_thread, + ref resolver_rx_thread, + ref resolver_tx, + ref max_connections, + ref claim_timeout, + ref rebalance_check, + ref rebalance_thread, + ref rebalancer_stop, + ref decoherence_interval, + ref log, + ref state, + decoherence_timer_guard: _, + decoherence_timer: _, + ref _resolver, + ref _connection_function, + } => { + let mut debug_trait_builder = f.debug_struct("ConnectionPool"); + let _ = debug_trait_builder + .field("protected_data", &&(protected_data)); + let _ = debug_trait_builder + .field("resolver_thread", &&(resolver_thread)); + let _ = debug_trait_builder + .field("resolver_rx_thread", &&(resolver_rx_thread)); + let _ = + debug_trait_builder.field("resolver_tx", &&(resolver_tx)); + let _ = debug_trait_builder + .field("max_connections", &&(max_connections)); + let _ = debug_trait_builder + .field("claim_timeout", &&(claim_timeout)); + let _ = debug_trait_builder + .field("rebalance_check", &&(rebalance_check)); + let _ = debug_trait_builder + .field("rebalance_thread", &&(rebalance_thread)); + let _ = debug_trait_builder + .field("rebalancer_stop", &&(rebalancer_stop)); + let _ = debug_trait_builder + .field("decoherence_interval", &&(decoherence_interval)); + let _ = debug_trait_builder.field("log", &&(log)); + let _ = debug_trait_builder.field("state", &&(state)); + let _ = debug_trait_builder.field("_resolver", &&(_resolver)); + let _ = debug_trait_builder + .field("_connection_function", &&(_connection_function)); + debug_trait_builder.finish() + } + } + } +} + impl Clone for ConnectionPool where C: Connection, @@ -73,6 +130,8 @@ where decoherence_interval: self.decoherence_interval, log: self.log.clone(), state: self.state, + decoherence_timer: None, + decoherence_timer_guard: self.decoherence_timer_guard.clone(), _resolver: PhantomData, _connection_function: PhantomData, } @@ -162,7 +221,10 @@ where .decoherence_interval .unwrap_or(DEFAULT_DECOHERENCE_INTERVAL); - start_decoherence( + let timer = timer::Timer::new(); + + let decoherence_timer_guard = start_decoherence( + &timer, decoherence_interval, protected_data.clone(), logger.clone(), @@ -181,6 +243,8 @@ where decoherence_interval: Some(decoherence_interval), log: logger, state: ConnectionPoolState::Running, + decoherence_timer: Some(timer), + decoherence_timer_guard, _resolver: PhantomData, _connection_function: PhantomData, }; @@ -269,6 +333,11 @@ where drop(connection_data); + if self.decoherence_timer.is_some() { + let _timer = self.decoherence_timer.take(); + } + drop(&self.decoherence_timer_guard); + // Wait for all outstanding threads to be returned to the pool and // close those while connections_remaining > 0.into() { @@ -885,9 +954,7 @@ fn resolver_recv_loop( done = true; None } - Ok(BackendMsg::HeartbeatMsg) => { - None - } + Ok(BackendMsg::HeartbeatMsg) => None, Err(_recv_err) => { done = true; None @@ -963,23 +1030,22 @@ fn rebalancer_loop( /// Start a thread to run periodic decoherence on the connection pool fn start_decoherence( + timer: &timer::Timer, decoherence_interval: u64, protected_data: ProtectedData, log: Logger, -) where +) -> Guard +where C: Connection, { debug!( log, "starting decoherence task, interval {} seconds", decoherence_interval ); - - let mut planner = periodic::Planner::new(); - planner.add( + timer.schedule_repeating( + Duration::seconds(decoherence_interval as i64), move || reshuffle_connection_queue(protected_data.clone(), log.clone()), - periodic::Every::new(Duration::from_secs(decoherence_interval)), - ); - planner.start(); + ) } fn reshuffle_connection_queue(protected_data: ProtectedData, log: Logger) From 5cf8732b00a2a9b7b3eda5d335023d8a3b1695f2 Mon Sep 17 00:00:00 2001 From: Kelly McLaughlin Date: Tue, 17 Dec 2019 15:13:11 -0700 Subject: [PATCH 2/2] Address clippy complaints --- src/connection_pool.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/connection_pool.rs b/src/connection_pool.rs index fa55447..feb59d7 100644 --- a/src/connection_pool.rs +++ b/src/connection_pool.rs @@ -73,10 +73,9 @@ impl Debug for ConnectionPool { ref decoherence_interval, ref log, ref state, - decoherence_timer_guard: _, - decoherence_timer: _, ref _resolver, ref _connection_function, + .. } => { let mut debug_trait_builder = f.debug_struct("ConnectionPool"); let _ = debug_trait_builder @@ -336,7 +335,6 @@ where if self.decoherence_timer.is_some() { let _timer = self.decoherence_timer.take(); } - drop(&self.decoherence_timer_guard); // Wait for all outstanding threads to be returned to the pool and // close those