Skip to content

Commit

Permalink
async bridge: ignore send errors when channel is full
Browse files Browse the repository at this point in the history
  • Loading branch information
dmzmk committed May 11, 2023
1 parent e61f430 commit a505ab7
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
18 changes: 12 additions & 6 deletions journal/src/async_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl AsyncRead for AsyncReadJournalStreamHandle {
if p.buf.is_none() {
match p.rx.try_recv() {
// EOF
Ok(buf) if buf.len() == 0 => return Poll::Ready(Ok(())),
Ok(buf) if buf.is_empty() => return Poll::Ready(Ok(())),
Ok(buf) => {
p.buf = Some(buf);
p.read = 0;
Expand Down Expand Up @@ -180,7 +180,9 @@ impl BufRead for ReadReceiver {

loop {
// wake up future
self.waker.take().map(|waker| waker.wake());
if let Some(waker) = self.waker.take() {
waker.wake()
}
match self.rx.blocking_recv() {
Some(AsyncWriteProto::W(waker)) => {
self.waker = Some(waker);
Expand Down Expand Up @@ -234,7 +236,9 @@ impl Read for ReadReceiver {
impl Drop for ReadReceiver {
fn drop(&mut self) {
self.rx.close();
self.waker.take().map(|waker| waker.wake());
if let Some(waker) = self.waker.take() {
waker.wake()
}
while let Ok(message) = self.rx.try_recv() {
if let AsyncWriteProto::W(waker) = message {
waker.wake()
Expand Down Expand Up @@ -337,9 +341,11 @@ impl AsyncWrite for AsyncWriteJournalStreamHandle {
let me = self.get_mut();
match me.rx.try_recv() {
Err(TryRecvError::Empty) => {
me.tx
.try_send(AsyncWriteProto::W(ctx.waker().clone()))
.map_err(to_err)?;
match me.tx.try_send(AsyncWriteProto::W(ctx.waker().clone())) {
Ok(_) => (),
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => (),
Err(e) => return Poll::Ready(Err(to_err(e))),
}
Poll::Pending
}
Err(e @ TryRecvError::Disconnected) => Poll::Ready(Err(to_err(e))),
Expand Down
2 changes: 1 addition & 1 deletion journal/src/journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ where
}
}
Some(Ok((
self.current_snapshot.as_ref().unwrap().clone(),
*self.current_snapshot.as_ref().unwrap(),
blob_header,
buf,
)))
Expand Down

0 comments on commit a505ab7

Please sign in to comment.