Skip to content

Commit

Permalink
Merge pull request #73 from tkrs/fix/read-error-handling
Browse files Browse the repository at this point in the history
Retry read if its returned WouldBlock
  • Loading branch information
tkrs authored Mar 1, 2019
2 parents e47c362 + 346108b commit 9106a16
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 13 deletions.
4 changes: 3 additions & 1 deletion examples/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ lazy_static! {
workers: 1,
flush_period: Duration::from_millis(10),
max_flush_entries: 1000,
connection_retry_timeout: Duration::from_secs(3),
connection_retry_timeout: Duration::from_secs(60),
write_timeout: Duration::from_secs(30),
read_timeout: Duration::from_secs(30),
..Default::default()
};
let pool =
Expand Down
14 changes: 12 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ impl WorkerPool {
write_retry_initial_delay: settings.write_retry_initial_delay,
write_retry_max_delay: settings.write_retry_max_delay,
write_retry_timeout: settings.write_retry_timeout,
read_retry_initial_delay: settings.read_retry_initial_delay,
read_retry_max_delay: settings.read_retry_max_delay,
read_retry_timeout: settings.read_retry_timeout,
};
match Worker::create(
id,
Expand Down Expand Up @@ -151,10 +154,14 @@ pub struct Settings {
pub connection_retry_max_delay: Duration,
pub connection_retry_timeout: Duration,
pub write_timeout: Duration,
pub read_timeout: Duration,

pub write_retry_initial_delay: Duration,
pub write_retry_max_delay: Duration,
pub write_retry_timeout: Duration,
pub read_timeout: Duration,
pub read_retry_initial_delay: Duration,
pub read_retry_max_delay: Duration,
pub read_retry_timeout: Duration,
}

impl Default for Settings {
Expand All @@ -168,7 +175,10 @@ impl Default for Settings {
connection_retry_timeout: Duration::from_secs(60),
write_retry_initial_delay: Duration::from_millis(5),
write_retry_max_delay: Duration::from_secs(5),
write_retry_timeout: Duration::from_secs(10),
write_retry_timeout: Duration::from_secs(30),
read_retry_initial_delay: Duration::from_millis(5),
read_retry_max_delay: Duration::from_secs(5),
read_retry_timeout: Duration::from_secs(10),
write_timeout: Duration::from_secs(1),
read_timeout: Duration::from_secs(1),
}
Expand Down
62 changes: 55 additions & 7 deletions src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::cell::RefCell;
use std::fmt::Debug;
use std::io::{self, Read, Write};
use std::net::{TcpStream, ToSocketAddrs};
use std::thread;
use std::time::Duration;

pub trait Connect<T>
Expand Down Expand Up @@ -54,6 +55,10 @@ pub struct ConnectionSettings {
pub write_retry_timeout: Duration,
pub write_retry_max_delay: Duration,
pub write_retry_initial_delay: Duration,

pub read_retry_timeout: Duration,
pub read_retry_max_delay: Duration,
pub read_retry_initial_delay: Duration,
}

impl<A, S> Stream<A, S>
Expand All @@ -80,6 +85,16 @@ where
fn write_retry_max_delay(&self) -> Duration {
self.settings.write_retry_max_delay
}

fn read_retry_initial_delay(&self) -> Duration {
self.settings.read_retry_initial_delay
}
fn read_retry_timeout(&self) -> Duration {
self.settings.read_retry_timeout
}
fn read_retry_max_delay(&self) -> Duration {
self.settings.read_retry_max_delay
}
}

impl<A, S> Reconnect for Stream<A, S>
Expand All @@ -88,8 +103,10 @@ where
S: Connect<S> + TcpConfig,
{
fn reconnect(&mut self) -> io::Result<()> {
debug!("Start reconnect().");
let stream = connect_with_retry(self.addr.clone(), self.settings)?;
*self.stream.borrow_mut() = stream;
debug!("End reconnect().");
Ok(())
}
}
Expand Down Expand Up @@ -146,17 +163,45 @@ where
return e.map_err(|e| Error::Transient(MyError::NetworkError(e)));
}

let mut read_backoff = ExponentialBackoff {
current_interval: self.read_retry_initial_delay(),
initial_interval: self.read_retry_initial_delay(),
max_interval: self.read_retry_max_delay(),
max_elapsed_time: Some(self.read_retry_timeout()),
..Default::default()
};

let mut resp_buf = [0u8; 64];
let read_size = self.read(&mut resp_buf).map_err(|e| {

let mut read_op = || {
self.read(&mut resp_buf).map_err(|e| {
debug!("Failed to read response, chunk: {}, cause: {:?}.", chunk, e);
if e.kind() == io::ErrorKind::WouldBlock {
Error::Transient(MyError::NetworkError(e))
} else {
Error::Permanent(MyError::NetworkError(e))
}
})
};

let read_size = read_op.retry(&mut read_backoff).map_err(|e| {
warn!("Failed to read response, chunk: {}, cause: {:?}.", chunk, e);
Error::Transient(MyError::NetworkError(e))
match e {
Error::Permanent(e) => Error::Transient(e),
err => err,
}
})?;

// It seems that read() returns Ok(0) while Fluentd is reloading config.
// It will take a few seconds to restart them.
// And in this case even if retry writing, 0 is returned unless the connection reconnects.
if read_size == 0 {
warn!("Received empty response, chunk: {}.", chunk);
if let Err(err) = self.reconnect() {
warn!("Failed to reconnect: {:?}.", err);
}

thread::sleep(Duration::from_secs(5));
Err(Error::Transient(MyError::NoAckResponseError))
} else {
let reply =
Expand All @@ -168,9 +213,6 @@ where
"Did not match ack and chunk, ack: {}, chunk: {}.",
reply.ack, chunk
);
if let Err(err) = self.reconnect() {
warn!("Failed to reconnect: {:?}.", err);
}
Err(Error::Transient(MyError::AckUmatchedError(
reply.ack,
chunk.to_string(),
Expand Down Expand Up @@ -236,8 +278,14 @@ where
};

op.retry(&mut backoff).map_err(|err| match err {
Error::Transient(e) => e,
Error::Permanent(e) => e,
Error::Transient(e) => {
warn!("Failed to connect: {:?}", e);
e
}
Error::Permanent(e) => {
warn!("Failed to connect: {:?}", e);
e
}
})
}

Expand Down
78 changes: 75 additions & 3 deletions src/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ impl Emitter {
}

pub fn emit<RW: WriteRead>(&self, rw: &mut RW, size: Option<usize>) {
let chunk = base64::encode(&Uuid::new_v4().to_string());
let mut buf = Vec::new();

let mut queue = self.queue.borrow_mut();
Expand All @@ -43,15 +42,88 @@ impl Emitter {
let mut entries = Vec::with_capacity(size);
queue.take(&mut entries);

let chunk = base64::encode(&Uuid::new_v4().to_string());

let _ = buffer::pack_record(&mut buf, self.tag.as_str(), entries, chunk.as_str());
if let Err(err) = rw.write_and_read(&buf, &chunk) {
error!(
"Tag '{}', an unexpected error occurred during emitting message: '{:?}'.",
"Tag '{}' unexpected error occurred during emitting message: '{:?}'.",
self.tag, err
);
}
}
}

#[cfg(test)]
mod test {}
mod test {
use super::*;
use crate::error::Error as MyError;
use backoff::Error;

struct TestStream;

impl WriteRead for TestStream {
fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), Error<MyError>> {
Ok(())
}
}

#[test]
fn emit_consume_queeu() {
let emitter = Emitter::new(1, "x".to_string());

emitter.push((SystemTime::now(), vec![0x00, 0x01]));
emitter.push((SystemTime::now(), vec![0x00, 0x02]));
emitter.push((SystemTime::now(), vec![0x00, 0x03]));
emitter.push((SystemTime::now(), vec![0x00, 0x04]));
emitter.push((SystemTime::now(), vec![0x00, 0x05]));

{
emitter.emit(&mut TestStream, Some(3));
let q = emitter.queue.borrow_mut();

assert_eq!(q.len(), 2);
}

{
emitter.emit(&mut TestStream, Some(3));
let q = emitter.queue.borrow_mut();

assert_eq!(q.len(), 0);
}

{
emitter.emit(&mut TestStream, Some(3));
let q = emitter.queue.borrow_mut();

assert_eq!(q.len(), 0);
}
}

struct TestErrStream;

impl WriteRead for TestErrStream {
fn write_and_read(&mut self, _buf: &[u8], _chunk: &str) -> Result<(), Error<MyError>> {
Err(Error::Permanent(MyError::AckUmatchedError(
"a".to_string(),
"b".to_string(),
)))
}
}

#[test]
fn emit_consume_queue_with_error_stream() {
let emitter = Emitter::new(1, "x".to_string());

emitter.push((SystemTime::now(), vec![0x00, 0x01]));
emitter.push((SystemTime::now(), vec![0x00, 0x02]));
emitter.push((SystemTime::now(), vec![0x00, 0x03]));
emitter.push((SystemTime::now(), vec![0x00, 0x04]));
emitter.push((SystemTime::now(), vec![0x00, 0x05]));

emitter.emit(&mut TestErrStream, Some(3));
let q = emitter.queue.borrow_mut();

assert_eq!(q.len(), 2);
}
}

0 comments on commit 9106a16

Please sign in to comment.