From 0a6e344171a21c9a3999f7be0d138485968487c6 Mon Sep 17 00:00:00 2001 From: Takeru Sato Date: Sat, 9 May 2020 18:27:41 +0900 Subject: [PATCH] feat: wait for the flash to finish --- examples/main.rs | 13 ++++--------- src/client.rs | 44 +++++++++++++++++++++----------------------- src/emitter.rs | 4 ++++ src/error.rs | 2 ++ src/queue.rs | 5 +++++ src/worker.rs | 41 ++++++++++++++++++----------------------- 6 files changed, 54 insertions(+), 55 deletions(-) diff --git a/examples/main.rs b/examples/main.rs index 1a7c22b..61e62ce 100644 --- a/examples/main.rs +++ b/examples/main.rs @@ -9,7 +9,6 @@ use poston::{Client, Settings, WorkerPool}; use pretty_env_logger; use rand::prelude::*; use rand::{self, distributions::Alphanumeric}; -use std::sync::Mutex; use std::thread; use std::time::{Duration, Instant, SystemTime}; @@ -20,7 +19,7 @@ struct Human { } lazy_static! { - static ref POOL: Mutex = { + static ref POOL: WorkerPool = { let addr = "127.0.0.1:24224".to_string(); let settins = Settings { flush_period: Duration::from_millis(10), @@ -30,9 +29,7 @@ lazy_static! { read_timeout: Duration::from_secs(30), ..Default::default() }; - let pool = - WorkerPool::with_settings(&addr, &settins).expect("Couldn't create the worker pool."); - Mutex::new(pool) + WorkerPool::with_settings(&addr, &settins).expect("Couldn't create the worker pool.") }; } @@ -57,8 +54,7 @@ fn main() { let a = Human { age, name }; let timestamp = SystemTime::now(); - let pool = POOL.lock().expect("Client couldn't be locked."); - pool.send(tag, &a, timestamp).unwrap(); + POOL.send(tag, &a, timestamp).unwrap(); let dur = rng.gen_range(10, 500000); thread::sleep(Duration::new(0, dur)); @@ -73,8 +69,7 @@ fn main() { info!("End sending messages."); - let mut pool = POOL.lock().expect("Client couldn't be locked."); - pool.close(); + POOL.terminate().unwrap(); info!("End. elapsed: {:?}", start.elapsed()); } diff --git a/src/client.rs b/src/client.rs index 094bf41..b8d7ba5 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,7 +2,7 @@ use crate::connect; use crate::error::Error; use crate::rmps::encode as rencode; use crate::worker::{Message, Worker}; -use crossbeam_channel::{unbounded, Sender}; +use crossbeam_channel::{bounded, unbounded, Sender}; use serde::Serialize; use std::fmt::Debug; use std::io; @@ -14,14 +14,13 @@ pub trait Client { fn send(&self, tag: String, a: &A, timestamp: SystemTime) -> Result<(), Error> where A: Serialize; - fn send_flush(&self) -> Result<(), Error>; - fn close(&mut self); + fn terminate(&self) -> Result<(), Error>; } pub struct WorkerPool { worker: Worker, sender: Sender, - closed: AtomicBool, + terminated: AtomicBool, } impl WorkerPool { @@ -63,7 +62,7 @@ impl WorkerPool { Ok(WorkerPool { worker, sender, - closed: AtomicBool::new(false), + terminated: AtomicBool::new(false), }) } } @@ -73,7 +72,7 @@ impl Client for WorkerPool { where A: Serialize, { - if self.closed.load(Ordering::Acquire) { + if self.terminated.load(Ordering::Acquire) { debug!("Worker does already closed."); return Ok(()); } @@ -87,33 +86,32 @@ impl Client for WorkerPool { Ok(()) } - fn send_flush(&self) -> Result<(), Error> { - self.sender - .send(Message::Flushing) - .map_err(|e| Error::SendError(e.to_string()))?; - Ok(()) - } - - fn close(&mut self) { - if self.closed.fetch_or(true, Ordering::SeqCst) { - info!("Worker does already closed."); - return; + fn terminate(&self) -> Result<(), Error> { + if self.terminated.fetch_or(true, Ordering::SeqCst) { + info!("Worker does already terminated."); + return Ok(()); } info!("Sending terminate message to worker."); - self.sender.send(Message::Terminate).unwrap(); - - info!("Shutting down worker."); + let (sender, receiver) = bounded::<()>(0); + self.sender.send(Message::Terminating(sender)).unwrap(); + receiver + .recv() + .map_err(|e| Error::TerminateError(e.to_string()))?; - let wkr = &mut self.worker; - wkr.join_handler(); + Ok(()) } } impl Drop for WorkerPool { fn drop(&mut self) { - self.close() + self.terminate().unwrap(); + let wkr = &mut self.worker; + + info!("Shutting down worker."); + + wkr.join_handler(); } } diff --git a/src/emitter.rs b/src/emitter.rs index 1e05ee7..e2fdb5f 100644 --- a/src/emitter.rs +++ b/src/emitter.rs @@ -17,6 +17,10 @@ impl Emitter { Emitter { tag, queue } } + pub fn len(&self) -> usize { + self.queue.borrow().len() + } + pub fn push(&self, elem: (SystemTime, Vec)) { let mut q = self.queue.borrow_mut(); q.push_back(elem) diff --git a/src/error.rs b/src/error.rs index 147ee74..94a1642 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,6 +6,7 @@ pub enum Error { NetworkError(String), DeriveError(String), SendError(String), + TerminateError(String), AckUmatchedError(String, String), EmittingTimeoutError, ConnectingTimeoutError, @@ -18,6 +19,7 @@ impl StdError for Error { Error::NetworkError(ref e) => e, Error::DeriveError(ref e) => e, Error::SendError(ref e) => e, + Error::TerminateError(ref e) => e, Error::AckUmatchedError(_, _) => "request chunk and response ack-id did not match", Error::EmittingTimeoutError => "emitting timeout", Error::ConnectingTimeoutError => "connecting timeout", diff --git a/src/queue.rs b/src/queue.rs index 083d15f..9647b85 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -6,6 +6,7 @@ use std::time::SystemTime; pub trait Queue { fn push(&mut self, tag: String, tm: SystemTime, msg: Vec); fn flush(&mut self, size: Option); + fn len(&self) -> usize; } pub struct QueueHandler { @@ -32,6 +33,10 @@ impl Queue for QueueHandler { } } } + + fn len(&self) -> usize { + self.emitters.values().map(|e| e.len()).sum() + } } #[derive(PartialEq, Debug)] diff --git a/src/worker.rs b/src/worker.rs index 5a0d156..f6c0afd 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,6 +1,6 @@ use crate::connect::*; use crate::queue::*; -use crossbeam_channel::Receiver; +use crossbeam_channel::{Receiver, Sender}; use std::collections::HashMap; use std::fmt::Debug; use std::io; @@ -48,8 +48,7 @@ impl Worker { pub enum Message { Queuing(String, SystemTime, Vec), - Flushing, - Terminate, + Terminating(Sender<()>), } fn start_worker( @@ -106,14 +105,11 @@ fn handle_message( HandleResult::Queued } } - Message::Flushing => { - info!("Received a flushing message"); - queue.flush(None); - HandleResult::Flushed - } - Message::Terminate => { - info!("Received a terminate message"); + Message::Terminating(sender) => { + info!("Received a terminating message"); queue.flush(None); + info!("Flushed, queue remaining: {}", queue.len()); + sender.send(()).unwrap(); HandleResult::Terminated } } @@ -123,7 +119,7 @@ fn handle_message( mod tests { use super::*; use crate::connect::ConnectionSettings; - use crossbeam_channel::unbounded; + use crossbeam_channel::{bounded, unbounded}; #[test] fn worker_create_should_return_err_when_the_connection_open_is_failed() { @@ -149,6 +145,9 @@ mod tests { impl Queue for Q { fn push(&mut self, _tag: String, _tm: SystemTime, _msg: Vec) {} fn flush(&mut self, _size: Option) {} + fn len(&self) -> usize { + 0 + } } #[test] @@ -180,23 +179,14 @@ mod tests { ), HandleResult::Flushed ); - assert_eq!( - handle_message( - Message::Flushing, - &mut (now - flush_period), - flush_period, - 1, - &mut Q - ), - HandleResult::Flushed - ); } #[test] fn test_handle_message_terminated() { + let (sender, receiver) = bounded::<()>(1); assert_eq!( handle_message( - Message::Terminate, + Message::Terminating(sender), &mut Instant::now(), Duration::from_nanos(1), 1, @@ -204,6 +194,7 @@ mod tests { ), HandleResult::Terminated ); + receiver.recv_timeout(Duration::from_millis(100)).unwrap(); } struct WR; @@ -216,7 +207,11 @@ mod tests { #[test] fn test_start_worker_terminate() { let (sender, receiver) = unbounded(); + let (sender2, receiver2) = bounded::<()>(1); + thread::spawn(move || start_worker(WR, receiver, Duration::from_nanos(1), 1)); - sender.send(Message::Terminate).unwrap(); + sender.send(Message::Terminating(sender2)).unwrap(); + + receiver2.recv_timeout(Duration::from_millis(100)).unwrap(); } }