From 3286dba01c194552a18d9a3a20ec7f7ec22e2574 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 13 Aug 2023 22:40:06 +0200 Subject: [PATCH] Re-introduce mpsc try_recv() This was removed in a previous version of Tokio, but is now available again. --- src/error.rs | 12 +++++------- src/raft/node/candidate.rs | 13 ++++++------- src/raft/node/leader.rs | 13 ++++++------- src/raft/node/mod.rs | 3 +-- 4 files changed, 18 insertions(+), 23 deletions(-) diff --git a/src/error.rs b/src/error.rs index ceedbf487..3c6807f73 100644 --- a/src/error.rs +++ b/src/error.rs @@ -115,13 +115,11 @@ impl From for Error { } } -// see https://github.com/tokio-rs/tokio/pull/3263: remove try_recv() from mpsc types -// -// impl From for Error { -// fn from(err: tokio::sync::mpsc::error::TryRecvError) -> Self { -// Error::Internal(err.to_string()) -// } -// } +impl From for Error { + fn from(err: tokio::sync::mpsc::error::TryRecvError) -> Self { + Error::Internal(err.to_string()) + } +} impl From> for Error { fn from(err: tokio::sync::mpsc::error::SendError) -> Self { diff --git a/src/raft/node/candidate.rs b/src/raft/node/candidate.rs index d7d0bce89..03d90c3a7 100644 --- a/src/raft/node/candidate.rs +++ b/src/raft/node/candidate.rs @@ -138,7 +138,6 @@ mod tests { use super::super::tests::{assert_messages, assert_node}; use super::*; use crate::storage::log; - use futures::FutureExt; use std::collections::HashMap; use tokio::sync::mpsc; @@ -296,19 +295,19 @@ mod tests { assert_node(&node).is_leader().term(3); assert_eq!( - node_rx.recv().now_or_never(), - Some(Some(Message { + node_rx.try_recv()?, + Message { from: Address::Local, to: Address::Peers, term: 3, event: Event::Heartbeat { commit_index: 2, commit_term: 1 }, - })), + }, ); for to in peers.iter().cloned() { assert_eq!( - node_rx.recv().now_or_never(), - Some(Some(Message { + node_rx.try_recv()?, + Message { from: Address::Local, to: Address::Peer(to), term: 3, @@ -317,7 +316,7 @@ mod tests { base_term: 2, entries: vec![Entry { index: 4, term: 3, command: None }], }, - })) + } ) } diff --git a/src/raft/node/leader.rs b/src/raft/node/leader.rs index d22c16143..1cd9e1c96 100644 --- a/src/raft/node/leader.rs +++ b/src/raft/node/leader.rs @@ -235,7 +235,6 @@ mod tests { use super::super::tests::{assert_messages, assert_node}; use super::*; use crate::storage::log; - use futures::FutureExt; use pretty_assertions::assert_eq; use tokio::sync::mpsc; @@ -617,8 +616,8 @@ mod tests { for peer in peers.iter().cloned() { assert_eq!( - node_rx.recv().now_or_never(), - Some(Some(Message { + node_rx.try_recv()?, + Message { from: Address::Local, to: Address::Peer(peer), term: 3, @@ -627,7 +626,7 @@ mod tests { base_term: 3, entries: vec![Entry { index: 6, term: 3, command: Some(vec![0xaf]) },] }, - })) + } ) } assert_messages(&mut node_rx, vec![]); @@ -695,13 +694,13 @@ mod tests { } assert_eq!( - node_rx.recv().now_or_never(), - Some(Some(Message { + node_rx.try_recv()?, + Message { from: Address::Local, to: Address::Peers, term: 3, event: Event::Heartbeat { commit_index: 2, commit_term: 1 }, - })) + } ); } Ok(()) diff --git a/src/raft/node/mod.rs b/src/raft/node/mod.rs index 2238b19ac..52edee9b4 100644 --- a/src/raft/node/mod.rs +++ b/src/raft/node/mod.rs @@ -242,7 +242,6 @@ mod tests { use super::follower::tests::{follower_leader, follower_voted_for}; use super::*; use crate::storage::log; - use futures::FutureExt; use pretty_assertions::assert_eq; use tokio::sync::mpsc; @@ -251,7 +250,7 @@ mod tests { msgs: Vec, ) { let mut actual = Vec::new(); - while let Some(Some(message)) = rx.recv().now_or_never() { + while let Ok(message) = rx.try_recv() { actual.push(message) } assert_eq!(msgs, actual);