diff --git a/lolraft/src/process/command_log/mod.rs b/lolraft/src/process/command_log/mod.rs index 658a1c86..ccc0bb59 100644 --- a/lolraft/src/process/command_log/mod.rs +++ b/lolraft/src/process/command_log/mod.rs @@ -106,7 +106,7 @@ impl Inner { // If the same snapshot already exists, we can skip the insertion. if new_snapshot_index == cur_snapshot_index { - return Ok(()) + return Ok(()); } self.storage.insert_entry(new_snapshot_index, e).await?; diff --git a/lolraft/src/process/peer_svc/replication.rs b/lolraft/src/process/peer_svc/replication.rs index 9ee6841d..38a24bfe 100644 --- a/lolraft/src/process/peer_svc/replication.rs +++ b/lolraft/src/process/peer_svc/replication.rs @@ -31,7 +31,7 @@ impl PeerSvc { }) } - pub async fn advance_replication(&self, follower_id: NodeId) -> Result { + pub async fn advance_replication(&self, follower_id: NodeId) -> Result<()> { let peer_context = self .peer_contexts .read() @@ -43,10 +43,7 @@ impl PeerSvc { let cur_last_log_index = self.command_log.get_log_last_index().await?; // More entries to send? - let should_send = old_progress.next_index <= cur_last_log_index; - if !should_send { - return Ok(false); - } + ensure!(old_progress.next_index <= cur_last_log_index); // The entries to be sent may be deleted due to a previous compaction. // In this case, replication will reset from the current snapshot index. @@ -62,7 +59,7 @@ impl PeerSvc { .get_mut(&follower_id) .context(Error::PeerNotFound(follower_id.clone()))? .progress = new_progress; - return Ok(true); + return Ok(()); } let n_max_possible = cur_last_log_index - old_progress.next_index + 1; @@ -78,28 +75,27 @@ impl PeerSvc { .await?; let conn = self.driver.connect(follower_id.clone()); - let send_resp = conn.send_replication_stream(out_stream).await; - let new_progress = if let Ok(resp) = send_resp { - match resp { - response::ReplicationStream { - n_inserted: 0, - log_last_index: last_log_index, - } => ReplicationProgress { - match_index: old_progress.match_index, - next_index: std::cmp::min(old_progress.next_index - 1, last_log_index + 1), - next_max_cnt: 1, - }, - response::ReplicationStream { n_inserted, .. } => ReplicationProgress { - match_index: old_progress.next_index + n_inserted - 1, - next_index: old_progress.next_index + n_inserted, - // If all entries are successfully inserted, then it is safe to double - // the replication width for quick replication. - next_max_cnt: if n_inserted == n { n * 2 } else { n }, - }, - } - } else { - old_progress + // If the follower is unable to respond for some internal reasons, + // we shouldn't repeat request otherwise the situation would be worse. + let resp = conn.send_replication_stream(out_stream).await?; + + let new_progress = match resp { + response::ReplicationStream { + n_inserted: 0, + log_last_index: last_log_index, + } => ReplicationProgress { + match_index: old_progress.match_index, + next_index: std::cmp::min(old_progress.next_index - 1, last_log_index + 1), + next_max_cnt: 1, + }, + response::ReplicationStream { n_inserted, .. } => ReplicationProgress { + match_index: old_progress.next_index + n_inserted - 1, + next_index: old_progress.next_index + n_inserted, + // If all entries are successfully inserted, then it is safe to double + // the replication width for quick replication. + next_max_cnt: if n_inserted == n { n * 2 } else { n }, + }, }; self.peer_contexts @@ -108,6 +104,6 @@ impl PeerSvc { .context(Error::PeerNotFound(follower_id.clone()))? .progress = new_progress; - Ok(true) + Ok(()) } } diff --git a/lolraft/src/process/thread/replication.rs b/lolraft/src/process/thread/replication.rs index 1d1cfdab..b9f1be00 100644 --- a/lolraft/src/process/thread/replication.rs +++ b/lolraft/src/process/thread/replication.rs @@ -13,12 +13,11 @@ impl Thread { return Ok(false); } - let cont = self - .peers + self.peers .advance_replication(self.follower_id.clone()) .await?; - Ok(cont) + Ok(true) } fn do_loop(self) -> ThreadHandle { diff --git a/tests/env/tests/tests.rs b/tests/env/tests/tests.rs index af012e55..0f93c1b7 100644 --- a/tests/env/tests/tests.rs +++ b/tests/env/tests/tests.rs @@ -61,4 +61,4 @@ async fn drop_env() -> Result<()> { } Ok(()) -} \ No newline at end of file +} diff --git a/tests/lol-tests/tests/multi_raft.rs b/tests/lol-tests/tests/multi_raft.rs index 49cb5c36..c60fbaeb 100644 --- a/tests/lol-tests/tests/multi_raft.rs +++ b/tests/lol-tests/tests/multi_raft.rs @@ -30,11 +30,10 @@ async fn N3_L100_K3_multi_raft_cluster() -> Result<()> { futs.push(fut); } futures::future::try_join_all(futs).await?; - + Ok(()) } - #[serial] #[tokio::test(flavor = "multi_thread")] async fn N1_L100_K3_multi_raft_io() -> Result<()> { diff --git a/tests/testapp/src/lib.rs b/tests/testapp/src/lib.rs index ff23a59c..07f6c8be 100644 --- a/tests/testapp/src/lib.rs +++ b/tests/testapp/src/lib.rs @@ -76,9 +76,7 @@ impl Client { use tokio_retry::Retry; // 200ms, 400, 800, 1600, 3200 - let strategy = ExponentialBackoff::from_millis(2) - .factor(100) - .take(8); + let strategy = ExponentialBackoff::from_millis(2).factor(100).take(8); let fut = Retry::spawn(strategy, || { let mut cli = self.cli.clone();