Skip to content

Commit

Permalink
Re-introduce mpsc try_recv()
Browse files Browse the repository at this point in the history
This was removed in a previous version of Tokio, but is now available
again.
  • Loading branch information
erikgrinaker committed Aug 13, 2023
1 parent 340b697 commit 3286dba
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 23 deletions.
12 changes: 5 additions & 7 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,11 @@ impl From<tokio::task::JoinError> for Error {
}
}

// see https://github.com/tokio-rs/tokio/pull/3263: remove try_recv() from mpsc types
//
// impl From<tokio::sync::mpsc::error::TryRecvError> for Error {
// fn from(err: tokio::sync::mpsc::error::TryRecvError) -> Self {
// Error::Internal(err.to_string())
// }
// }
impl From<tokio::sync::mpsc::error::TryRecvError> for Error {
fn from(err: tokio::sync::mpsc::error::TryRecvError) -> Self {
Error::Internal(err.to_string())
}
}

impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
fn from(err: tokio::sync::mpsc::error::SendError<T>) -> Self {
Expand Down
13 changes: 6 additions & 7 deletions src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ mod tests {
use super::super::tests::{assert_messages, assert_node};
use super::*;
use crate::storage::log;
use futures::FutureExt;
use std::collections::HashMap;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -296,19 +295,19 @@ mod tests {
assert_node(&node).is_leader().term(3);

assert_eq!(
node_rx.recv().now_or_never(),
Some(Some(Message {
node_rx.try_recv()?,
Message {
from: Address::Local,
to: Address::Peers,
term: 3,
event: Event::Heartbeat { commit_index: 2, commit_term: 1 },
})),
},
);

for to in peers.iter().cloned() {
assert_eq!(
node_rx.recv().now_or_never(),
Some(Some(Message {
node_rx.try_recv()?,
Message {
from: Address::Local,
to: Address::Peer(to),
term: 3,
Expand All @@ -317,7 +316,7 @@ mod tests {
base_term: 2,
entries: vec![Entry { index: 4, term: 3, command: None }],
},
}))
}
)
}

Expand Down
13 changes: 6 additions & 7 deletions src/raft/node/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ mod tests {
use super::super::tests::{assert_messages, assert_node};
use super::*;
use crate::storage::log;
use futures::FutureExt;
use pretty_assertions::assert_eq;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -617,8 +616,8 @@ mod tests {

for peer in peers.iter().cloned() {
assert_eq!(
node_rx.recv().now_or_never(),
Some(Some(Message {
node_rx.try_recv()?,
Message {
from: Address::Local,
to: Address::Peer(peer),
term: 3,
Expand All @@ -627,7 +626,7 @@ mod tests {
base_term: 3,
entries: vec![Entry { index: 6, term: 3, command: Some(vec![0xaf]) },]
},
}))
}
)
}
assert_messages(&mut node_rx, vec![]);
Expand Down Expand Up @@ -695,13 +694,13 @@ mod tests {
}

assert_eq!(
node_rx.recv().now_or_never(),
Some(Some(Message {
node_rx.try_recv()?,
Message {
from: Address::Local,
to: Address::Peers,
term: 3,
event: Event::Heartbeat { commit_index: 2, commit_term: 1 },
}))
}
);
}
Ok(())
Expand Down
3 changes: 1 addition & 2 deletions src/raft/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ mod tests {
use super::follower::tests::{follower_leader, follower_voted_for};
use super::*;
use crate::storage::log;
use futures::FutureExt;
use pretty_assertions::assert_eq;
use tokio::sync::mpsc;

Expand All @@ -251,7 +250,7 @@ mod tests {
msgs: Vec<T>,
) {
let mut actual = Vec::new();
while let Some(Some(message)) = rx.recv().now_or_never() {
while let Ok(message) = rx.try_recv() {
actual.push(message)
}
assert_eq!(msgs, actual);
Expand Down

0 comments on commit 3286dba

Please sign in to comment.