diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index c36f6ca..50ff4a0 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -13,7 +13,7 @@ jobs: fail-fast: false matrix: version: - - 1.70.0 + - 1.80.0 - stable - nightly services: diff --git a/Cargo.toml b/Cargo.toml index 02815d1..ba5d45a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ rmp = "0.8" rmp-serde = "1.1" serde = "1.0" serde_derive = "1.0" +thiserror = "2.0" uuid = { version = "1.11", features = ["v4"] } [dev-dependencies] diff --git a/examples/main.rs b/examples/main.rs index 65d0a61..4856f0f 100644 --- a/examples/main.rs +++ b/examples/main.rs @@ -4,7 +4,6 @@ extern crate serde_derive; use log::info; use once_cell::sync::Lazy; use poston::{Client, Settings, WorkerPool}; -use pretty_env_logger; use rand::prelude::*; use rand::{self, distributions::Alphanumeric}; use std::time::{Duration, Instant, SystemTime}; diff --git a/src/buffer.rs b/src/buffer.rs index a3b0771..38b7973 100644 --- a/src/buffer.rs +++ b/src/buffer.rs @@ -1,11 +1,25 @@ -use crate::error::Error; +use crate::rmps::decode as rdecode; use crate::rmps::encode as rencode; use crate::rmps::Deserializer; use crate::time_pack::TimePack; use rmp::encode; use serde::Deserialize; +use serde::Serialize; use std::collections::VecDeque; use std::time::SystemTime; +use thiserror::Error; + +pub trait Buffer { + fn pack(&self) -> Result, BufferError>; +} + +impl Buffer for T { + fn pack(&self) -> Result, BufferError> { + let mut buf = Vec::new(); + rencode::write_named(&mut buf, self).map_err(BufferError::Pack)?; + Ok(buf) + } +} pub trait Take { fn take(&mut self, buf: &mut Vec); @@ -34,31 +48,70 @@ pub struct AckReply { pub ack: String, } -pub fn pack_record<'a>( +impl TryFrom<&[u8]> for AckReply { + type Error = BufferError; + fn try_from(value: &[u8]) -> Result { + unpack_response(value, value.len()) + } +} + +pub struct Record<'a> { + tag: &'a str, + entries: &'a [(SystemTime, Vec)], + chunk: &'a str, +} + +impl<'a> Record<'a> { + pub fn new(tag: &'a str, entries: &'a [(SystemTime, Vec)], chunk: &'a str) -> Self { + Self { + tag, + entries, + chunk, + } + } + + pub fn pack(&self) -> Result, BufferError> { + let mut buf = Vec::new(); + pack_record(&mut buf, self.tag, self.entries, self.chunk)?; + Ok(buf) + } +} + +fn pack_record<'a>( buf: &mut Vec, tag: &'a str, entries: &'a [(SystemTime, Vec)], chunk: &'a str, -) -> Result<(), Error> { +) -> Result<(), BufferError> { buf.push(0x93); - encode::write_str(buf, tag).map_err(|e| Error::Derive(e.to_string()))?; - encode::write_array_len(buf, entries.len() as u32).map_err(|e| Error::Derive(e.to_string()))?; + encode::write_str(buf, tag) + .map_err(|e| BufferError::Pack(rencode::Error::InvalidValueWrite(e)))?; + encode::write_array_len(buf, entries.len() as u32) + .map_err(|e| BufferError::Pack(rencode::Error::InvalidValueWrite(e)))?; for (t, entry) in entries { buf.push(0x92); t.write_time(buf) - .map_err(|e| Error::Derive(e.to_string()))?; + .map_err(|e| BufferError::Pack(rencode::Error::InvalidValueWrite(e)))?; buf.extend(entry.iter()); } let options = Options { chunk: chunk.to_string(), }; - rencode::write_named(buf, &options).map_err(|e| Error::Derive(e.to_string())) + rencode::write_named(buf, &options).map_err(BufferError::Pack) } -pub fn unpack_response(resp_buf: &[u8], size: usize) -> Result { +fn unpack_response(resp_buf: &[u8], size: usize) -> Result { let mut de = Deserializer::new(&resp_buf[0..size]); - Deserialize::deserialize(&mut de).map_err(|e| Error::Derive(e.to_string())) + Deserialize::deserialize(&mut de).map_err(BufferError::Unpack) +} + +#[derive(Error, Debug)] +pub enum BufferError { + #[error("pack failed")] + Pack(#[from] rencode::Error), + #[error("unpack failed")] + Unpack(#[from] rdecode::Error), } #[cfg(test)] @@ -149,7 +202,7 @@ mod unpack_response { #[test] fn it_should_unpack_as_ack_reply() { let mut resp_buf = [0u8; 64]; - for (i, e) in vec![0x81u8, 0xa3, 0x61, 0x63, 0x6b, 0xa4, 0x61, 0x62, 0x63, 0x3d] + for (i, e) in [0x81u8, 0xa3, 0x61, 0x63, 0x6b, 0xa4, 0x61, 0x62, 0x63, 0x3d] .iter() .enumerate() { diff --git a/src/client.rs b/src/client.rs index 4c6d43a..f5de5be 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,6 @@ +use crate::buffer::Buffer; use crate::connect; -use crate::error::Error; -use crate::rmps::encode as rencode; +use crate::error::ClientError; use crate::worker::{Message, Worker}; use crossbeam_channel::{bounded, unbounded, Sender}; use serde::Serialize; @@ -11,10 +11,10 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, SystemTime}; pub trait Client { - fn send(&self, tag: String, a: &A, timestamp: SystemTime) -> Result<(), Error> + fn send(&self, tag: String, a: &A, timestamp: SystemTime) -> Result<(), ClientError> where A: Serialize; - fn terminate(&self) -> Result<(), Error>; + fn terminate(&self) -> Result<(), ClientError>; } pub struct WorkerPool { @@ -68,25 +68,25 @@ impl WorkerPool { } impl Client for WorkerPool { - fn send(&self, tag: String, a: &A, timestamp: SystemTime) -> Result<(), Error> + fn send(&self, tag: String, a: &A, timestamp: SystemTime) -> Result<(), ClientError> where A: Serialize, + A: Buffer, { if self.terminated.load(Ordering::Acquire) { debug!("Worker does already closed."); return Ok(()); } - let mut buf = Vec::new(); - rencode::write_named(&mut buf, a).map_err(|e| Error::Derive(e.to_string()))?; + let buf = a.pack().map_err(ClientError::Buffer)?; self.sender .send(Message::Queuing(tag, timestamp, buf)) - .map_err(|e| Error::Send(e.to_string()))?; + .map_err(ClientError::SendChannel)?; Ok(()) } - fn terminate(&self) -> Result<(), Error> { + fn terminate(&self) -> Result<(), ClientError> { if self.terminated.fetch_or(true, Ordering::SeqCst) { info!("Worker does already terminated."); return Ok(()); @@ -96,9 +96,7 @@ impl Client for WorkerPool { let (sender, receiver) = bounded::<()>(0); self.sender.send(Message::Terminating(sender)).unwrap(); - receiver - .recv() - .map_err(|e| Error::Terminate(e.to_string()))?; + receiver.recv().map_err(ClientError::RecieveChannel)?; Ok(()) } diff --git a/src/connect.rs b/src/connect.rs index 6b81080..b3ca520 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -1,5 +1,4 @@ -use crate::buffer; -use crate::error::Error; +use crate::buffer::{AckReply, BufferError}; use backoff::{Error as RetryError, ExponentialBackoff}; use std::cell::RefCell; use std::fmt::Debug; @@ -20,7 +19,7 @@ pub trait Reconnect { } pub trait WriteRead { - fn write_and_read(&mut self, buf: &[u8], chunk: &str) -> Result<(), Error>; + fn write_and_read(&mut self, buf: &[u8], chunk: &str) -> Result<(), StreamError>; } #[derive(Debug)] @@ -137,7 +136,7 @@ where A: ToSocketAddrs + Clone + Debug, S: Connect + Read + Write, { - fn write_and_read(&mut self, buf: &[u8], chunk: &str) -> Result<(), Error> { + fn write_and_read(&mut self, buf: &[u8], chunk: &str) -> Result<(), StreamError> { let backoff = ExponentialBackoff { current_interval: self.write_retry_initial_delay(), initial_interval: self.write_retry_initial_delay(), @@ -149,7 +148,7 @@ where let op = || { if self.should_reconnect() { self.reconnect() - .map_err(|e| RetryError::transient(Error::Network(e.to_string())))?; + .map_err(|e| RetryError::transient(StreamError::Network(e)))?; } self.write_all(buf) .and_then(|_| self.flush()) @@ -158,7 +157,7 @@ where if let Err(err) = self.close() { debug!("Failed to close the stream, cause: {:?}", err); } - RetryError::transient(Error::Network(e.to_string())) + RetryError::transient(StreamError::Network(e)) })?; let read_backoff = ExponentialBackoff { @@ -176,17 +175,15 @@ where debug!("Failed to read response, chunk: {}, cause: {:?}", chunk, e); use io::ErrorKind::*; match e.kind() { - WouldBlock | TimedOut => { - RetryError::transient(Error::Network(e.to_string())) - } + WouldBlock | TimedOut => RetryError::transient(StreamError::Network(e)), UnexpectedEof | BrokenPipe | ConnectionAborted | ConnectionRefused | ConnectionReset => { if let Err(err) = self.close() { debug!("Failed to close the stream, cause: {:?}", err); } - RetryError::permanent(Error::Network(e.to_string())) + RetryError::permanent(StreamError::Network(e)) } - _ => RetryError::Permanent(Error::Network(e.to_string())), + _ => RetryError::Permanent(StreamError::Network(e)), } }) }; @@ -205,7 +202,8 @@ where } })?; - let reply = buffer::unpack_response(&resp_buf, resp_buf.len()) + let reply = AckReply::try_from(resp_buf.as_ref()) + .map_err(StreamError::Buffer) .map_err(RetryError::transient)?; if reply.ack == chunk { Ok(()) @@ -215,7 +213,7 @@ where reply.ack, chunk ); - Err(RetryError::transient(Error::AckUmatched( + Err(RetryError::transient(StreamError::AckUmatched( reply.ack, chunk.to_string(), ))) @@ -233,7 +231,7 @@ impl Connect for TcpStream { where A: ToSocketAddrs + Clone + Debug, { - TcpStream::connect(addr).map(|s| { + TcpStream::connect(addr).inspect(|s| { s.set_nodelay(true).unwrap(); if !settings.read_timeout.is_zero() { s.set_read_timeout(Some(settings.read_timeout)).unwrap(); @@ -241,7 +239,6 @@ impl Connect for TcpStream { if !settings.write_timeout.is_zero() { s.set_write_timeout(Some(settings.write_timeout)).unwrap(); } - s }) } @@ -280,6 +277,16 @@ where }) } +#[derive(thiserror::Error, Debug)] +pub enum StreamError { + #[error("network error")] + Network(#[from] std::io::Error), + #[error("buffer error")] + Buffer(#[from] BufferError), + #[error("request chunk and response ack-id did not match, {0} /= {1}")] + AckUmatched(String, String), +} + #[cfg(test)] mod tests { use super::{io, Duration, ToSocketAddrs}; @@ -304,7 +311,7 @@ mod tests { where A: ToSocketAddrs + Clone, { - if let Ok(_) = addr.to_socket_addrs() { + if addr.to_socket_addrs().is_ok() { let count = CONN_COUNT.fetch_add(1, Ordering::SeqCst); if count % 20 == 0 { Ok(TestStream(AtomicUsize::new(1))) @@ -383,6 +390,7 @@ mod tests { #[derive(Debug)] struct TS; + #[allow(clippy::type_complexity)] static QUEUE: Lazy>>>> = Lazy::new(|| { let mut q = VecDeque::new(); diff --git a/src/emitter.rs b/src/emitter.rs index d69b99d..170a899 100644 --- a/src/emitter.rs +++ b/src/emitter.rs @@ -1,6 +1,5 @@ -use crate::buffer::{self, Take}; +use crate::buffer::{BufferError, Record, Take}; use crate::connect::*; -use crate::error::Error; use base64::{engine::general_purpose, Engine as _}; use std::cell::RefCell; use std::cmp; @@ -29,7 +28,11 @@ impl Emitter { q.push_back(elem) } - pub fn emit(&self, rw: &mut RW, size: Option) -> Result<(), Error> { + pub fn emit( + &self, + rw: &mut RW, + size: Option, + ) -> Result<(), EmitterError> { let mut queue = self.queue.borrow_mut(); if queue.is_empty() { return Ok(()); @@ -38,19 +41,22 @@ impl Emitter { let mut entries = Vec::with_capacity(cmp::min(qlen, size.unwrap_or(qlen))); queue.take(&mut entries); - let chunk = general_purpose::STANDARD.encode(&Uuid::new_v4().to_string()); - - let mut buf = Vec::new(); - buffer::pack_record( - &mut buf, - self.tag.as_str(), - entries.as_slice(), - chunk.as_str(), - )?; + let chunk = general_purpose::STANDARD.encode(Uuid::new_v4().to_string()); + let rec = Record::new(&self.tag, entries.as_slice(), &chunk); + let buf = rec.pack().map_err(EmitterError::Buffer)?; rw.write_and_read(&buf, &chunk) + .map_err(EmitterError::Stream) } } +#[derive(thiserror::Error, Debug)] +pub enum EmitterError { + #[error("buffer error")] + Buffer(#[from] BufferError), + #[error("stream error")] + Stream(#[from] StreamError), +} + #[cfg(test)] mod test { use super::*; @@ -58,7 +64,7 @@ mod test { struct TestStream; impl WriteRead for TestStream { - fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), Error> { + fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), StreamError> { Ok(()) } } @@ -117,8 +123,8 @@ mod test { struct TestErrStream; impl WriteRead for TestErrStream { - fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), Error> { - Err(Error::AckUmatched("a".to_string(), "b".to_string())) + fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), StreamError> { + Err(StreamError::AckUmatched("a".to_string(), "b".to_string())) } } @@ -153,7 +159,7 @@ mod test { } impl WriteRead for Mock { - fn write_and_read(&mut self, buf: &[u8], chunk: &str) -> Result<(), Error> { + fn write_and_read(&mut self, buf: &[u8], chunk: &str) -> Result<(), StreamError> { let mut acc = self.acc.borrow_mut(); let args = (buf.to_vec(), chunk.to_string()); acc.push(args); @@ -164,7 +170,7 @@ mod test { let emitter = Emitter::new("x".to_string()); for i in 1..1000 { - emitter.push((SystemTime::now(), vec![0x00, (i as u8).into()])); + emitter.push((SystemTime::now(), vec![0x00, (i as u8)])); } let rw = &mut Mock::new(); diff --git a/src/error.rs b/src/error.rs index 47d5fbe..59a81c4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,32 +1,13 @@ -use std::error::Error as StdError; -use std::fmt; +use crossbeam_channel::{RecvError, SendError}; -#[derive(Debug)] -pub enum Error { - Network(String), - Derive(String), - Send(String), - Terminate(String), - AckUmatched(String, String), - EmittingTimeout, - ConnectingTimeout, - NoAckResponse, -} - -impl StdError for Error {} +use crate::{buffer::BufferError, worker::Message}; -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let s = match *self { - Error::Network(ref e) => e, - Error::Derive(ref e) => e, - Error::Send(ref e) => e, - Error::Terminate(ref e) => e, - Error::AckUmatched(_, _) => "request chunk and response ack-id did not match", - Error::EmittingTimeout => "emitting timeout", - Error::ConnectingTimeout => "connecting timeout", - Error::NoAckResponse => "no ack response", - }; - write!(f, "{}", s) - } +#[derive(thiserror::Error, Debug)] +pub enum ClientError { + #[error("buffer error")] + Buffer(#[from] BufferError), + #[error("send error")] + SendChannel(#[from] SendError), + #[error("receive error")] + RecieveChannel(#[from] RecvError), } diff --git a/src/queue.rs b/src/queue.rs index 3ab084b..8013bf3 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -56,13 +56,12 @@ pub enum HandleResult { #[cfg(test)] mod tests { use super::*; - use crate::error::Error; #[test] fn test_push_flush() { struct W; impl WriteRead for W { - fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), Error> { + fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), StreamError> { Ok(()) } } @@ -75,24 +74,24 @@ mod tests { let now = SystemTime::now(); - queue.push("a".to_string(), now.clone(), vec![0u8, 9u8]); - queue.push("b".to_string(), now.clone(), vec![1u8, 8u8]); - queue.push("a".to_string(), now.clone(), vec![2u8, 7u8]); - queue.push("b".to_string(), now.clone(), vec![3u8, 6u8]); - queue.push("c".to_string(), now.clone(), vec![4u8, 5u8]); + queue.push("a".to_string(), now, vec![0u8, 9u8]); + queue.push("b".to_string(), now, vec![1u8, 8u8]); + queue.push("a".to_string(), now, vec![2u8, 7u8]); + queue.push("b".to_string(), now, vec![3u8, 6u8]); + queue.push("c".to_string(), now, vec![4u8, 5u8]); let expected = Emitter::new("a".to_string()); - expected.push((now.clone(), vec![0u8, 9u8])); - expected.push((now.clone(), vec![2u8, 7u8])); + expected.push((now, vec![0u8, 9u8])); + expected.push((now, vec![2u8, 7u8])); assert_eq!(queue.emitters().get("a").unwrap(), &expected); let expected = Emitter::new("b".to_string()); - expected.push((now.clone(), vec![1u8, 8u8])); - expected.push((now.clone(), vec![3u8, 6u8])); + expected.push((now, vec![1u8, 8u8])); + expected.push((now, vec![3u8, 6u8])); assert_eq!(queue.emitters().get("b").unwrap(), &expected); let expected = Emitter::new("c".to_string()); - expected.push((now.clone(), vec![4u8, 5u8])); + expected.push((now, vec![4u8, 5u8])); assert_eq!(queue.emitters().get("c").unwrap(), &expected); assert_eq!(queue.len(), 5); diff --git a/src/worker.rs b/src/worker.rs index bc5ff9c..a0f81d2 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -146,13 +146,13 @@ mod tests { #[test] fn test_handle_message_queued() { let msg = Message::Queuing("tag".into(), SystemTime::now(), vec![1, 2, 4]); - let mut now = Instant::now(); + let now = Instant::now(); let flush_period = Duration::from_secs(100); let flush_size = 2; let mut queue = Q; assert_eq!( - handle_message(msg, &mut now, flush_period, flush_size, &mut queue), + handle_message(msg, &now, flush_period, flush_size, &mut queue), HandleResult::Queued ); } @@ -165,7 +165,7 @@ mod tests { assert_eq!( handle_message( Message::Queuing("tag".into(), SystemTime::now(), vec![1, 2, 4]), - &mut (now - flush_period), + &(now - flush_period), flush_period, 1, &mut Q @@ -180,7 +180,7 @@ mod tests { assert_eq!( handle_message( Message::Terminating(sender), - &mut Instant::now(), + &Instant::now(), Duration::from_nanos(1), 1, &mut Q @@ -192,7 +192,7 @@ mod tests { struct WR; impl WriteRead for WR { - fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), crate::error::Error> { + fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), StreamError> { Ok(()) } }