Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MANTA-4627 https://jira.joyent.us/browse/MANTA-4627 #12

Merged
merged 2 commits into from
Oct 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
[package]
name = "cueball"
version = "0.1.0"
authors = ["Kelly McLaughlin <[email protected]>", "Jon Anderson <[email protected]>"]
version = "0.2.0"
authors = [
"Kelly McLaughlin <[email protected]>",
"Jon Anderson <[email protected]>",
"Isaac Davis <[email protected]>"
]
edition = "2018"

[dependencies]
Expand Down
46 changes: 27 additions & 19 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Barrier, Mutex};
use std::{thread, time};
use std::time::Duration;

use slog::{info, o, Drain, Logger};

Expand All @@ -15,6 +16,8 @@ use cueball::connection_pool::ConnectionPool;
use cueball::error::Error;
use cueball::resolver::{BackendAddedMsg, BackendMsg, Resolver};

const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

#[derive(Debug)]
pub struct DummyConnection {
addr: SocketAddr,
Expand Down Expand Up @@ -49,39 +52,44 @@ impl Connection for DummyConnection {
pub struct FakeResolver {
backends: Vec<(BackendAddress, BackendPort)>,
pool_tx: Option<Sender<BackendMsg>>,
started: bool,
running: bool,
}

impl FakeResolver {
pub fn new(backends: Vec<(BackendAddress, BackendPort)>) -> Self {
FakeResolver {
backends: backends,
pool_tx: None,
started: false,
running: false,
}
}
}

impl Resolver for FakeResolver {
fn start(&mut self, s: Sender<BackendMsg>) {
if !self.started {
self.backends.iter().for_each(|b| {
let backend = Backend::new(&b.0, b.1);
let backend_key = backend::srv_key(&backend);
let backend_msg = BackendMsg::AddedMsg(BackendAddedMsg {
key: backend_key,
backend: backend,
});
s.send(backend_msg).unwrap();
fn run(&mut self, s: Sender<BackendMsg>) {
if self.running {
return;
}
self.running = true;
self.backends.iter().for_each(|b| {
let backend = Backend::new(&b.0, b.1);
let backend_key = backend::srv_key(&backend);
let backend_msg = BackendMsg::AddedMsg(BackendAddedMsg {
key: backend_key,
backend: backend,
});
self.pool_tx = Some(s);
self.started = true;
s.send(backend_msg).unwrap();
});
self.pool_tx = Some(s);

loop {
if self.pool_tx.as_ref().unwrap().send(BackendMsg::HeartbeatMsg).
is_err() {
break;
}
thread::sleep(HEARTBEAT_INTERVAL);
}
}

fn stop(&mut self) {
self.started = false;
()
self.running = false;
}
}

Expand Down
15 changes: 10 additions & 5 deletions src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ where
// Wait until ConnectonPool is created
barrier_clone.wait();

resolver.start(resolver_tx_clone);
resolver.run(resolver_tx_clone);
});

let protected_data = ProtectedData::new(connection_data);
Expand Down Expand Up @@ -209,10 +209,12 @@ where

// Notify the resolver, resolver_recv, and rebalancer threads to
// shutdown. Join on the thread handles for the resolver receiver
// and rebalancer threads. Do not join on the resolver thread
// because the code is not controlled by the pool and may not
// properly respond to the resolver receiver thread stopping. Just
// drop the JoinHandle for the resolver thread and move along.
// and rebalancer threads. If the resolver is well-behaved, dropping
// all clones of the receiver channel should cause the resolver to
// shut down. Do not join on the resolver thread because the code is
// not controlled by the pool and may not properly respond to the
// resolver receiver thread stopping. Just drop the JoinHandle for
// the resolver thread and move along.
self.rebalancer_stop.store(true, AtomicOrdering::Relaxed);
let resolver_tx = self.resolver_tx.take().unwrap();
match resolver_tx.send(BackendMsg::StopMsg) {
Expand Down Expand Up @@ -883,6 +885,9 @@ fn resolver_recv_loop<C>(
done = true;
None
}
Ok(BackendMsg::HeartbeatMsg) => {
None
}
Err(_recv_err) => {
done = true;
None
Expand Down
16 changes: 10 additions & 6 deletions src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ use crate::backend;
pub trait Resolver: Send + 'static {
/// Start the operation of the resolver. Begin querying for backends and
/// notifying the connection pool using the provided [`Sender`](https://doc.rust-lang.org/std/sync/mpsc/struct.Sender.html).
fn start(&mut self, s: Sender<BackendMsg>);
/// Shutdown the resolver. Cease querying for new backends. In the event
/// that attempting to send a message on the [`Sender`](https://doc.rust-lang.org/std/sync/mpsc/struct.Sender.html) channel provided in
/// `start` fails with an error then this method should be
/// called as it indicates the connection pool is shutting down.
fn stop(&mut self);
///
/// This function is expected to block while the resolver is running, and
/// return if the receiving end of the channel is closed, or if the Resolver
/// encounters an unrecoverable error of any sort. Thus, callers can shut
/// down the resolver by closing the receiving end of the channel.
fn run(&mut self, s: Sender<BackendMsg>);
}

/// Represents the message that should be sent to the connection pool when a new
Expand All @@ -46,6 +46,10 @@ pub enum BackendMsg {
// For internal pool use only
#[doc(hidden)]
StopMsg,
// For internal pool use only. Resolver implementations can send this
// message to test whether or not the channel has been closed.
#[doc(hidden)]
HeartbeatMsg
}

/// Returned from the functions used by the connection pool to add or remove
Expand Down
46 changes: 27 additions & 19 deletions tests/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Barrier};
use std::{thread, time};
use std::time::Duration;

use cueball::backend;
use cueball::backend::{Backend, BackendAddress, BackendPort};
Expand All @@ -13,6 +14,8 @@ use cueball::connection_pool::ConnectionPool;
use cueball::error::Error;
use cueball::resolver::{BackendAddedMsg, BackendMsg, Resolver};

const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

#[derive(Debug)]
pub struct DummyConnection {
addr: SocketAddr,
Expand Down Expand Up @@ -47,39 +50,44 @@ impl Connection for DummyConnection {
pub struct FakeResolver {
backends: Vec<(BackendAddress, BackendPort)>,
pool_tx: Option<Sender<BackendMsg>>,
started: bool,
running: bool,
}

impl FakeResolver {
pub fn new(backends: Vec<(BackendAddress, BackendPort)>) -> Self {
FakeResolver {
backends: backends,
pool_tx: None,
started: false,
running: false,
}
}
}

impl Resolver for FakeResolver {
fn start(&mut self, s: Sender<BackendMsg>) {
if !self.started {
self.backends.iter().for_each(|b| {
let backend = Backend::new(&b.0, b.1);
let backend_key = backend::srv_key(&backend);
let backend_msg = BackendMsg::AddedMsg(BackendAddedMsg {
key: backend_key,
backend: backend,
});
s.send(backend_msg).unwrap();
fn run(&mut self, s: Sender<BackendMsg>) {
if self.running {
return;
}
self.running = true;
self.backends.iter().for_each(|b| {
let backend = Backend::new(&b.0, b.1);
let backend_key = backend::srv_key(&backend);
let backend_msg = BackendMsg::AddedMsg(BackendAddedMsg {
key: backend_key,
backend: backend,
});
self.pool_tx = Some(s);
self.started = true;
s.send(backend_msg).unwrap();
});
self.pool_tx = Some(s);

loop {
if self.pool_tx.as_ref().unwrap().send(BackendMsg::HeartbeatMsg).
is_err() {
break;
}
thread::sleep(HEARTBEAT_INTERVAL);
}
}

fn stop(&mut self) {
self.started = false;
()
self.running = false;
}
}

Expand Down