Skip to content

Commit

Permalink
Merge pull request #20 from joyent/#9
Browse files Browse the repository at this point in the history
Cueball should terminate decoherence check on pool stop
  • Loading branch information
kellymclaughlin authored Dec 18, 2019
2 parents b6abbaa + 5cf8732 commit 1a4b9c2
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 15 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ edition = "2018"

[dependencies]
base64 = "0.10.1"
chrono = "0.4.7"
periodic = "0.1.1"
chrono = "0.4.9"
derive_more = "0.14.0"
rand = "0.7.0"
sha1 = "0.6.0"
slog = "2"
slog-stdlog = "3"
timer = "0.2.0"

[dev-dependencies]
slog-term = "2.4.0"
90 changes: 77 additions & 13 deletions src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ pub mod types;

use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::Result as FmtResult;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Barrier};
use std::time::Duration;
use std::{thread, time};

use chrono::Duration;
use slog::{debug, error, info, o, warn, Drain, Logger};
use timer::Guard;

use crate::backend::{Backend, BackendKey};
use crate::connection::Connection;
Expand All @@ -35,7 +38,6 @@ const DEFAULT_REBALANCE_ACTION_DELAY: u64 = 100;
const DEFAULT_DECOHERENCE_INTERVAL: u64 = 300;

/// A pool of connections to a multi-node service
#[derive(Debug)]
pub struct ConnectionPool<C, R, F> {
protected_data: ProtectedData<C>,
resolver_thread: Option<thread::JoinHandle<()>>,
Expand All @@ -49,10 +51,64 @@ pub struct ConnectionPool<C, R, F> {
decoherence_interval: Option<u64>,
log: Logger,
state: ConnectionPoolState,
decoherence_timer: Option<timer::Timer>,
decoherence_timer_guard: Guard,
_resolver: PhantomData<R>,
_connection_function: PhantomData<F>,
}

impl<C: Debug, R: Debug, F: Debug> Debug for ConnectionPool<C, R, F> {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
match *self {
ConnectionPool {
ref protected_data,
ref resolver_thread,
ref resolver_rx_thread,
ref resolver_tx,
ref max_connections,
ref claim_timeout,
ref rebalance_check,
ref rebalance_thread,
ref rebalancer_stop,
ref decoherence_interval,
ref log,
ref state,
ref _resolver,
ref _connection_function,
..
} => {
let mut debug_trait_builder = f.debug_struct("ConnectionPool");
let _ = debug_trait_builder
.field("protected_data", &&(protected_data));
let _ = debug_trait_builder
.field("resolver_thread", &&(resolver_thread));
let _ = debug_trait_builder
.field("resolver_rx_thread", &&(resolver_rx_thread));
let _ =
debug_trait_builder.field("resolver_tx", &&(resolver_tx));
let _ = debug_trait_builder
.field("max_connections", &&(max_connections));
let _ = debug_trait_builder
.field("claim_timeout", &&(claim_timeout));
let _ = debug_trait_builder
.field("rebalance_check", &&(rebalance_check));
let _ = debug_trait_builder
.field("rebalance_thread", &&(rebalance_thread));
let _ = debug_trait_builder
.field("rebalancer_stop", &&(rebalancer_stop));
let _ = debug_trait_builder
.field("decoherence_interval", &&(decoherence_interval));
let _ = debug_trait_builder.field("log", &&(log));
let _ = debug_trait_builder.field("state", &&(state));
let _ = debug_trait_builder.field("_resolver", &&(_resolver));
let _ = debug_trait_builder
.field("_connection_function", &&(_connection_function));
debug_trait_builder.finish()
}
}
}
}

impl<C, R, F> Clone for ConnectionPool<C, R, F>
where
C: Connection,
Expand All @@ -73,6 +129,8 @@ where
decoherence_interval: self.decoherence_interval,
log: self.log.clone(),
state: self.state,
decoherence_timer: None,
decoherence_timer_guard: self.decoherence_timer_guard.clone(),
_resolver: PhantomData,
_connection_function: PhantomData,
}
Expand Down Expand Up @@ -162,7 +220,10 @@ where
.decoherence_interval
.unwrap_or(DEFAULT_DECOHERENCE_INTERVAL);

start_decoherence(
let timer = timer::Timer::new();

let decoherence_timer_guard = start_decoherence(
&timer,
decoherence_interval,
protected_data.clone(),
logger.clone(),
Expand All @@ -181,6 +242,8 @@ where
decoherence_interval: Some(decoherence_interval),
log: logger,
state: ConnectionPoolState::Running,
decoherence_timer: Some(timer),
decoherence_timer_guard,
_resolver: PhantomData,
_connection_function: PhantomData,
};
Expand Down Expand Up @@ -269,6 +332,10 @@ where

drop(connection_data);

if self.decoherence_timer.is_some() {
let _timer = self.decoherence_timer.take();
}

// Wait for all outstanding threads to be returned to the pool and
// close those
while connections_remaining > 0.into() {
Expand Down Expand Up @@ -885,9 +952,7 @@ fn resolver_recv_loop<C>(
done = true;
None
}
Ok(BackendMsg::HeartbeatMsg) => {
None
}
Ok(BackendMsg::HeartbeatMsg) => None,
Err(_recv_err) => {
done = true;
None
Expand Down Expand Up @@ -963,23 +1028,22 @@ fn rebalancer_loop<C, F>(

/// Start a thread to run periodic decoherence on the connection pool
fn start_decoherence<C>(
timer: &timer::Timer,
decoherence_interval: u64,
protected_data: ProtectedData<C>,
log: Logger,
) where
) -> Guard
where
C: Connection,
{
debug!(
log,
"starting decoherence task, interval {} seconds", decoherence_interval
);

let mut planner = periodic::Planner::new();
planner.add(
timer.schedule_repeating(
Duration::seconds(decoherence_interval as i64),
move || reshuffle_connection_queue(protected_data.clone(), log.clone()),
periodic::Every::new(Duration::from_secs(decoherence_interval)),
);
planner.start();
)
}

fn reshuffle_connection_queue<C>(protected_data: ProtectedData<C>, log: Logger)
Expand Down

0 comments on commit 1a4b9c2

Please sign in to comment.