Skip to content

Commit

Permalink
Merge pull request #99 from tkrs/wait-for-flush
Browse files Browse the repository at this point in the history
feat: wait for the flash to finish
  • Loading branch information
tkrs authored May 9, 2020
2 parents ad74e15 + 0a6e344 commit c0f16ba
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 55 deletions.
13 changes: 4 additions & 9 deletions examples/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use poston::{Client, Settings, WorkerPool};
use pretty_env_logger;
use rand::prelude::*;
use rand::{self, distributions::Alphanumeric};
use std::sync::Mutex;
use std::thread;
use std::time::{Duration, Instant, SystemTime};

Expand All @@ -20,7 +19,7 @@ struct Human {
}

lazy_static! {
static ref POOL: Mutex<WorkerPool> = {
static ref POOL: WorkerPool = {
let addr = "127.0.0.1:24224".to_string();
let settins = Settings {
flush_period: Duration::from_millis(10),
Expand All @@ -30,9 +29,7 @@ lazy_static! {
read_timeout: Duration::from_secs(30),
..Default::default()
};
let pool =
WorkerPool::with_settings(&addr, &settins).expect("Couldn't create the worker pool.");
Mutex::new(pool)
WorkerPool::with_settings(&addr, &settins).expect("Couldn't create the worker pool.")
};
}

Expand All @@ -57,8 +54,7 @@ fn main() {
let a = Human { age, name };
let timestamp = SystemTime::now();

let pool = POOL.lock().expect("Client couldn't be locked.");
pool.send(tag, &a, timestamp).unwrap();
POOL.send(tag, &a, timestamp).unwrap();

let dur = rng.gen_range(10, 500000);
thread::sleep(Duration::new(0, dur));
Expand All @@ -73,8 +69,7 @@ fn main() {

info!("End sending messages.");

let mut pool = POOL.lock().expect("Client couldn't be locked.");
pool.close();
POOL.terminate().unwrap();

info!("End. elapsed: {:?}", start.elapsed());
}
44 changes: 21 additions & 23 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::connect;
use crate::error::Error;
use crate::rmps::encode as rencode;
use crate::worker::{Message, Worker};
use crossbeam_channel::{unbounded, Sender};
use crossbeam_channel::{bounded, unbounded, Sender};
use serde::Serialize;
use std::fmt::Debug;
use std::io;
Expand All @@ -14,14 +14,13 @@ pub trait Client {
fn send<A>(&self, tag: String, a: &A, timestamp: SystemTime) -> Result<(), Error>
where
A: Serialize;
fn send_flush(&self) -> Result<(), Error>;
fn close(&mut self);
fn terminate(&self) -> Result<(), Error>;
}

pub struct WorkerPool {
worker: Worker,
sender: Sender<Message>,
closed: AtomicBool,
terminated: AtomicBool,
}

impl WorkerPool {
Expand Down Expand Up @@ -63,7 +62,7 @@ impl WorkerPool {
Ok(WorkerPool {
worker,
sender,
closed: AtomicBool::new(false),
terminated: AtomicBool::new(false),
})
}
}
Expand All @@ -73,7 +72,7 @@ impl Client for WorkerPool {
where
A: Serialize,
{
if self.closed.load(Ordering::Acquire) {
if self.terminated.load(Ordering::Acquire) {
debug!("Worker does already closed.");
return Ok(());
}
Expand All @@ -87,33 +86,32 @@ impl Client for WorkerPool {
Ok(())
}

fn send_flush(&self) -> Result<(), Error> {
self.sender
.send(Message::Flushing)
.map_err(|e| Error::SendError(e.to_string()))?;
Ok(())
}

fn close(&mut self) {
if self.closed.fetch_or(true, Ordering::SeqCst) {
info!("Worker does already closed.");
return;
fn terminate(&self) -> Result<(), Error> {
if self.terminated.fetch_or(true, Ordering::SeqCst) {
info!("Worker does already terminated.");
return Ok(());
}

info!("Sending terminate message to worker.");

self.sender.send(Message::Terminate).unwrap();

info!("Shutting down worker.");
let (sender, receiver) = bounded::<()>(0);
self.sender.send(Message::Terminating(sender)).unwrap();
receiver
.recv()
.map_err(|e| Error::TerminateError(e.to_string()))?;

let wkr = &mut self.worker;
wkr.join_handler();
Ok(())
}
}

impl Drop for WorkerPool {
fn drop(&mut self) {
self.close()
self.terminate().unwrap();
let wkr = &mut self.worker;

info!("Shutting down worker.");

wkr.join_handler();
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ impl Emitter {
Emitter { tag, queue }
}

pub fn len(&self) -> usize {
self.queue.borrow().len()
}

pub fn push(&self, elem: (SystemTime, Vec<u8>)) {
let mut q = self.queue.borrow_mut();
q.push_back(elem)
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub enum Error {
NetworkError(String),
DeriveError(String),
SendError(String),
TerminateError(String),
AckUmatchedError(String, String),
EmittingTimeoutError,
ConnectingTimeoutError,
Expand All @@ -18,6 +19,7 @@ impl StdError for Error {
Error::NetworkError(ref e) => e,
Error::DeriveError(ref e) => e,
Error::SendError(ref e) => e,
Error::TerminateError(ref e) => e,
Error::AckUmatchedError(_, _) => "request chunk and response ack-id did not match",
Error::EmittingTimeoutError => "emitting timeout",
Error::ConnectingTimeoutError => "connecting timeout",
Expand Down
5 changes: 5 additions & 0 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::SystemTime;
pub trait Queue {
fn push(&mut self, tag: String, tm: SystemTime, msg: Vec<u8>);
fn flush(&mut self, size: Option<usize>);
fn len(&self) -> usize;
}

pub struct QueueHandler<S: WriteRead> {
Expand All @@ -32,6 +33,10 @@ impl<S: WriteRead> Queue for QueueHandler<S> {
}
}
}

fn len(&self) -> usize {
self.emitters.values().map(|e| e.len()).sum()
}
}

#[derive(PartialEq, Debug)]
Expand Down
41 changes: 18 additions & 23 deletions src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::connect::*;
use crate::queue::*;
use crossbeam_channel::Receiver;
use crossbeam_channel::{Receiver, Sender};
use std::collections::HashMap;
use std::fmt::Debug;
use std::io;
Expand Down Expand Up @@ -48,8 +48,7 @@ impl Worker {

pub enum Message {
Queuing(String, SystemTime, Vec<u8>),
Flushing,
Terminate,
Terminating(Sender<()>),
}

fn start_worker<S: WriteRead>(
Expand Down Expand Up @@ -106,14 +105,11 @@ fn handle_message(
HandleResult::Queued
}
}
Message::Flushing => {
info!("Received a flushing message");
queue.flush(None);
HandleResult::Flushed
}
Message::Terminate => {
info!("Received a terminate message");
Message::Terminating(sender) => {
info!("Received a terminating message");
queue.flush(None);
info!("Flushed, queue remaining: {}", queue.len());
sender.send(()).unwrap();
HandleResult::Terminated
}
}
Expand All @@ -123,7 +119,7 @@ fn handle_message(
mod tests {
use super::*;
use crate::connect::ConnectionSettings;
use crossbeam_channel::unbounded;
use crossbeam_channel::{bounded, unbounded};

#[test]
fn worker_create_should_return_err_when_the_connection_open_is_failed() {
Expand All @@ -149,6 +145,9 @@ mod tests {
impl Queue for Q {
fn push(&mut self, _tag: String, _tm: SystemTime, _msg: Vec<u8>) {}
fn flush(&mut self, _size: Option<usize>) {}
fn len(&self) -> usize {
0
}
}

#[test]
Expand Down Expand Up @@ -180,30 +179,22 @@ mod tests {
),
HandleResult::Flushed
);
assert_eq!(
handle_message(
Message::Flushing,
&mut (now - flush_period),
flush_period,
1,
&mut Q
),
HandleResult::Flushed
);
}

#[test]
fn test_handle_message_terminated() {
let (sender, receiver) = bounded::<()>(1);
assert_eq!(
handle_message(
Message::Terminate,
Message::Terminating(sender),
&mut Instant::now(),
Duration::from_nanos(1),
1,
&mut Q
),
HandleResult::Terminated
);
receiver.recv_timeout(Duration::from_millis(100)).unwrap();
}

struct WR;
Expand All @@ -216,7 +207,11 @@ mod tests {
#[test]
fn test_start_worker_terminate() {
let (sender, receiver) = unbounded();
let (sender2, receiver2) = bounded::<()>(1);

thread::spawn(move || start_worker(WR, receiver, Duration::from_nanos(1), 1));
sender.send(Message::Terminate).unwrap();
sender.send(Message::Terminating(sender2)).unwrap();

receiver2.recv_timeout(Duration::from_millis(100)).unwrap();
}
}

0 comments on commit c0f16ba

Please sign in to comment.