From a505ab7b50576bb2a9da5bfa7cf5da2d794359c9 Mon Sep 17 00:00:00 2001 From: Dmitry Mazurin Date: Thu, 11 May 2023 18:06:31 +0100 Subject: [PATCH] async bridge: ignore send errors when channel is full --- journal/src/async_bridge.rs | 18 ++++++++++++------ journal/src/journal.rs | 2 +- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/journal/src/async_bridge.rs b/journal/src/async_bridge.rs index 513e316..c762bed 100644 --- a/journal/src/async_bridge.rs +++ b/journal/src/async_bridge.rs @@ -99,7 +99,7 @@ impl AsyncRead for AsyncReadJournalStreamHandle { if p.buf.is_none() { match p.rx.try_recv() { // EOF - Ok(buf) if buf.len() == 0 => return Poll::Ready(Ok(())), + Ok(buf) if buf.is_empty() => return Poll::Ready(Ok(())), Ok(buf) => { p.buf = Some(buf); p.read = 0; @@ -180,7 +180,9 @@ impl BufRead for ReadReceiver { loop { // wake up future - self.waker.take().map(|waker| waker.wake()); + if let Some(waker) = self.waker.take() { + waker.wake() + } match self.rx.blocking_recv() { Some(AsyncWriteProto::W(waker)) => { self.waker = Some(waker); @@ -234,7 +236,9 @@ impl Read for ReadReceiver { impl Drop for ReadReceiver { fn drop(&mut self) { self.rx.close(); - self.waker.take().map(|waker| waker.wake()); + if let Some(waker) = self.waker.take() { + waker.wake() + } while let Ok(message) = self.rx.try_recv() { if let AsyncWriteProto::W(waker) = message { waker.wake() @@ -337,9 +341,11 @@ impl AsyncWrite for AsyncWriteJournalStreamHandle { let me = self.get_mut(); match me.rx.try_recv() { Err(TryRecvError::Empty) => { - me.tx - .try_send(AsyncWriteProto::W(ctx.waker().clone())) - .map_err(to_err)?; + match me.tx.try_send(AsyncWriteProto::W(ctx.waker().clone())) { + Ok(_) => (), + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => (), + Err(e) => return Poll::Ready(Err(to_err(e))), + } Poll::Pending } Err(e @ TryRecvError::Disconnected) => Poll::Ready(Err(to_err(e))), diff --git a/journal/src/journal.rs b/journal/src/journal.rs index e620536..4c56c9d 100644 --- a/journal/src/journal.rs +++ b/journal/src/journal.rs @@ -418,7 +418,7 @@ where } } Some(Ok(( - self.current_snapshot.as_ref().unwrap().clone(), + *self.current_snapshot.as_ref().unwrap(), blob_header, buf, )))