From 0d6aa24659c7677fafb476abbf225369ad5d858e Mon Sep 17 00:00:00 2001
From: Guanzhou Jose Hu <35757009+josehu07@users.noreply.github.com>
Date: Sun, 8 Oct 2023 19:48:21 -0500
Subject: [PATCH] Raft implemented & several bug fixes (#19)
* Implemented ATC '14 version of Raft
* Made snapshotting conservative
* Added hole-filling mechanism for missing log slots to the Paxos variants
* Several bug fixes
---
.github/workflows/tests_proc.yml | 6 +-
.github/workflows/tests_unit.yml | 2 +-
Cargo.lock | 137 +-
README.md | 11 +-
scripts/local_cluster.py | 15 +-
scripts/workflow_test.py | 14 +
src/lib.rs | 1 +
src/manager/clusman.rs | 74 +-
src/manager/reigner.rs | 2 +-
src/protocols/mod.rs | 18 +
src/protocols/multipaxos.rs | 596 +++--
src/protocols/raft.rs | 2222 +++++++++++++++++++
src/protocols/rep_nothing.rs | 72 +-
src/protocols/rs_paxos.rs | 861 ++++---
src/protocols/simple_push.rs | 83 +-
src/server/external.rs | 3 +-
src/server/storage.rs | 79 +-
src/server/transport.rs | 20 +-
src/utils/bitmap.rs | 24 +
src/utils/error.rs | 2 +
src/utils/rscoding.rs | 15 +-
summerset_client/src/clients/repl.rs | 167 +-
summerset_client/src/clients/tester.rs | 21 +
summerset_client/src/drivers/closed_loop.rs | 148 +-
summerset_client/src/drivers/open_loop.rs | 60 +-
25 files changed, 3933 insertions(+), 720 deletions(-)
create mode 100644 src/protocols/raft.rs
diff --git a/.github/workflows/tests_proc.yml b/.github/workflows/tests_proc.yml
index e8fbde3f..dbd7195e 100644
--- a/.github/workflows/tests_proc.yml
+++ b/.github/workflows/tests_proc.yml
@@ -16,5 +16,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- - name: Run proc tests
- run: python3 scripts/workflow_test.py
+ - name: Run proc tests (MultiPaxos)
+ run: python3 scripts/workflow_test.py -p MultiPaxos
+ - name: Run proc tests (Raft)
+ run: python3 scripts/workflow_test.py -p Raft
diff --git a/.github/workflows/tests_unit.yml b/.github/workflows/tests_unit.yml
index 0a1fd8d6..57aa8fb3 100644
--- a/.github/workflows/tests_unit.yml
+++ b/.github/workflows/tests_unit.yml
@@ -16,5 +16,5 @@ jobs:
steps:
- uses: actions/checkout@v3
- - name: Run unit tests
+ - name: Run all unit tests
run: cargo test --workspace --verbose
diff --git a/Cargo.lock b/Cargo.lock
index 48d1c203..3a18786a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -39,9 +39,9 @@ dependencies = [
[[package]]
name = "anstream"
-version = "0.5.0"
+version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c"
+checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44"
dependencies = [
"anstyle",
"anstyle-parse",
@@ -53,15 +53,15 @@ dependencies = [
[[package]]
name = "anstyle"
-version = "1.0.3"
+version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b84bf0a05bbb2a83e5eb6fa36bb6e87baa08193c35ff52bbf6b38d8af2890e46"
+checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87"
[[package]]
name = "anstyle-parse"
-version = "0.2.1"
+version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333"
+checksum = "317b9a89c1868f5ea6ff1d9539a69f45dffc21ce321ac1fd1160dfa48c8e2140"
dependencies = [
"utf8parse",
]
@@ -77,9 +77,9 @@ dependencies = [
[[package]]
name = "anstyle-wincon"
-version = "2.1.0"
+version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd"
+checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628"
dependencies = [
"anstyle",
"windows-sys",
@@ -93,7 +93,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.37",
+ "syn 2.0.38",
]
[[package]]
@@ -105,7 +105,7 @@ dependencies = [
"attribute-derive-macro",
"proc-macro2",
"quote",
- "syn 2.0.37",
+ "syn 2.0.38",
]
[[package]]
@@ -121,7 +121,7 @@ dependencies = [
"proc-macro2",
"quote",
"quote-use",
- "syn 2.0.37",
+ "syn 2.0.38",
]
[[package]]
@@ -159,9 +159,9 @@ checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635"
[[package]]
name = "byteorder"
-version = "1.4.3"
+version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
+checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
@@ -189,9 +189,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "clap"
-version = "4.4.4"
+version = "4.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b1d7b8d5ec32af0fadc644bf1fd509a688c2103b185644bb1e29d164e0703136"
+checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956"
dependencies = [
"clap_builder",
"clap_derive",
@@ -199,9 +199,9 @@ dependencies = [
[[package]]
name = "clap_builder"
-version = "4.4.4"
+version = "4.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5179bb514e4d7c2051749d8fcefa2ed6d06a9f4e6d69faf3805f5d80b8cf8d56"
+checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45"
dependencies = [
"anstream",
"anstyle",
@@ -218,7 +218,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
- "syn 2.0.37",
+ "syn 2.0.38",
]
[[package]]
@@ -280,7 +280,7 @@ checksum = "146398d62142a0f35248a608f17edf0dde57338354966d6e41d0eb2d16980ccb"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.37",
+ "syn 2.0.38",
]
[[package]]
@@ -324,25 +324,14 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
-version = "0.3.3"
+version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd"
+checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860"
dependencies = [
- "errno-dragonfly",
"libc",
"windows-sys",
]
-[[package]]
-name = "errno-dragonfly"
-version = "0.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf"
-dependencies = [
- "cc",
- "libc",
-]
-
[[package]]
name = "fixedbitset"
version = "0.4.2"
@@ -423,7 +412,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.37",
+ "syn 2.0.38",
]
[[package]]
@@ -486,7 +475,7 @@ checksum = "13a1bcfb855c1f340d5913ab542e36f25a1c56f57de79022928297632435dec2"
dependencies = [
"attribute-derive",
"quote",
- "syn 2.0.37",
+ "syn 2.0.38",
]
[[package]]
@@ -517,9 +506,9 @@ dependencies = [
[[package]]
name = "hashbrown"
-version = "0.14.0"
+version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
+checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12"
[[package]]
name = "heck"
@@ -541,12 +530,12 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "indexmap"
-version = "2.0.0"
+version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
+checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897"
dependencies = [
"equivalent",
- "hashbrown 0.14.0",
+ "hashbrown 0.14.1",
]
[[package]]
@@ -589,21 +578,21 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
-version = "0.2.148"
+version = "0.2.149"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b"
+checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b"
[[package]]
name = "libm"
-version = "0.2.7"
+version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4"
+checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
[[package]]
name = "linux-raw-sys"
-version = "0.4.7"
+version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1a9bad9f94746442c783ca431b22403b519cd7fbeed0533fdd6328b2f2212128"
+checksum = "45786cec4d5e54a224b15cb9f06751883103a27c19c93eda09b0b4f5f08fefac"
[[package]]
name = "lock_api"
@@ -656,9 +645,9 @@ dependencies = [
[[package]]
name = "memchr"
-version = "2.6.3"
+version = "2.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c"
+checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
[[package]]
name = "minimal-lexical"
@@ -729,9 +718,9 @@ dependencies = [
[[package]]
name = "num-traits"
-version = "0.2.16"
+version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2"
+checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c"
dependencies = [
"autocfg",
]
@@ -914,9 +903,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
-version = "1.0.67"
+version = "1.0.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328"
+checksum = "5b1106fec09662ec6dd98ccac0f81cef56984d0b49f75c92d8cbad76e20c005c"
dependencies = [
"unicode-ident",
]
@@ -938,7 +927,7 @@ checksum = "a7b5abe3fe82fdeeb93f44d66a7b444dedf2e4827defb0a8e69c437b2de2ef94"
dependencies = [
"quote",
"quote-use-macros",
- "syn 2.0.37",
+ "syn 2.0.38",
]
[[package]]
@@ -950,7 +939,7 @@ dependencies = [
"derive-where",
"proc-macro2",
"quote",
- "syn 2.0.37",
+ "syn 2.0.38",
]
[[package]]
@@ -1029,13 +1018,13 @@ dependencies = [
[[package]]
name = "regex"
-version = "1.9.5"
+version = "1.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47"
+checksum = "ebee201405406dbf528b8b672104ae6d6d63e6d118cb10e4d51abbc7b58044ff"
dependencies = [
"aho-corasick",
"memchr",
- "regex-automata 0.3.8",
+ "regex-automata 0.3.9",
"regex-syntax 0.7.5",
]
@@ -1050,9 +1039,9 @@ dependencies = [
[[package]]
name = "regex-automata"
-version = "0.3.8"
+version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795"
+checksum = "59b23e92ee4318893fa3fe3e6fb365258efbfe6ac6ab30f090cdcbb7aa37efa9"
dependencies = [
"aho-corasick",
"memchr",
@@ -1101,9 +1090,9 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustix"
-version = "0.38.14"
+version = "0.38.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "747c788e9ce8e92b12cd485c49ddf90723550b654b32508f979b71a7b1ecda4f"
+checksum = "f25469e9ae0f3d0047ca8b93fc56843f38e6774f0914a107ff8b41be8be8e0b7"
dependencies = [
"bitflags 2.4.0",
"errno",
@@ -1153,7 +1142,7 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.37",
+ "syn 2.0.38",
]
[[package]]
@@ -1178,9 +1167,9 @@ dependencies = [
[[package]]
name = "sharded-slab"
-version = "0.1.4"
+version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
+checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
@@ -1313,9 +1302,9 @@ dependencies = [
[[package]]
name = "syn"
-version = "2.0.37"
+version = "2.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7303ef2c05cd654186cb250d29049a24840ca25d2747c25c0381c8d9e2f582e8"
+checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b"
dependencies = [
"proc-macro2",
"quote",
@@ -1346,22 +1335,22 @@ dependencies = [
[[package]]
name = "thiserror"
-version = "1.0.48"
+version = "1.0.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7"
+checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
-version = "1.0.48"
+version = "1.0.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35"
+checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.37",
+ "syn 2.0.38",
]
[[package]]
@@ -1401,7 +1390,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.37",
+ "syn 2.0.38",
]
[[package]]
@@ -1458,7 +1447,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.37",
+ "syn 2.0.38",
]
[[package]]
@@ -1638,9 +1627,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "winnow"
-version = "0.5.15"
+version = "0.5.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7c2e3184b9c4e92ad5167ca73039d0c42476302ab603e2fec4487511f38ccefc"
+checksum = "037711d82167854aff2018dfd193aa0fef5370f456732f0d5a0c59b0f1b4b907"
dependencies = [
"memchr",
]
diff --git a/README.md b/README.md
index 58d3a50a..b83a7a64 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
[![Proc tests status](https://github.com/josehu07/summerset/actions/workflows/tests_proc.yml/badge.svg)](https://github.com/josehu07/summerset/actions?query=josehu07%3Atests_proc)
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
-Summerset is a distributed key-value store supporting a wide range of state machine replication (SMR) protocols for research purposes. More protocols are actively being added.
+Summerset is a distributed, replicated, protocol-generic key-value store supporting a wide range of state machine replication (SMR) protocols for research purposes. More protocols are actively being added.
@@ -21,6 +21,7 @@ Summerset is a distributed key-value store supporting a wide range of state mach
| `SimplePush` | Pushing to peers w/o any consistency guarantees |
| `MultiPaxos` | Classic [MultiPaxos](https://www.microsoft.com/en-us/research/uploads/prod/2016/12/paxos-simple-Copy.pdf) protocol |
| `RS-Paxos` | MultiPaxos w/ Reed-Solomon erasure code sharding |
+| `Raft` | [Raft](https://raft.github.io/raft.pdf) with an explicit log and strong leadership |
Formal TLA+ specifications of some protocols are provided in `tla+/`.
@@ -32,6 +33,7 @@ Formal TLA+ specification of some protocols are provided in `tla+/`.
- **Async Rust**: Summerset is written in Rust and demonstrates canonical usage of async programming structures backed by the [`tokio`](https://tokio.rs/) framework;
- **Event-based**: Summerset adopts a channel-oriented, event-based system architecture; each replication protocol is basically just a set of event handlers plus a `tokio::select!` loop;
- **Modularized**: Common components of a distributed KV store, e.g. network transport and durable logger, are cleanly separated from each other and connected through channels.
+- **Protocol-generic**: Combining the above two points, Summerset supports a set of different replication protocols in one codebase, each implemented in just a single file, with common functionality abstracted out.
These design choices make protocol implementation in Summerset surprisingly straightforward and **understandable**, without sacrificing performance. Comments / issues / PRs are always welcome!
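To make the "Event-based" and "Protocol-generic" points above concrete, here is a minimal, self-contained sketch (illustrative names only, not Summerset source) of the `tokio::select!` event-loop shape that each protocol module follows:

```rust
// Sketch of a protocol replica as event handlers plus a select! loop.
// Requires the `tokio` crate with the "full" feature.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (req_tx, mut req_rx) = mpsc::unbounded_channel::<String>();
    let (msg_tx, mut msg_rx) = mpsc::unbounded_channel::<String>();

    // feed one event of each kind, then drop senders so the loop ends
    req_tx.send("client request".into()).unwrap();
    msg_tx.send("peer message".into()).unwrap();
    drop(req_tx);
    drop(msg_tx);

    loop {
        tokio::select! {
            Some(req) = req_rx.recv() => println!("handle_req_batch: {req}"),
            Some(msg) = msg_rx.recv() => println!("handle_msg_recv: {msg}"),
            else => break, // all event sources exhausted
        }
    }
}
```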
@@ -118,12 +120,15 @@ Complete cluster management and benchmarking scripts are available in another re
- [ ] specialize read-only commands?
- [ ] separate commit vs. exec responses?
- [ ] membership discovery & view changes?
-- [ ] implementation of Raft
+- [x] implementation of Raft
+ - [x] state persistence & restart check
+ - [x] snapshotting & garbage collection
+ - [ ] membership discovery & view changes?
- [x] client-side utilities
- [x] REPL-style client
- [x] random benchmarking client
- [x] testing client
- - [ ] YCSB-driven benchmarking
+ - [ ] YCSB-driven client
- [ ] better README & documentation
---
diff --git a/scripts/local_cluster.py b/scripts/local_cluster.py
index 72234864..ae356fa0 100644
--- a/scripts/local_cluster.py
+++ b/scripts/local_cluster.py
@@ -44,11 +44,13 @@ def kill_all_matching(name, force=False):
"RepNothing": lambda r: f"backer_path='/tmp/summerset.rep_nothing.{r}.wal'",
"SimplePush": lambda r: f"backer_path='/tmp/summerset.simple_push.{r}.wal'",
"MultiPaxos": lambda r: f"backer_path='/tmp/summerset.multipaxos.{r}.wal'",
+ "Raft": lambda r: f"backer_path='/tmp/summerset.raft.{r}.wal'",
"RSPaxos": lambda r: f"backer_path='/tmp/summerset.rs_paxos.{r}.wal'",
}
PROTOCOL_SNAPSHOT_PATH = {
"MultiPaxos": lambda r: f"snapshot_path='/tmp/summerset.multipaxos.{r}.snap'",
+ "Raft": lambda r: f"snapshot_path='/tmp/summerset.raft.{r}.snap'",
"RSPaxos": lambda r: f"snapshot_path='/tmp/summerset.rs_paxos.{r}.snap'",
}
@@ -70,19 +72,6 @@ def config_with_file_paths(protocol, config, replica):
return result_config
-def config_with_backer_path(protocol, config, replica):
- result_config = PROTOCOL_BACKER_PATH[protocol](replica)
-
- if config is not None and len(config) > 0:
- if "backer_path" in config:
- result_config = config # use user-supplied path
- else:
- result_config += "+"
- result_config += config
-
- return result_config
-
-
def compose_manager_cmd(protocol, srv_port, cli_port, num_replicas, release):
cmd = [f"./target/{'release' if release else 'debug'}/summerset_manager"]
cmd += [
diff --git a/scripts/workflow_test.py b/scripts/workflow_test.py
index 33484aca..eb176a7f 100644
--- a/scripts/workflow_test.py
+++ b/scripts/workflow_test.py
@@ -1,5 +1,6 @@
import sys
import os
+import argparse
import subprocess
@@ -76,6 +77,12 @@ def run_tester_client(protocol, test_name):
if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "-p", "--protocol", type=str, required=True, help="protocol name"
+ )
+ args = parser.parse_args()
+
do_cargo_build()
kill_all_matching("local_client.py", force=True)
@@ -85,6 +92,13 @@ def run_tester_client(protocol, test_name):
kill_all_matching("summerset_manager", force=True)
PROTOCOL = "MultiPaxos"
+ if args.protocol == "MultiPaxos":
+ pass
+ elif args.protocol == "Raft":
+ PROTOCOL = "Raft"
+ else:
+ raise ValueError(f"unrecognized protocol {args.protocol} to run workflow test")
+
NUM_REPLICAS = 3
TEST_NAME = "primitive_ops"
TIMEOUT = 300
diff --git a/src/lib.rs b/src/lib.rs
index 9e044072..feb47c90 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -35,4 +35,5 @@ pub use crate::protocols::SmrProtocol;
pub use crate::protocols::{ReplicaConfigRepNothing, ClientConfigRepNothing};
pub use crate::protocols::{ReplicaConfigSimplePush, ClientConfigSimplePush};
pub use crate::protocols::{ReplicaConfigMultiPaxos, ClientConfigMultiPaxos};
+pub use crate::protocols::{ReplicaConfigRaft, ClientConfigRaft};
pub use crate::protocols::{ReplicaConfigRSPaxos, ClientConfigRSPaxos};
diff --git a/src/manager/clusman.rs b/src/manager/clusman.rs
index a21ef9c7..daefadf0 100644
--- a/src/manager/clusman.rs
+++ b/src/manager/clusman.rs
@@ -186,19 +186,12 @@ impl ClusterManager {
protocol);
}
- // tell it to connect to all existing known servers
+ // gather the list of all existing known servers
let to_peers: HashMap<ReplicaId, SocketAddr> = self
.server_info
.iter()
.map(|(&server, info)| (server, info.p2p_addr))
.collect();
- self.server_reigner.send_ctrl(
- CtrlMsg::ConnectToPeers {
- population: self.population,
- to_peers,
- },
- server,
- )?;
// save new server's info
self.server_info.insert(
@@ -211,6 +204,16 @@ impl ClusterManager {
start_slot: 0,
},
);
+
+ // tell it to connect to all other existing known servers
+ self.server_reigner.send_ctrl(
+ CtrlMsg::ConnectToPeers {
+ population: self.population,
+ to_peers,
+ },
+ server,
+ )?;
+
Ok(())
}
@@ -406,9 +409,13 @@ impl ClusterManager {
self.server_info.get_mut(&s).unwrap().is_paused = true;
// wait for dummy reply
- let (_, reply) = self.server_reigner.recv_ctrl().await?;
- if reply != CtrlMsg::PauseReply {
- return logged_err!("m"; "unexpected reply type received");
+ loop {
+ let (server, reply) = self.server_reigner.recv_ctrl().await?;
+ if server != s || reply != CtrlMsg::PauseReply {
+ self.handle_ctrl_msg(server, reply).await?;
+ } else {
+ break;
+ }
}
pause_done.insert(s);
@@ -442,9 +449,13 @@ impl ClusterManager {
self.server_reigner.send_ctrl(CtrlMsg::Resume, s)?;
// wait for dummy reply
- let (_, reply) = self.server_reigner.recv_ctrl().await?;
- if reply != CtrlMsg::ResumeReply {
- return logged_err!("m"; "unexpected reply type received");
+ loop {
+ let (server, reply) = self.server_reigner.recv_ctrl().await?;
+ if server != s || reply != CtrlMsg::ResumeReply {
+ self.handle_ctrl_msg(server, reply).await?;
+ } else {
+ break;
+ }
}
// clear the is_paused flag
@@ -482,22 +493,27 @@ impl ClusterManager {
self.server_reigner.send_ctrl(CtrlMsg::TakeSnapshot, s)?;
// wait for reply
- let (_, reply) = self.server_reigner.recv_ctrl().await?;
- if let CtrlMsg::SnapshotUpTo { new_start } = reply {
- // update the log start index info
- assert!(self.server_info.contains_key(&s));
- if new_start < self.server_info[&s].start_slot {
- return logged_err!("m"; "server {} snapshot up to {} < {}",
- s, new_start,
- self.server_info[&s].start_slot);
- } else {
- self.server_info.get_mut(&s).unwrap().start_slot =
- new_start;
- }
+ loop {
+ let (server, reply) = self.server_reigner.recv_ctrl().await?;
+ match reply {
+ CtrlMsg::SnapshotUpTo { new_start } if server == s => {
+ // update the log start index info
+ assert!(self.server_info.contains_key(&s));
+ if new_start < self.server_info[&s].start_slot {
+ return logged_err!("m"; "server {} snapshot up to {} < {}",
+ s, new_start,
+ self.server_info[&s].start_slot);
+ } else {
+ self.server_info.get_mut(&s).unwrap().start_slot =
+ new_start;
+ }
+
+ snapshot_up_to.insert(s, new_start);
+ break;
+ }
- snapshot_up_to.insert(s, new_start);
- } else {
- return logged_err!("m"; "unexpected reply type received");
+ _ => self.handle_ctrl_msg(server, reply).await?,
+ }
}
}
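The three wait-loops patched above all share one shape: while blocking for a specific reply from server `s`, any unrelated control message gets dispatched to the regular handler instead of being treated as an error. A hedged, synchronous sketch of that pattern (illustrative types; the actual manager code is async):

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum CtrlMsg {
    PauseReply,
    Other(u32),
}

// Pop messages until the expected (server, PauseReply) pair arrives;
// everything else goes through the generic handler, not an error path.
fn wait_for_pause_reply(
    inbox: &mut VecDeque<(u8, CtrlMsg)>,
    s: u8,
    handle_other: &mut dyn FnMut(u8, CtrlMsg),
) {
    while let Some((server, reply)) = inbox.pop_front() {
        if server == s && reply == CtrlMsg::PauseReply {
            break;
        }
        handle_other(server, reply);
    }
}

fn main() {
    let mut inbox =
        VecDeque::from([(2u8, CtrlMsg::Other(7)), (1, CtrlMsg::PauseReply)]);
    let mut seen = Vec::new();
    wait_for_pause_reply(&mut inbox, 1, &mut |srv, msg| {
        seen.push((srv, format!("{msg:?}")));
    });
    assert_eq!(seen.len(), 1); // unrelated message dispatched, not dropped
}
```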
diff --git a/src/manager/reigner.rs b/src/manager/reigner.rs
index 41ae38ec..3be28cde 100644
--- a/src/manager/reigner.rs
+++ b/src/manager/reigner.rs
@@ -21,7 +21,7 @@ use tokio::task::JoinHandle;
/// Control message from/to servers. Control traffic could be bidirectional:
/// some initiated by the manager and some by servers.
-// TODO: later add leader change, membership change, etc.
+// TODO: later add membership/view change, link drop, etc.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum CtrlMsg {
/// Server -> Manager: new server up, requesting a list of peers' addresses
diff --git a/src/protocols/mod.rs b/src/protocols/mod.rs
index b7aaaf4f..ff7f88f1 100644
--- a/src/protocols/mod.rs
+++ b/src/protocols/mod.rs
@@ -22,6 +22,10 @@ mod multipaxos;
use multipaxos::{MultiPaxosReplica, MultiPaxosClient};
pub use multipaxos::{ReplicaConfigMultiPaxos, ClientConfigMultiPaxos};
+mod raft;
+use raft::{RaftReplica, RaftClient};
+pub use raft::{ReplicaConfigRaft, ClientConfigRaft};
+
mod rs_paxos;
use rs_paxos::{RSPaxosReplica, RSPaxosClient};
pub use rs_paxos::{ReplicaConfigRSPaxos, ClientConfigRSPaxos};
@@ -32,6 +36,7 @@ pub enum SmrProtocol {
RepNothing,
SimplePush,
MultiPaxos,
+ Raft,
RSPaxos,
}
@@ -51,6 +56,7 @@ impl SmrProtocol {
"RepNothing" => Some(Self::RepNothing),
"SimplePush" => Some(Self::SimplePush),
"MultiPaxos" => Some(Self::MultiPaxos),
+ "Raft" => Some(Self::Raft),
"RSPaxos" => Some(Self::RSPaxos),
_ => None,
}
@@ -100,6 +106,14 @@ impl SmrProtocol {
.await
)
}
+ Self::Raft => {
+ box_if_ok!(
+ RaftReplica::new_and_setup(
+ api_addr, p2p_addr, manager, config_str
+ )
+ .await
+ )
+ }
Self::RSPaxos => {
box_if_ok!(
RSPaxosReplica::new_and_setup(
@@ -133,6 +147,9 @@ impl SmrProtocol {
MultiPaxosClient::new_and_setup(manager, config_str).await
)
}
+ Self::Raft => {
+ box_if_ok!(RaftClient::new_and_setup(manager, config_str).await)
+ }
Self::RSPaxos => {
box_if_ok!(
RSPaxosClient::new_and_setup(manager, config_str).await
@@ -166,6 +183,7 @@ mod protocols_name_tests {
valid_name_test!(RepNothing);
valid_name_test!(SimplePush);
valid_name_test!(MultiPaxos);
+ valid_name_test!(Raft);
valid_name_test!(RSPaxos);
}
diff --git a/src/protocols/multipaxos.rs b/src/protocols/multipaxos.rs
index 00e5f964..f52ca472 100644
--- a/src/protocols/multipaxos.rs
+++ b/src/protocols/multipaxos.rs
@@ -7,6 +7,7 @@
//! -
//! -
+use std::cmp;
use std::collections::HashMap;
use std::path::Path;
use std::net::SocketAddr;
@@ -35,8 +36,8 @@ use tokio::sync::watch;
/// Configuration parameters struct.
#[derive(Debug, Deserialize)]
pub struct ReplicaConfigMultiPaxos {
- /// Client request batching interval in microsecs.
- pub batch_interval_us: u64,
+ /// Client request batching interval in millisecs.
+ pub batch_interval_ms: u64,
/// Client request batching maximum batch size.
pub max_batch_size: usize,
@@ -49,7 +50,6 @@ pub struct ReplicaConfigMultiPaxos {
/// Min timeout of not hearing any heartbeat from leader in millisecs.
pub hb_hear_timeout_min: u64,
-
/// Max timeout of not hearing any heartbeat from leader in millisecs.
pub hb_hear_timeout_max: u64,
@@ -74,7 +74,7 @@ pub struct ReplicaConfigMultiPaxos {
impl Default for ReplicaConfigMultiPaxos {
fn default() -> Self {
ReplicaConfigMultiPaxos {
- batch_interval_us: 1000,
+ batch_interval_ms: 10,
max_batch_size: 5000,
backer_path: "/tmp/summerset.multipaxos.wal".into(),
logger_sync: false,
@@ -154,12 +154,12 @@ struct Instance {
external: bool,
/// Offset of first durable WAL log entry related to this instance.
- log_offset: usize,
+ wal_offset: usize,
}
-/// Stable storage log entry type.
+/// Stable storage WAL log entry type.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, GetSize)]
-enum LogEntry {
+enum WalEntry {
/// Records an update to the largest prepare ballot seen.
PrepareBal { slot: usize, ballot: Ballot },
@@ -175,11 +175,20 @@ enum LogEntry {
}
/// Snapshot file entry type.
+///
+/// NOTE: the current implementation simply appends a squashed log at the
+/// end of the snapshot file for simplicity. In production, the snapshot
+/// file should be a bounded-size backend, e.g., an LSM-tree.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, GetSize)]
enum SnapEntry {
- /// First entry at the start of file: number of log instances covered by
- /// this snapshot file == the start slot index of in-mem log.
- StartSlot { slot: usize },
+ /// Necessary slot indices to remember.
+ SlotInfo {
+ /// First entry at the start of file: number of log instances covered
+ /// by this snapshot file == the start slot index of in-mem log.
+ start_slot: usize,
+ /// Index of the first non-committed slot.
+ commit_bar: usize,
+ },
/// Set of key-value pairs to apply to the state.
KVPairSet { pairs: HashMap<String, String> },
@@ -189,7 +198,13 @@ enum SnapEntry {
#[derive(Debug, Clone, Serialize, Deserialize, GetSize)]
enum PeerMsg {
/// Prepare message from leader to replicas.
- Prepare { slot: usize, ballot: Ballot },
+ Prepare {
+ /// Slot index in Prepare message is the triggering slot of this
+ /// Prepare. Once prepared, it means that all slots in the range
+ /// [slot, +infinity) are prepared under this ballot number.
+ slot: usize,
+ ballot: Ballot,
+ },
/// Prepare reply from replica to leader.
PrepareReply {
@@ -213,8 +228,18 @@ enum PeerMsg {
/// Commit notification from leader to replicas.
Commit { slot: usize },
+ /// Request from a lagging replica to the leader, asking it to re-send
+ /// Accepts for missing hole slots.
+ FillHoles { slots: Vec<usize> },
+
/// Leader activity heartbeat.
- Heartbeat { ballot: Ballot, exec_bar: usize },
+ Heartbeat {
+ ballot: Ballot,
+ /// For leader step-up as well as conservative snapshotting purposes.
+ exec_bar: usize,
+ /// For conservative snapshotting purposes.
+ snap_bar: usize,
+ },
}
/// MultiPaxos server replica module.
@@ -247,7 +272,7 @@ pub struct MultiPaxosReplica {
state_machine: StateMachine,
/// StorageHub module.
- storage_hub: StorageHub<LogEntry>,
+ storage_hub: StorageHub<WalEntry>,
/// StorageHub module for the snapshot file.
snapshot_hub: StorageHub<SnapEntry>,
@@ -255,14 +280,21 @@ pub struct MultiPaxosReplica {
/// TransportHub module.
transport_hub: TransportHub,
+ /// Who do I think is the effective leader of the cluster right now?
+ leader: Option<ReplicaId>,
+
/// Timer for hearing heartbeat from leader.
hb_hear_timer: Timer,
/// Interval for sending heartbeat to followers.
hb_send_interval: Interval,
- /// Do I think I am the leader?
- is_leader: bool,
+ /// Heartbeat reply counters for approximate detection of follower health.
+ /// Tuple of (#hb_replied, #hb_replied seen at last send, repetition).
+ hb_reply_cnts: HashMap<ReplicaId, (u64, u64, u8)>,
+
+ /// Approximate health status tracking of peer replicas.
+ peer_alive: Bitmap,
/// In-memory log of instances.
insts: Vec,
@@ -289,15 +321,33 @@ pub struct MultiPaxosReplica {
/// It is always true that exec_bar <= commit_bar <= start_slot + insts.len()
exec_bar: usize,
- /// Current durable log file offset.
- log_offset: usize,
+ /// Map from peer ID -> its latest exec_bar that I know of; this is for
+ /// conservative snapshotting purposes.
+ peer_exec_bar: HashMap<ReplicaId, usize>,
+
+ /// Slot index before which it is safe to take snapshot.
+ /// NOTE: we are taking a conservative approach here that a snapshot
+ /// covering an entry can be taken only when all servers have durably
+ /// committed (and executed) that entry.
+ snap_bar: usize,
+
+ /// Current durable WAL log file offset.
+ wal_offset: usize,
/// Current durable snapshot file offset.
snap_offset: usize,
}
+// MultiPaxosReplica common helpers
impl MultiPaxosReplica {
+ /// Do I think I am the current effective leader?
+ #[inline]
+ fn is_leader(&self) -> bool {
+ self.leader == Some(self.id)
+ }
+
/// Create an empty null instance.
+ #[inline]
fn null_instance(&self) -> Instance {
Instance {
bal: 0,
@@ -307,22 +357,36 @@ impl MultiPaxosReplica {
leader_bk: None,
replica_bk: None,
external: false,
- log_offset: 0,
+ wal_offset: 0,
+ }
+ }
+
+ /// Locate the first null slot or append a null instance if no holes exist.
+ fn first_null_slot(&mut self) -> usize {
+ for s in self.commit_bar..(self.start_slot + self.insts.len()) {
+ if self.insts[s - self.start_slot].status == Status::Null {
+ return s;
+ }
}
+ self.insts.push(self.null_instance());
+ self.start_slot + self.insts.len() - 1
}
/// Compose a unique ballot number from base.
+ #[inline]
fn make_unique_ballot(&self, base: u64) -> Ballot {
((base << 8) | ((self.id + 1) as u64)) as Ballot
}
/// Compose a unique ballot number greater than the given one.
+ #[inline]
fn make_greater_ballot(&self, bal: Ballot) -> Ballot {
self.make_unique_ballot((bal >> 8) + 1)
}
/// Compose LogActionId from slot index & entry type.
/// Uses the `Status` enum type to represent different entry types.
+ #[inline]
fn make_log_action_id(slot: usize, entry_type: Status) -> LogActionId {
let type_num = match entry_type {
Status::Preparing => 1,
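The ballot helpers above pack a monotonically increasing base into the high bits and `replica id + 1` into the low 8 bits, so ballots composed by different replicas never collide and bumping the base always yields a numerically greater ballot. A standalone worked example (free functions mirroring the methods above, not the module's code):

```rust
// Low 8 bits: replica id + 1; high bits: monotonic base.
fn make_unique_ballot(id: u64, base: u64) -> u64 {
    (base << 8) | (id + 1)
}

fn make_greater_ballot(id: u64, bal: u64) -> u64 {
    make_unique_ballot(id, (bal >> 8) + 1)
}

fn main() {
    let bal = make_unique_ballot(2, 5); // replica 2, base 5
    assert_eq!(bal, 0x503); // = 1283
    // bumping the base compares strictly higher, no matter which
    // replica composed the original ballot
    assert_eq!(make_greater_ballot(2, bal), 0x603);
}
```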
@@ -334,6 +398,7 @@ impl MultiPaxosReplica {
}
/// Decompose LogActionId into slot index & entry type.
+ #[inline]
fn split_log_action_id(log_action_id: LogActionId) -> (usize, Status) {
let slot = (log_action_id >> 2) as usize;
let type_num = log_action_id & ((1 << 2) - 1);
@@ -347,6 +412,7 @@ impl MultiPaxosReplica {
}
/// Compose CommandId from slot index & command index within.
+ #[inline]
fn make_command_id(slot: usize, cmd_idx: usize) -> CommandId {
assert!(slot <= (u32::MAX as usize));
assert!(cmd_idx <= (u32::MAX as usize));
@@ -354,12 +420,16 @@ impl MultiPaxosReplica {
}
/// Decompose CommandId into slot index & command index within.
+ #[inline]
fn split_command_id(command_id: CommandId) -> (usize, usize) {
let slot = (command_id >> 32) as usize;
let cmd_idx = (command_id & ((1 << 32) - 1)) as usize;
(slot, cmd_idx)
}
+}
+// MultiPaxosReplica client requests entrance
+impl MultiPaxosReplica {
/// Handler of client request batch chan recv.
fn handle_req_batch(
&mut self,
@@ -370,52 +440,44 @@ impl MultiPaxosReplica {
pf_debug!(self.id; "got request batch of size {}", batch_size);
// if I'm not a leader, ignore client requests
- if !self.is_leader {
+ if !self.is_leader() {
for (client, req) in req_batch {
if let ApiRequest::Req { id: req_id, .. } = req {
- // tell the client to try on the next replica
- let next_replica = (self.id + 1) % self.population;
+ // tell the client to try on known leader or just the
+ // next ID replica
+ let target = if let Some(peer) = self.leader {
+ peer
+ } else {
+ (self.id + 1) % self.population
+ };
self.external_api.send_reply(
ApiReply::Reply {
id: req_id,
result: None,
- redirect: Some(next_replica),
+ redirect: Some(target),
},
client,
)?;
pf_trace!(self.id; "redirected client {} to replica {}",
- client, next_replica);
+ client, target);
}
}
return Ok(());
}
// create a new instance in the first null slot (or append a new one
- // at the end if no holes exist)
- let mut slot = self.start_slot + self.insts.len();
- for s in self.commit_bar..(self.start_slot + self.insts.len()) {
- let old_inst = &mut self.insts[s - self.start_slot];
- if old_inst.status == Status::Null {
- old_inst.reqs = req_batch.clone();
- old_inst.leader_bk = Some(LeaderBookkeeping {
- prepare_acks: Bitmap::new(self.population, false),
- prepare_max_bal: 0,
- accept_acks: Bitmap::new(self.population, false),
- });
- slot = s;
- break;
- }
- }
- if slot == self.start_slot + self.insts.len() {
- let mut new_inst = self.null_instance();
- new_inst.reqs = req_batch.clone();
- new_inst.leader_bk = Some(LeaderBookkeeping {
+ // at the end if no holes exist); fill it up with incoming data
+ let slot = self.first_null_slot();
+ {
+ let inst = &mut self.insts[slot - self.start_slot];
+ assert_eq!(inst.status, Status::Null);
+ inst.reqs = req_batch.clone();
+ inst.leader_bk = Some(LeaderBookkeeping {
prepare_acks: Bitmap::new(self.population, false),
prepare_max_bal: 0,
accept_acks: Bitmap::new(self.population, false),
});
- new_inst.external = true;
- self.insts.push(new_inst);
+ inst.external = true;
}
// decide whether we can enter fast path for this instance
@@ -439,7 +501,7 @@ impl MultiPaxosReplica {
self.storage_hub.submit_action(
Self::make_log_action_id(slot, Status::Preparing),
LogAction::Append {
- entry: LogEntry::PrepareBal {
+ entry: WalEntry::PrepareBal {
slot,
ballot: self.bal_prep_sent,
},
@@ -472,7 +534,7 @@ impl MultiPaxosReplica {
self.storage_hub.submit_action(
Self::make_log_action_id(slot, Status::Accepting),
LogAction::Append {
- entry: LogEntry::AcceptData {
+ entry: WalEntry::AcceptData {
slot,
ballot: inst.bal,
reqs: req_batch.clone(),
@@ -498,7 +560,10 @@ impl MultiPaxosReplica {
Ok(())
}
+}
+// MultiPaxosReplica durable WAL logging
+impl MultiPaxosReplica {
/// Handler of PrepareBal logging result chan recv.
fn handle_logged_prepare_bal(
&mut self,
@@ -516,7 +581,7 @@ impl MultiPaxosReplica {
None
};
- if self.is_leader {
+ if self.is_leader() {
// on leader, finishing the logging of a PrepareBal entry
// is equivalent to receiving a Prepare reply from myself
// (as an acceptor role)
@@ -553,7 +618,7 @@ impl MultiPaxosReplica {
slot, self.insts[slot - self.start_slot].bal);
let inst = &self.insts[slot - self.start_slot];
- if self.is_leader {
+ if self.is_leader() {
// on leader, finishing the logging of an AcceptData entry
// is equivalent to receiving an Accept reply from myself
// (as an acceptor role)
@@ -619,6 +684,21 @@ impl MultiPaxosReplica {
}
}
+ // if there are hole(s) between current commit_bar and newly committed
+ // slot, ask the leader to re-send Accept messages for those slots
+ if slot > self.commit_bar && !self.is_leader() {
+ if let Some(leader) = self.leader {
+ let holes: Vec<usize> = (self.commit_bar..slot).collect();
+ self.transport_hub.send_msg(
+ PeerMsg::FillHoles {
+ slots: holes.clone(),
+ },
+ leader,
+ )?;
+ pf_trace!(self.id; "sent FillHoles -> {} slots {:?}", leader, holes);
+ }
+ }
+
Ok(())
}
@@ -626,7 +706,7 @@ impl MultiPaxosReplica {
fn handle_log_result(
&mut self,
action_id: LogActionId,
- log_result: LogResult<LogEntry>,
+ log_result: LogResult<WalEntry>,
) -> Result<(), SummersetError> {
let (slot, entry_type) = Self::split_log_action_id(action_id);
if slot < self.start_slot {
@@ -635,15 +715,15 @@ impl MultiPaxosReplica {
assert!(slot < self.start_slot + self.insts.len());
if let LogResult::Append { now_size } = log_result {
- assert!(now_size >= self.log_offset);
- // update first log_offset of slot
+ assert!(now_size >= self.wal_offset);
+ // update first wal_offset of slot
let inst = &mut self.insts[slot - self.start_slot];
- if inst.log_offset == 0 || inst.log_offset > self.log_offset {
- inst.log_offset = self.log_offset;
+ if inst.wal_offset == 0 || inst.wal_offset > self.wal_offset {
+ inst.wal_offset = self.wal_offset;
}
- assert!(inst.log_offset <= self.log_offset);
- // then update self.log_offset
- self.log_offset = now_size;
+ assert!(inst.wal_offset <= self.wal_offset);
+ // then update self.wal_offset
+ self.wal_offset = now_size;
} else {
return logged_err!(self.id; "unexpected log result type: {:?}", log_result);
}
@@ -657,7 +737,10 @@ impl MultiPaxosReplica {
}
}
}
+}
+// MultiPaxosReplica peer-peer messages handling
+impl MultiPaxosReplica {
/// Handler of Prepare message from leader.
fn handle_msg_prepare(
&mut self,
@@ -691,7 +774,7 @@ impl MultiPaxosReplica {
self.storage_hub.submit_action(
Self::make_log_action_id(slot, Status::Preparing),
LogAction::Append {
- entry: LogEntry::PrepareBal { slot, ballot },
+ entry: WalEntry::PrepareBal { slot, ballot },
sync: self.config.logger_sync,
},
)?;
@@ -719,10 +802,11 @@ impl MultiPaxosReplica {
// if ballot is what I'm currently waiting on for Prepare replies:
if ballot == self.bal_prep_sent {
assert!(slot < self.start_slot + self.insts.len());
+ let is_leader = self.is_leader();
let inst = &mut self.insts[slot - self.start_slot];
// ignore spurious duplications and outdated replies
- if !self.is_leader
+ if !is_leader
|| (inst.status != Status::Preparing)
|| (ballot < inst.bal)
{
@@ -761,7 +845,7 @@ impl MultiPaxosReplica {
self.storage_hub.submit_action(
Self::make_log_action_id(slot, Status::Accepting),
LogAction::Append {
- entry: LogEntry::AcceptData {
+ entry: WalEntry::AcceptData {
slot,
ballot,
reqs: inst.reqs.clone(),
@@ -825,7 +909,7 @@ impl MultiPaxosReplica {
self.storage_hub.submit_action(
Self::make_log_action_id(slot, Status::Accepting),
LogAction::Append {
- entry: LogEntry::AcceptData { slot, ballot, reqs },
+ entry: WalEntry::AcceptData { slot, ballot, reqs },
sync: self.config.logger_sync,
},
)?;
@@ -852,10 +936,11 @@ impl MultiPaxosReplica {
// if ballot is what I'm currently waiting on for Accept replies:
if ballot == self.bal_prepared {
assert!(slot < self.start_slot + self.insts.len());
+ let is_leader = self.is_leader();
let inst = &mut self.insts[slot - self.start_slot];
// ignore spurious duplications and outdated replies
- if !self.is_leader
+ if !is_leader
|| (inst.status != Status::Accepting)
|| (ballot < inst.bal)
{
@@ -882,7 +967,7 @@ impl MultiPaxosReplica {
self.storage_hub.submit_action(
Self::make_log_action_id(slot, Status::Committed),
LogAction::Append {
- entry: LogEntry::CommitSlot { slot },
+ entry: WalEntry::CommitSlot { slot },
sync: self.config.logger_sync,
},
)?;
@@ -931,7 +1016,7 @@ impl MultiPaxosReplica {
self.storage_hub.submit_action(
Self::make_log_action_id(slot, Status::Committed),
LogAction::Append {
- entry: LogEntry::CommitSlot { slot },
+ entry: WalEntry::CommitSlot { slot },
sync: self.config.logger_sync,
},
)?;
@@ -941,6 +1026,43 @@ impl MultiPaxosReplica {
Ok(())
}
+ /// Handler of FillHoles message from a lagging peer.
+ fn handle_msg_fill_holes(
+ &mut self,
+ peer: ReplicaId,
+ slots: Vec<usize>,
+ ) -> Result<(), SummersetError> {
+ if !self.is_leader() {
+ return Ok(());
+ }
+ pf_trace!(self.id; "received FillHoles <- {} for slots {:?}", peer, slots);
+
+ for slot in slots {
+ if slot < self.start_slot {
+ continue;
+ } else if slot >= self.start_slot + self.insts.len() {
+ break;
+ }
+ let inst = &self.insts[slot - self.start_slot];
+
+ if inst.status >= Status::Committed {
+ // re-send Accept message for this slot
+ self.transport_hub.send_msg(
+ PeerMsg::Accept {
+ slot,
+ ballot: self.bal_prepared,
+ reqs: inst.reqs.clone(),
+ },
+ peer,
+ )?;
+ pf_trace!(self.id; "sent Accept -> {} for slot {} bal {}",
+ peer, slot, self.bal_prepared);
+ }
+ }
+
+ Ok(())
+ }
+
/// Synthesized handler of receiving message from peer.
fn handle_msg_recv(
&mut self,
@@ -963,12 +1085,20 @@ impl MultiPaxosReplica {
self.handle_msg_accept_reply(peer, slot, ballot)
}
PeerMsg::Commit { slot } => self.handle_msg_commit(peer, slot),
- PeerMsg::Heartbeat { ballot, exec_bar } => {
- self.heard_heartbeat(peer, ballot, exec_bar)
+ PeerMsg::FillHoles { slots } => {
+ self.handle_msg_fill_holes(peer, slots)
}
+ PeerMsg::Heartbeat {
+ ballot,
+ exec_bar,
+ snap_bar,
+ } => self.heard_heartbeat(peer, ballot, exec_bar, snap_bar),
}
}
+}
+// MultiPaxosReplica state machine execution
+impl MultiPaxosReplica {
/// Handler of state machine exec result chan recv.
fn handle_cmd_result(
&mut self,
@@ -1026,22 +1156,39 @@ impl MultiPaxosReplica {
Ok(())
}
+}
+// MultiPaxosReplica leadership related logic
+impl MultiPaxosReplica {
/// Becomes a leader, sends self-initiated Prepare messages to followers
/// for all in-progress instances, and starts broadcasting heartbeats.
fn become_a_leader(&mut self) -> Result<(), SummersetError> {
- if self.is_leader {
+ if self.is_leader() {
return Ok(());
+ } else if let Some(peer) = self.leader {
+ // mark old leader as dead
+ if self.peer_alive.get(peer)? {
+ self.peer_alive.set(peer, false)?;
+ pf_debug!(self.id; "peer_alive updated: {:?}", self.peer_alive);
+ }
}
- self.is_leader = true; // this starts broadcasting heartbeats
+ self.leader = Some(self.id); // this starts broadcasting heartbeats
self.control_hub
.send_ctrl(CtrlMsg::LeaderStatus { step_up: true })?;
pf_info!(self.id; "becoming a leader...");
- // broadcast a heartbeat right now
+ // clear peers' heartbeat reply counters, and broadcast a heartbeat now
+ for cnts in self.hb_reply_cnts.values_mut() {
+ *cnts = (1, 0, 0);
+ }
self.bcast_heartbeats()?;
+ // re-initialize peer_exec_bar information
+ for slot in self.peer_exec_bar.values_mut() {
+ *slot = 0;
+ }
+
// make a greater ballot number and invalidate all in-progress instances
self.bal_prepared = 0;
self.bal_prep_sent = self.make_greater_ballot(self.bal_max_seen);
@@ -1069,7 +1216,7 @@ impl MultiPaxosReplica {
self.storage_hub.submit_action(
Self::make_log_action_id(slot, Status::Preparing),
LogAction::Append {
- entry: LogEntry::PrepareBal {
+ entry: WalEntry::PrepareBal {
slot,
ballot: self.bal_prep_sent,
},
@@ -1091,7 +1238,6 @@ impl MultiPaxosReplica {
slot, inst.bal);
}
}
-
Ok(())
}
@@ -1101,10 +1247,43 @@ impl MultiPaxosReplica {
PeerMsg::Heartbeat {
ballot: self.bal_prep_sent,
exec_bar: self.exec_bar,
+ snap_bar: self.snap_bar,
},
None,
)?;
- self.heard_heartbeat(self.id, self.bal_prep_sent, self.exec_bar)?;
+
+ // update max heartbeat reply counters and their repetitions seen
+ for (&peer, cnts) in self.hb_reply_cnts.iter_mut() {
+ if cnts.0 > cnts.1 {
+ // more hb replies have been received from this peer; it is
+ // probably alive
+ cnts.1 = cnts.0;
+ cnts.2 = 0;
+ } else {
+ // did not receive hb reply from this peer at least for the
+ // last sent hb from me; increment repetition count
+ cnts.2 += 1;
+ let repeat_threshold = (self.config.hb_hear_timeout_min
+ / self.config.hb_send_interval_ms)
+ as u8;
+ if cnts.2 > repeat_threshold {
+ // did not receive hb reply from this peer for too many
+ // past hbs sent from me; this peer is probably dead
+ if self.peer_alive.get(peer)? {
+ self.peer_alive.set(peer, false)?;
+ pf_debug!(self.id; "peer_alive updated: {:?}", self.peer_alive);
+ }
+ }
+ }
+ }
+
+ // I also heard this heartbeat from myself
+ self.heard_heartbeat(
+ self.id,
+ self.bal_prep_sent,
+ self.exec_bar,
+ self.snap_bar,
+ )?;
// pf_trace!(self.id; "broadcast heartbeats bal {}", self.bal_prep_sent);
Ok(())
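The `repeat_threshold` computed above means a peer is marked dead after it misses replies to enough consecutive heartbeats to span the minimum hear-timeout window, roughly mirroring the election timeout a follower itself uses. A tiny worked example (both config values here are assumed purely for illustration):

```rust
fn main() {
    let hb_hear_timeout_min: u64 = 600; // ms (assumed value)
    let hb_send_interval_ms: u64 = 20; // ms (assumed value)
    let repeat_threshold = (hb_hear_timeout_min / hb_send_interval_ms) as u8;
    assert_eq!(repeat_threshold, 30); // ~30 missed replies flag the peer
}
```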
@@ -1113,6 +1292,8 @@ impl MultiPaxosReplica {
/// Chooses a random hb_hear_timeout from the min-max range and kicks off
/// the hb_hear_timer.
fn kickoff_hb_hear_timer(&mut self) -> Result<(), SummersetError> {
+ self.hb_hear_timer.cancel()?;
+
let timeout_ms = thread_rng().gen_range(
self.config.hb_hear_timeout_min..=self.config.hb_hear_timeout_max,
);
@@ -1128,10 +1309,19 @@ impl MultiPaxosReplica {
/// leader status if I currently think I'm a leader.
fn heard_heartbeat(
&mut self,
- _peer: ReplicaId,
+ peer: ReplicaId,
ballot: Ballot,
exec_bar: usize,
+ snap_bar: usize,
) -> Result<(), SummersetError> {
+ if peer != self.id {
+ self.hb_reply_cnts.get_mut(&peer).unwrap().0 += 1;
+ if !self.peer_alive.get(peer)? {
+ self.peer_alive.set(peer, true)?;
+ pf_debug!(self.id; "peer_alive updated: {:?}", self.peer_alive);
+ }
+ }
+
// ignore outdated heartbeats and those from peers with exec_bar < mine
if ballot < self.bal_max_seen || exec_bar < self.exec_bar {
return Ok(());
@@ -1140,18 +1330,61 @@ impl MultiPaxosReplica {
// reset hearing timer
self.kickoff_hb_hear_timer()?;
- // clear my leader status if it carries a higher ballot number
- if self.is_leader && ballot > self.bal_max_seen {
- self.is_leader = false;
- self.control_hub
- .send_ctrl(CtrlMsg::LeaderStatus { step_up: false })?;
- pf_info!(self.id; "no longer a leader...");
+ if peer != self.id {
+ // reply back with a Heartbeat message
+ self.transport_hub.send_msg(
+ PeerMsg::Heartbeat {
+ ballot,
+ exec_bar: self.exec_bar,
+ snap_bar: self.snap_bar,
+ },
+ peer,
+ )?;
+
+ // update peer_exec_bar if larger than known; if all servers'
+ // exec_bar (including my own) have passed a slot, that slot
+ // is definitely safe to be snapshotted
+ if exec_bar > self.peer_exec_bar[&peer] {
+ *self.peer_exec_bar.get_mut(&peer).unwrap() = exec_bar;
+ let passed_cnt = 1 + self
+ .peer_exec_bar
+ .values()
+ .filter(|&&e| e >= exec_bar)
+ .count() as u8;
+ if passed_cnt == self.population {
+ // all servers have executed up to exec_bar
+ self.snap_bar = exec_bar;
+ }
+ }
+
+ // if the peer has made a higher ballot number
+ if ballot > self.bal_max_seen {
+ self.bal_max_seen = ballot;
+
+ // clear my leader status if I was one
+ if self.is_leader() {
+ self.control_hub
+ .send_ctrl(CtrlMsg::LeaderStatus { step_up: false })?;
+ pf_info!(self.id; "no longer a leader...");
+ }
+
+ // set this peer to be the believed leader
+ self.leader = Some(peer);
+ }
+ }
+
+ // if snap_bar is larger than mine, update snap_bar
+ if snap_bar > self.snap_bar {
+ self.snap_bar = snap_bar;
}
// pf_trace!(self.id; "heard heartbeat <- {} bal {}", peer, ballot);
Ok(())
}
+}
+// MultiPaxosReplica control messages handling
+impl MultiPaxosReplica {
/// Handler of ResetState control message.
async fn handle_ctrl_reset_state(
&mut self,
@@ -1263,14 +1496,20 @@ impl MultiPaxosReplica {
_ => Ok(None), // ignore all other types
}
}
+}
+// MultiPaxosReplica recovery from WAL log
+impl MultiPaxosReplica {
/// Apply a durable storage log entry for recovery.
async fn recover_apply_entry(
&mut self,
- entry: LogEntry,
+ entry: WalEntry,
) -> Result<(), SummersetError> {
match entry {
- LogEntry::PrepareBal { slot, ballot } => {
+ WalEntry::PrepareBal { slot, ballot } => {
+ if slot < self.start_slot {
+ return Ok(()); // ignore if slot index outdated
+ }
// locate instance in memory, filling in null instances if needed
while self.start_slot + self.insts.len() <= slot {
self.insts.push(self.null_instance());
@@ -1289,7 +1528,10 @@ impl MultiPaxosReplica {
self.bal_prepared = 0;
}
- LogEntry::AcceptData { slot, ballot, reqs } => {
+ WalEntry::AcceptData { slot, ballot, reqs } => {
+ if slot < self.start_slot {
+ return Ok(()); // ignore if slot index outdated
+ }
// locate instance in memory, filling in null instances if needed
while self.start_slot + self.insts.len() <= slot {
self.insts.push(self.null_instance());
@@ -1315,9 +1557,12 @@ impl MultiPaxosReplica {
assert!(self.bal_prepared <= self.bal_prep_sent);
}
- LogEntry::CommitSlot { slot } => {
+ WalEntry::CommitSlot { slot } => {
+ if slot < self.start_slot {
+ return Ok(()); // ignore if slot index outdated
+ }
assert!(slot < self.start_slot + self.insts.len());
- // update instance state
+ // update instance status
self.insts[slot - self.start_slot].status = Status::Committed;
// submit commands in contiguously committed instance to the
// state machine
@@ -1337,9 +1582,10 @@ impl MultiPaxosReplica {
let _ = self.state_machine.get_result().await?;
}
}
- // update commit_bar and exec_bar
+ // update instance status, commit_bar and exec_bar
self.commit_bar += 1;
self.exec_bar += 1;
+ inst.status = Status::Executed;
}
}
}
@@ -1348,15 +1594,15 @@ impl MultiPaxosReplica {
Ok(())
}
- /// Recover state from durable storage log.
- async fn recover_from_log(&mut self) -> Result<(), SummersetError> {
- assert_eq!(self.log_offset, 0);
+ /// Recover state from durable storage WAL log.
+ async fn recover_from_wal(&mut self) -> Result<(), SummersetError> {
+ assert_eq!(self.wal_offset, 0);
loop {
// using 0 as a special log action ID
self.storage_hub.submit_action(
0,
LogAction::Read {
- offset: self.log_offset,
+ offset: self.wal_offset,
},
)?;
let (_, log_result) = self.storage_hub.get_result().await?;
@@ -1368,7 +1614,7 @@ impl MultiPaxosReplica {
} => {
self.recover_apply_entry(entry).await?;
// update log offset
- self.log_offset = end_offset;
+ self.wal_offset = end_offset;
}
LogResult::Read { entry: None, .. } => {
// end of log reached
@@ -1384,7 +1630,7 @@ impl MultiPaxosReplica {
self.storage_hub.submit_action(
0,
LogAction::Truncate {
- offset: self.log_offset,
+ offset: self.wal_offset,
},
)?;
let (_, log_result) = self.storage_hub.get_result().await?;
@@ -1392,17 +1638,27 @@ impl MultiPaxosReplica {
offset_ok: true, ..
} = log_result
{
+ if self.wal_offset > 0 {
+ pf_info!(self.id; "recovered from wal log: commit {} exec {}",
+ self.commit_bar, self.exec_bar);
+ }
Ok(())
} else {
logged_err!(self.id; "unexpected log result type or failed truncate")
}
}
+}
- /// Dump a new key-value pair to snapshot file.
- async fn snapshot_dump_kv_pairs(&mut self) -> Result<(), SummersetError> {
+// MultiPaxosReplica snapshotting & GC logic
+impl MultiPaxosReplica {
+ /// Dump new key-value pairs to snapshot file.
+ async fn snapshot_dump_kv_pairs(
+ &mut self,
+ new_start_slot: usize,
+ ) -> Result<(), SummersetError> {
// collect all key-value pairs put up to exec_bar
let mut pairs = HashMap::new();
- for slot in self.start_slot..self.exec_bar {
+ for slot in self.start_slot..new_start_slot {
let inst = &self.insts[slot - self.start_slot];
for (_, req) in inst.reqs.clone() {
if let ApiRequest::Req {
@@ -1438,15 +1694,20 @@ impl MultiPaxosReplica {
/// Discard everything older than start_slot in durable WAL log.
async fn snapshot_discard_log(&mut self) -> Result<(), SummersetError> {
let cut_offset = if !self.insts.is_empty() {
- self.insts[0].log_offset
+ self.insts[0].wal_offset
} else {
- self.log_offset
+ self.wal_offset
};
// discard the log before cut_offset
if cut_offset > 0 {
- self.storage_hub
- .submit_action(0, LogAction::Discard { offset: cut_offset })?;
+ self.storage_hub.submit_action(
+ 0,
+ LogAction::Discard {
+ offset: cut_offset,
+ keep: 0,
+ },
+ )?;
loop {
let (action_id, log_result) =
self.storage_hub.get_result().await?;
@@ -1459,8 +1720,8 @@ impl MultiPaxosReplica {
now_size,
} = log_result
{
- assert_eq!(self.log_offset - cut_offset, now_size);
- self.log_offset = now_size;
+ assert_eq!(self.wal_offset - cut_offset, now_size);
+ self.wal_offset = now_size;
} else {
return logged_err!(
self.id;
@@ -1472,43 +1733,74 @@ impl MultiPaxosReplica {
}
}
- // update inst.log_offset for all remaining in-mem instances
+ // update inst.wal_offset for all remaining in-mem instances
for inst in &mut self.insts {
- if inst.log_offset > 0 {
- assert!(inst.log_offset >= cut_offset);
- inst.log_offset -= cut_offset;
+ if inst.wal_offset > 0 {
+ assert!(inst.wal_offset >= cut_offset);
+ inst.wal_offset -= cut_offset;
}
}
Ok(())
}
- /// Take a snapshot up to current exec_idx, then discard the in-mem log up
+ /// Take a snapshot up to current exec_bar, then discard the in-mem log up
/// to that index as well as outdate entries in the durable WAL log file.
///
/// NOTE: the current implementation does not guard against crashes in the
- /// middle of taking a snapshot.
+ /// middle of taking a snapshot. Production-quality implementations should
+ /// make the snapshotting action "atomic".
+ ///
+ /// NOTE: the current implementation does not take care of InstallSnapshot
+ /// messages (which are needed when some lagging follower still lacks a
+ /// slot that all other peers have already snapshotted); we assume here
+ /// that failed Accept messages are retried indefinitely until they
+ /// succeed, before the associated data gets discarded from the leader's
+ /// memory.
async fn take_new_snapshot(&mut self) -> Result<(), SummersetError> {
- pf_debug!(self.id; "taking new snapshot: start {} exec {}",
- self.start_slot, self.exec_bar);
+ pf_debug!(self.id; "taking new snapshot: start {} exec {} snap {}",
+ self.start_slot, self.exec_bar, self.snap_bar);
assert!(self.exec_bar >= self.start_slot);
- if self.exec_bar == self.start_slot {
+
+ let new_start_slot = cmp::min(self.snap_bar, self.exec_bar);
+ if new_start_slot == self.start_slot {
return Ok(());
}
// collect and dump all Puts in executed instances
- if self.is_leader {
+ if self.is_leader() {
// NOTE: broadcast heartbeats here to appease followers
self.bcast_heartbeats()?;
}
- self.snapshot_dump_kv_pairs().await?;
+ self.snapshot_dump_kv_pairs(new_start_slot).await?;
+
+ // write new slot info entry to the head of snapshot
+ self.snapshot_hub.submit_action(
+ 0,
+ LogAction::Write {
+ entry: SnapEntry::SlotInfo {
+ start_slot: new_start_slot,
+ commit_bar: self.commit_bar,
+ },
+ offset: 0,
+ sync: self.config.logger_sync,
+ },
+ )?;
+ let (_, log_result) = self.snapshot_hub.get_result().await?;
+ match log_result {
+ LogResult::Write {
+ offset_ok: true, ..
+ } => {}
+ _ => {
+ return logged_err!(self.id; "unexpected log result type or failed write");
+ }
+ }
// update start_slot and discard all in-memory log instances up to new_start_slot
- self.insts.drain(0..(self.exec_bar - self.start_slot));
- self.start_slot = self.exec_bar;
+ self.insts.drain(0..(new_start_slot - self.start_slot));
+ self.start_slot = new_start_slot;
// discarding everything older than start_slot in WAL log
- if self.is_leader {
+ if self.is_leader() {
// NOTE: broadcast heartbeats here to appease followers
self.bcast_heartbeats()?;
}
@@ -1533,11 +1825,20 @@ impl MultiPaxosReplica {
match log_result {
LogResult::Read {
- entry: Some(SnapEntry::StartSlot { slot }),
+ entry:
+ Some(SnapEntry::SlotInfo {
+ start_slot,
+ commit_bar,
+ }),
end_offset,
} => {
self.snap_offset = end_offset;
- self.start_slot = slot; // get start slot index of in-mem log
+
+ // recover necessary slot indices info
+ self.start_slot = start_slot;
+ self.commit_bar = commit_bar;
+ self.exec_bar = start_slot;
+ self.snap_bar = start_slot;
// repeatedly apply key-value pairs
loop {
@@ -1580,6 +1881,11 @@ impl MultiPaxosReplica {
self.control_hub.send_ctrl(CtrlMsg::SnapshotUpTo {
new_start: self.start_slot,
})?;
+
+ if self.start_slot > 0 {
+ pf_info!(self.id; "recovered from snapshot: start {} commit {} exec {}",
+ self.start_slot, self.commit_bar, self.exec_bar);
+ }
Ok(())
}
@@ -1588,7 +1894,10 @@ impl MultiPaxosReplica {
self.snapshot_hub.submit_action(
0,
LogAction::Write {
- entry: SnapEntry::StartSlot { slot: 0 },
+ entry: SnapEntry::SlotInfo {
+ start_slot: 0,
+ commit_bar: 0,
+ },
offset: 0,
sync: self.config.logger_sync,
},
@@ -1602,7 +1911,7 @@ impl MultiPaxosReplica {
self.snap_offset = now_size;
Ok(())
} else {
- logged_err!(self.id; "unexpected log result type or failed truncate")
+ logged_err!(self.id; "unexpected log result type or failed write")
}
}
@@ -1628,18 +1937,39 @@ impl GenericReplica for MultiPaxosReplica {
// parse protocol-specific configs
let config = parsed_config!(config_str => ReplicaConfigMultiPaxos;
- batch_interval_us, max_batch_size,
+ batch_interval_ms, max_batch_size,
backer_path, logger_sync,
hb_hear_timeout_min, hb_hear_timeout_max,
hb_send_interval_ms,
snapshot_path, snapshot_interval_s,
perf_storage_a, perf_storage_b,
perf_network_a, perf_network_b)?;
- if config.batch_interval_us == 0 {
+ if config.batch_interval_ms == 0 {
return logged_err!(
id;
- "invalid config.batch_interval_us '{}'",
- config.batch_interval_us
+ "invalid config.batch_interval_ms '{}'",
+ config.batch_interval_ms
+ );
+ }
+ if config.hb_hear_timeout_min < 100 {
+ return logged_err!(
+ id;
+ "invalid config.hb_hear_timeout_min '{}'",
+ config.hb_hear_timeout_min
+ );
+ }
+ if config.hb_hear_timeout_max < config.hb_hear_timeout_min + 100 {
+ return logged_err!(
+ id;
+ "invalid config.hb_hear_timeout_max '{}'",
+ config.hb_hear_timeout_max
+ );
+ }
+ if config.hb_send_interval_ms == 0 {
+ return logged_err!(
+ id;
+ "invalid config.hb_send_interval_ms '{}'",
+ config.hb_send_interval_ms
);
}
if config.hb_hear_timeout_min < 100 {
@@ -1728,7 +2058,7 @@ impl GenericReplica for MultiPaxosReplica {
let external_api = ExternalApi::new_and_setup(
id,
api_addr,
- Duration::from_micros(config.batch_interval_us),
+ Duration::from_millis(config.batch_interval_ms),
config.max_batch_size,
)
.await?;
@@ -1746,6 +2076,10 @@ impl GenericReplica for MultiPaxosReplica {
));
snapshot_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
+ let hb_reply_cnts = (0..population)
+ .filter_map(|p| if p == id { None } else { Some((p, (1, 0, 0))) })
+ .collect();
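+        // illustrative: with population = 3 and id = 0 this yields
+        // {1: (1, 0, 0), 2: (1, 0, 0)}; tuple = (#hb replies heard,
+        // #heard at last send, #consecutive sends with no new reply)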
+
Ok(MultiPaxosReplica {
id,
population,
@@ -1759,9 +2093,11 @@ impl GenericReplica for MultiPaxosReplica {
storage_hub,
snapshot_hub,
transport_hub,
+ leader: None,
hb_hear_timer: Timer::new(),
hb_send_interval,
- is_leader: false,
+ hb_reply_cnts,
+ peer_alive: Bitmap::new(population, true),
insts: vec![],
start_slot: 0,
snapshot_interval,
@@ -1770,7 +2106,11 @@ impl GenericReplica for MultiPaxosReplica {
bal_max_seen: 0,
commit_bar: 0,
exec_bar: 0,
- log_offset: 0,
+ peer_exec_bar: (0..population)
+ .filter_map(|s| if s == id { None } else { Some((s, 0)) })
+ .collect(),
+ snap_bar: 0,
+ wal_offset: 0,
snap_offset: 0,
})
}
@@ -1782,8 +2122,8 @@ impl GenericReplica for MultiPaxosReplica {
// recover state from durable snapshot file
self.recover_from_snapshot().await?;
- // recover the tail-piece memory log & state from durable storage log
- self.recover_from_log().await?;
+ // recover the tail-piece memory log & state from durable WAL log
+ self.recover_from_wal().await?;
// kick off leader activity hearing timer
self.kickoff_hb_hear_timer()?;
@@ -1849,7 +2189,7 @@ impl GenericReplica for MultiPaxosReplica {
},
// leader sending heartbeat
- _ = self.hb_send_interval.tick(), if !paused && self.is_leader => {
+ _ = self.hb_send_interval.tick(), if !paused && self.is_leader() => {
if let Err(e) = self.bcast_heartbeats() {
pf_error!(self.id; "error broadcasting heartbeats: {}", e);
}
@@ -1857,7 +2197,7 @@ impl GenericReplica for MultiPaxosReplica {
// autonomous snapshot taking timeout
_ = self.snapshot_interval.tick(), if !paused
- && self.config.snapshot_interval_s > 0 => {
+ && self.config.snapshot_interval_s > 0 => {
if let Err(e) = self.take_new_snapshot().await {
pf_error!(self.id; "error taking a new snapshot: {}", e);
} else {
diff --git a/src/protocols/raft.rs b/src/protocols/raft.rs
new file mode 100644
index 00000000..4ffc04f5
--- /dev/null
+++ b/src/protocols/raft.rs
@@ -0,0 +1,2222 @@
+//! Replication protocol: Raft.
+//!
+//! ATC '14 version of Raft. References:
+//!   - <https://raft.github.io/raft.pdf>
+//!   - <https://raft.github.io/>
+//!   - <https://decentralizedthoughts.github.io/2020-12-12-raft-liveness-full-omission/>
+
+use std::cmp;
+use std::collections::{HashMap, HashSet};
+use std::path::Path;
+use std::net::SocketAddr;
+
+use crate::utils::{SummersetError, Bitmap, Timer};
+use crate::manager::{CtrlMsg, CtrlRequest, CtrlReply};
+use crate::server::{
+ ReplicaId, ControlHub, StateMachine, Command, CommandResult, CommandId,
+ ExternalApi, ApiRequest, ApiReply, StorageHub, LogAction, LogResult,
+ LogActionId, TransportHub, GenericReplica,
+};
+use crate::client::{ClientId, ClientApiStub, ClientCtrlStub, GenericEndpoint};
+use crate::protocols::SmrProtocol;
+
+use rand::prelude::*;
+
+use async_trait::async_trait;
+
+use get_size::GetSize;
+
+use serde::{Serialize, Deserialize};
+
+use tokio::time::{self, Duration, Interval, MissedTickBehavior};
+use tokio::sync::watch;
+
+/// Configuration parameters struct.
+#[derive(Debug, Deserialize)]
+pub struct ReplicaConfigRaft {
+ /// Client request batching interval in millisecs.
+ pub batch_interval_ms: u64,
+
+ /// Client request batching maximum batch size.
+ pub max_batch_size: usize,
+
+ /// Path to backing log file.
+ pub backer_path: String,
+
+ /// Whether to call `fsync()`/`fdatasync()` on logger.
+ pub logger_sync: bool,
+
+ /// Min timeout of not hearing any heartbeat from leader in millisecs.
+ pub hb_hear_timeout_min: u64,
+ /// Max timeout of not hearing any heartbeat from leader in millisecs.
+ pub hb_hear_timeout_max: u64,
+
+ /// Interval of leader sending AppendEntries heartbeats to followers.
+ pub hb_send_interval_ms: u64,
+
+ /// Path to snapshot file.
+ pub snapshot_path: String,
+
+ /// Snapshot self-triggering interval in secs. 0 means never trigger
+ /// snapshotting autonomously.
+ pub snapshot_interval_s: u64,
+
+ // Performance simulation params (all zeros means no perf simulation):
+ pub perf_storage_a: u64,
+ pub perf_storage_b: u64,
+ pub perf_network_a: u64,
+ pub perf_network_b: u64,
+}
+
+#[allow(clippy::derivable_impls)]
+impl Default for ReplicaConfigRaft {
+ fn default() -> Self {
+ ReplicaConfigRaft {
+ batch_interval_ms: 10,
+ max_batch_size: 5000,
+ backer_path: "/tmp/summerset.raft.wal".into(),
+ logger_sync: false,
+ hb_hear_timeout_min: 600,
+ hb_hear_timeout_max: 900,
+ hb_send_interval_ms: 50,
+ snapshot_path: "/tmp/summerset.raft.snap".into(),
+ snapshot_interval_s: 0,
+ perf_storage_a: 0,
+ perf_storage_b: 0,
+ perf_network_a: 0,
+ perf_network_b: 0,
+ }
+ }
+}
+
+/// Term number type, defined for better code readability.
+type Term = u64;
+
+/// Request batch type (i.e., the "command" in an entry).
+///
+/// NOTE: the originally presented Raft algorithm does not explicitly mention
+/// batching, but instead hides it with the heartbeats: every AppendEntries RPC
+/// from the leader basically batches all commands it has received since the
+/// last sent heartbeat. Here, to make this implementation more comparable to
+/// MultiPaxos, we also trigger batching explicitly.
+type ReqBatch = Vec<(ClientId, ApiRequest)>;
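+//
+// Illustrative batch shape (hypothetical IDs; assumes this crate's
+// `ApiRequest::Req` and `Command::Get`/`Put` variants):
+//
+//     let batch: ReqBatch = vec![
+//         (7, ApiRequest::Req { id: 0, cmd: Command::Get { key: "x".into() } }),
+//         (8, ApiRequest::Req { id: 1, cmd: Command::Put { key: "y".into(),
+//                                                          value: "v".into() } }),
+//     ];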
+
+/// In-mem + persistent entry of log, containing a term and a commands batch.
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, GetSize)]
+struct LogEntry {
+ /// Term number.
+ term: Term,
+
+ /// Batch of client requests.
+ reqs: ReqBatch,
+
+ /// True if from external client, else false.
+ external: bool,
+
+ /// Offset in durable log file of this entry. This field is not maintained
+ /// in durable storage itself, where it is typically 0. It is maintained
+ /// only in the in-memory log.
+ log_offset: usize,
+}
+
+/// Stable storage log entry type.
+///
+/// NOTE: Raft makes the persistent log exactly mirror the in-memory log, so
+/// the backer file is not an append-only WAL during runtime operation; its
+/// tail may get truncated and overwritten.
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, GetSize)]
+enum DurEntry {
+ /// Durable metadata.
+ Metadata {
+ curr_term: Term,
+        voted_for: Option<ReplicaId>,
+ },
+
+ /// Log entry mirroring in-mem log.
+ LogEntry { entry: LogEntry },
+}
+
+/// Snapshot file entry type.
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, GetSize)]
+enum SnapEntry {
+ /// Necessary slot indices to remember.
+ SlotInfo {
+ /// First entry at the start of file: number of log entries covered
+ /// by this snapshot file == the start slot index of remaining log.
+ start_slot: usize,
+ },
+
+ /// Set of key-value pairs to apply to the state.
+    KVPairSet { pairs: HashMap<String, String> },
+}
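+
+// Implied snapshot file layout (see the snapshotting logic further below):
+//     offset 0 : SnapEntry::SlotInfo { start_slot }  -- rewritten in place
+//     afterward: SnapEntry::KVPairSet { pairs } ...  -- appended KV dumps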
+
+/// Peer-peer message type.
+#[derive(Debug, Clone, Serialize, Deserialize, GetSize)]
+enum PeerMsg {
+ /// AppendEntries from leader to followers.
+ AppendEntries {
+ term: Term,
+ prev_slot: usize,
+ prev_term: Term,
+        entries: Vec<LogEntry>,
+ leader_commit: usize,
+ /// For conservative snapshotting purpose.
+ last_snap: usize,
+ },
+
+ /// AppendEntries reply from follower to leader.
+ AppendEntriesReply {
+ term: Term,
+ /// For correct tracking of which AppendEntries this reply is for.
+ end_slot: usize,
+ success: bool,
+ },
+
+    /// RequestVote from candidate to all other replicas.
+ RequestVote {
+ term: Term,
+ last_slot: usize,
+ last_term: Term,
+ },
+
+    /// RequestVote reply from a peer to the candidate.
+ RequestVoteReply { term: Term, granted: bool },
+}
+
+/// Replica role type.
+#[derive(
+ Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize,
+)]
+enum Role {
+ Follower,
+ Candidate,
+ Leader,
+}
+
+/// Raft server replica module.
+pub struct RaftReplica {
+ /// Replica ID in cluster.
+ id: ReplicaId,
+
+ /// Total number of replicas in cluster.
+ population: u8,
+
+ /// Majority quorum size.
+ quorum_cnt: u8,
+
+ /// Configuration parameters struct.
+ config: ReplicaConfigRaft,
+
+ /// Address string for client requests API.
+ _api_addr: SocketAddr,
+
+ /// Address string for internal peer-peer communication.
+ _p2p_addr: SocketAddr,
+
+ /// ControlHub module.
+ control_hub: ControlHub,
+
+ /// ExternalApi module.
+ external_api: ExternalApi,
+
+ /// StateMachine module.
+ state_machine: StateMachine,
+
+ /// StorageHub module.
+    storage_hub: StorageHub<DurEntry>,
+
+ /// StorageHub module for the snapshot file.
+    snapshot_hub: StorageHub<SnapEntry>,
+
+ /// TransportHub module.
+    transport_hub: TransportHub<PeerMsg>,
+
+ /// Which role am I in right now?
+ role: Role,
+
+ /// Who do I think is the effective leader of the cluster right now?
+    leader: Option<ReplicaId>,
+
+ /// Timer for hearing heartbeat from leader.
+ hb_hear_timer: Timer,
+
+ /// Interval for sending heartbeat to followers.
+ hb_send_interval: Interval,
+
+ /// Heartbeat reply counters for approximate detection of follower health.
+ /// Tuple of (#hb_replied, #hb_replied seen at last send, repetition).
+    hb_reply_cnts: HashMap<ReplicaId, (u64, u64, u8)>,
+
+ /// Approximate health status tracking of peer replicas.
+ peer_alive: Bitmap,
+
+ /// Latest term seen.
+ curr_term: Term,
+
+ /// Candidate ID that I voted for in current term.
+ voted_for: Option,
+
+ /// Replica IDs that voted for me in current election.
+    votes_granted: HashSet<ReplicaId>,
+
+ /// In-memory log of entries. Slot 0 is a dummy entry to make indexing happy.
+    log: Vec<LogEntry>,
+
+ /// Start slot index of in-mem log after latest snapshot.
+ start_slot: usize,
+
+ /// Timer for taking a new autonomous snapshot.
+ snapshot_interval: Interval,
+
+ /// Slot index of highest log entry known to be committed.
+ last_commit: usize,
+
+ /// Slot index of highest log entry applied to state machine.
+ last_exec: usize,
+
+ /// For each server, index of the next log entry to send.
+    next_slot: HashMap<ReplicaId, usize>,
+
+ /// For each server, index of the highest log entry known to be replicated.
+    match_slot: HashMap<ReplicaId, usize>,
+
+ /// Slot index up to which it is safe to take snapshot.
+ /// NOTE: we are taking a conservative approach here that a snapshot
+ /// covering an entry can be taken only when all servers have durably
+ /// committed that entry.
+ last_snap: usize,
+
+ /// Current durable log file end offset.
+ log_offset: usize,
+
+    /// Offset in the durable log file at which the metadata entry ends.
+ log_meta_end: usize,
+
+ /// Current durable snapshot file offset.
+ snap_offset: usize,
+}
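+
+// Rough mapping to Figure 2 of the Raft paper: persistent state on all
+// servers -> curr_term, voted_for, log; volatile state on all servers ->
+// last_commit (commitIndex), last_exec (lastApplied); volatile state on
+// leaders -> next_slot (nextIndex), match_slot (matchIndex).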
+
+// RaftReplica common helpers
+impl RaftReplica {
+ /// Compose LogActionId from (slot, end_slot) pair & entry type.
+    /// Uses the `Role` enum type to represent different entry types.
+ #[inline]
+ fn make_log_action_id(
+ slot: usize,
+ slot_e: usize,
+ entry_type: Role,
+ ) -> LogActionId {
+ let type_num = match entry_type {
+ Role::Follower => 1,
+ Role::Leader => 2,
+ _ => panic!("unknown log entry type {:?}", entry_type),
+ };
+ ((slot << 33) | (slot_e << 2) | type_num) as LogActionId
+ }
+
+ /// Decompose LogActionId into (slot, end_slot) pair & entry type.
+ #[inline]
+ fn split_log_action_id(log_action_id: LogActionId) -> (usize, usize, Role) {
+ let slot = (log_action_id >> 33) as usize;
+ let slot_e = ((log_action_id & ((1 << 33) - 1)) >> 2) as usize;
+ let type_num = log_action_id & ((1 << 2) - 1);
+ let entry_type = match type_num {
+ 1 => Role::Follower,
+ 2 => Role::Leader,
+ _ => panic!("unknown log entry type num {}", type_num),
+ };
+ (slot, slot_e, entry_type)
+ }
+
+ /// Compose CommandId from slot index & command index within.
+ #[inline]
+ fn make_command_id(slot: usize, cmd_idx: usize) -> CommandId {
+ assert!(slot <= (u32::MAX as usize));
+ assert!(cmd_idx <= (u32::MAX as usize));
+ ((slot << 32) | cmd_idx) as CommandId
+ }
+
+ /// Decompose CommandId into slot index & command index within.
+ #[inline]
+ fn split_command_id(command_id: CommandId) -> (usize, usize) {
+ let slot = (command_id >> 32) as usize;
+ let cmd_idx = (command_id & ((1 << 32) - 1)) as usize;
+ (slot, cmd_idx)
+ }
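+
+    // Round-trip sketch of the packings above (illustrative values):
+    //     make_log_action_id(5, 9, Role::Leader) == (5 << 33) | (9 << 2) | 2,
+    //     and split_log_action_id maps it back to (5, 9, Role::Leader);
+    //     make_command_id(5, 2) == (5 << 32) | 2 -> split back to (5, 2).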
+
+ /// Check if the given term is larger than mine. If so, convert my role
+ /// back to follower. Returns true if my role was not follower but now
+ /// converted to follower, and false otherwise.
+ #[inline]
+ fn check_term(
+ &mut self,
+ peer: ReplicaId,
+ term: Term,
+    ) -> Result<bool, SummersetError> {
+ if term > self.curr_term {
+ self.curr_term = term;
+ self.heard_heartbeat(peer, term)?; // refresh election timer
+ if self.role != Role::Follower {
+ self.role = Role::Follower;
+ pf_trace!(self.id; "converted back to follower");
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ } else {
+ Ok(false)
+ }
+ }
+}
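+
+// Illustrative check_term outcome: a leader at curr_term = 3 that hears
+// term = 5 from any peer adopts term 5, refreshes its election timer, and
+// steps down to follower (returns Ok(true)); any term <= 3 returns Ok(false).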
+
+// RaftReplica client requests entrance
+impl RaftReplica {
+ /// Handler of client request batch chan recv.
+ fn handle_req_batch(
+ &mut self,
+ req_batch: ReqBatch,
+ ) -> Result<(), SummersetError> {
+ let batch_size = req_batch.len();
+ assert!(batch_size > 0);
+ pf_debug!(self.id; "got request batch of size {}", batch_size);
+
+        // if I'm not a leader, redirect client requests to the likely leader
+ if self.role != Role::Leader {
+ for (client, req) in req_batch {
+ if let ApiRequest::Req { id: req_id, .. } = req {
+ // tell the client to try on known leader or just the
+ // next ID replica
+ let target = if let Some(peer) = self.leader {
+ peer
+ } else {
+ (self.id + 1) % self.population
+ };
+ self.external_api.send_reply(
+ ApiReply::Reply {
+ id: req_id,
+ result: None,
+ redirect: Some(target),
+ },
+ client,
+ )?;
+ pf_trace!(self.id; "redirected client {} to replica {}",
+ client, target);
+ }
+ }
+ return Ok(());
+ }
+
+ // append an entry to in-memory log
+ let entry = LogEntry {
+ term: self.curr_term,
+ reqs: req_batch,
+ external: true,
+ log_offset: 0,
+ };
+ let slot = self.start_slot + self.log.len();
+ self.log.push(entry.clone());
+
+ // submit logger action to make this log entry durable
+ self.storage_hub.submit_action(
+ Self::make_log_action_id(slot, slot, Role::Leader),
+ LogAction::Append {
+ entry: DurEntry::LogEntry { entry },
+ sync: self.config.logger_sync,
+ },
+ )?;
+ pf_trace!(self.id; "submitted leader append log action for slot {}", slot);
+
+ Ok(())
+ }
+}
+
+// RaftReplica durable logging
+impl RaftReplica {
+ /// Handler of leader append logging result chan recv.
+ fn handle_logged_leader_append(
+ &mut self,
+ slot: usize,
+ slot_e: usize,
+ ) -> Result<(), SummersetError> {
+ if slot < self.start_slot || self.role != Role::Leader {
+ return Ok(()); // ignore if outdated
+ }
+ pf_trace!(self.id; "finished leader append logging for slot {} <= {}",
+ slot, slot_e);
+ assert_eq!(slot, slot_e);
+
+ // broadcast AppendEntries messages to followers
+ for peer in 0..self.population {
+ if peer == self.id || self.next_slot[&peer] < 1 {
+ continue;
+ }
+
+ let prev_slot = self.next_slot[&peer] - 1;
+ if prev_slot < self.start_slot {
+ return logged_err!(self.id; "snapshotted slot {} queried", prev_slot);
+ }
+ let prev_term = self.log[prev_slot - self.start_slot].term;
+ let entries = self
+ .log
+ .iter()
+ .skip(self.next_slot[&peer] - self.start_slot)
+ .cloned()
+ .collect();
+
+ if slot >= self.next_slot[&peer] {
+ self.transport_hub.send_msg(
+ PeerMsg::AppendEntries {
+ term: self.curr_term,
+ prev_slot,
+ prev_term,
+ entries,
+ leader_commit: self.last_commit,
+ last_snap: self.last_snap,
+ },
+ peer,
+ )?;
+ pf_trace!(self.id; "sent AppendEntries -> {} with slots {} - {}",
+ peer, self.next_slot[&peer],
+ self.start_slot + self.log.len() - 1);
+ }
+ }
+
+ // I also heard my own heartbeat
+ self.heard_heartbeat(self.id, self.curr_term)?;
+
+ Ok(())
+ }
+
+ /// Handler of follower append logging result chan recv.
+ fn handle_logged_follower_append(
+ &mut self,
+ slot: usize,
+ slot_e: usize,
+ ) -> Result<(), SummersetError> {
+ if slot < self.start_slot || self.role != Role::Follower {
+ return Ok(()); // ignore if outdated
+ }
+ pf_trace!(self.id; "finished follower append logging for slot {} <= {}",
+ slot, slot_e);
+ assert!(slot <= slot_e);
+
+ // if all consecutive entries are made durable, reply AppendEntries
+ // success back to leader
+ if slot == slot_e {
+ if let Some(leader) = self.leader {
+ self.transport_hub.send_msg(
+ PeerMsg::AppendEntriesReply {
+ term: self.curr_term,
+ end_slot: slot_e,
+ success: true,
+ },
+ leader,
+ )?;
+ pf_trace!(self.id; "sent AppendEntriesReply -> {} up to slot {}",
+ leader, slot_e);
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Synthesized handler of durable logging result chan recv.
+ fn handle_log_result(
+ &mut self,
+ action_id: LogActionId,
+        log_result: LogResult<DurEntry>,
+ ) -> Result<(), SummersetError> {
+ let (slot, slot_e, entry_type) = Self::split_log_action_id(action_id);
+ if slot < self.start_slot {
+ return Ok(()); // ignore if slot index outdated
+ }
+ assert!(slot_e < self.start_slot + self.log.len());
+
+ if let LogResult::Append { now_size } = log_result {
+ let entry = &mut self.log[slot - self.start_slot];
+ if entry.log_offset != self.log_offset {
+ // entry has incorrect log_offset bookkept; update it
+ entry.log_offset = self.log_offset;
+ }
+ assert!(now_size > self.log_offset);
+ self.log_offset = now_size;
+ } else {
+ return logged_err!(self.id; "unexpected log result type: {:?}", log_result);
+ }
+
+ match entry_type {
+ Role::Follower => self.handle_logged_follower_append(slot, slot_e),
+ Role::Leader => self.handle_logged_leader_append(slot, slot_e),
+ _ => {
+ logged_err!(self.id; "unexpected log entry type: {:?}", entry_type)
+ }
+ }
+ }
+}
+
+// RaftReplica peer-peer messages handling
+impl RaftReplica {
+ /// Handler of AppendEntries message from leader.
+ #[allow(clippy::too_many_arguments)]
+ async fn handle_msg_append_entries(
+ &mut self,
+ leader: ReplicaId,
+ term: Term,
+ prev_slot: usize,
+ prev_term: Term,
+        mut entries: Vec<LogEntry>,
+ leader_commit: usize,
+ last_snap: usize,
+ ) -> Result<(), SummersetError> {
+ if !entries.is_empty() {
+ pf_trace!(self.id; "received AcceptEntries <- {} for slots {} - {} term {}",
+ leader, prev_slot + 1, prev_slot + entries.len(), term);
+ }
+ if self.check_term(leader, term)? || self.role != Role::Follower {
+ return Ok(());
+ }
+
+ // reply false if term smaller than mine, or if my log does not
+ // contain an entry at prev_slot matching prev_term
+ if term < self.curr_term
+ || prev_slot < self.start_slot
+ || prev_slot >= self.start_slot + self.log.len()
+ || self.log[prev_slot - self.start_slot].term != prev_term
+ {
+ self.transport_hub.send_msg(
+ PeerMsg::AppendEntriesReply {
+ term: self.curr_term,
+ end_slot: prev_slot,
+ success: false,
+ },
+ leader,
+ )?;
+ pf_trace!(self.id; "sent AcceptEntriesReply -> {} term {} end_slot {} fail",
+ leader, self.curr_term, prev_slot);
+
+ if term >= self.curr_term {
+ // also refresh heartbeat timer here since the "decrementing"
+ // procedure for a lagging follower might take long
+ self.heard_heartbeat(leader, term)?;
+ }
+ return Ok(());
+ }
+
+ // update my knowledge of who's the current leader, and reset election
+ // timeout timer
+ self.leader = Some(leader);
+ self.heard_heartbeat(leader, term)?;
+
+ // check if any existing entry conflicts with a new one in `entries`.
+ // If so, truncate everything at and after that entry
+ let mut first_new = prev_slot + 1;
+ for (slot, new_entry) in entries
+ .iter()
+ .enumerate()
+ .map(|(s, e)| (s + prev_slot + 1, e))
+ {
+ if slot >= self.start_slot + self.log.len() {
+ first_new = slot;
+ break;
+ } else if self.log[slot - self.start_slot].term != new_entry.term {
+ let cut_offset = self.log[slot - self.start_slot].log_offset;
+ // do this truncation in-place for simplicity
+ self.storage_hub.submit_action(
+ 0,
+ LogAction::Truncate { offset: cut_offset },
+ )?;
+ loop {
+ let (action_id, log_result) =
+ self.storage_hub.get_result().await?;
+ if action_id != 0 {
+ // normal log action previously in queue; process it
+ self.handle_log_result(action_id, log_result)?;
+ } else {
+ if let LogResult::Truncate {
+ offset_ok: true,
+ now_size,
+ } = log_result
+ {
+ assert_eq!(now_size, cut_offset);
+ self.log_offset = cut_offset;
+ } else {
+ return logged_err!(
+ self.id;
+ "unexpected log result type or failed truncate"
+ );
+ }
+ break;
+ }
+ }
+ // truncate in-mem log as well
+ self.log.truncate(slot - self.start_slot);
+ first_new = slot;
+ break;
+ }
+ }
+
+ // append new entries into my log, and submit logger actions to make
+ // new entries durable
+ let (num_entries, mut num_appended) = (entries.len(), 0);
+ for (slot, mut entry) in entries
+ .drain((first_new - prev_slot - 1)..entries.len())
+ .enumerate()
+ .map(|(s, e)| (s + first_new, e))
+ {
+ entry.log_offset = 0;
+
+ self.log.push(entry.clone());
+ self.storage_hub.submit_action(
+ Self::make_log_action_id(
+ slot,
+ prev_slot + num_entries,
+ Role::Follower,
+ ),
+ LogAction::Append {
+ entry: DurEntry::LogEntry { entry },
+ sync: self.config.logger_sync,
+ },
+ )?;
+
+ num_appended += 1;
+ }
+
+ // even if no entries appended, also send back AppendEntriesReply
+        // as a follower-to-leader reverse heartbeat for peer health
+ // tracking purposes
+ if num_appended == 0 {
+ self.transport_hub.send_msg(
+ PeerMsg::AppendEntriesReply {
+ term: self.curr_term,
+ end_slot: first_new - 1,
+ success: true,
+ },
+ leader,
+ )?;
+ }
+
+ // if leader_commit is larger than my last_commit, update last_commit
+ if leader_commit > self.last_commit {
+ let new_commit = cmp::min(leader_commit, prev_slot + entries.len());
+
+ // submit newly committed entries for state machine execution
+ for slot in (self.last_commit + 1)..=new_commit {
+ let entry = &self.log[slot - self.start_slot];
+ for (cmd_idx, (_, req)) in entry.reqs.iter().enumerate() {
+ if let ApiRequest::Req { cmd, .. } = req {
+ self.state_machine.submit_cmd(
+ Self::make_command_id(slot, cmd_idx),
+ cmd.clone(),
+ )?;
+ } else {
+ continue; // ignore other types of requests
+ }
+ }
+ }
+
+ self.last_commit = new_commit;
+ }
+
+ // if last_snap is larger than mine, update last_snap
+ if last_snap > self.last_snap {
+ self.last_snap = last_snap;
+ }
+
+ Ok(())
+ }
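+
+    // Illustrative conflict-resolution trace for the handler above
+    // (hypothetical slots/terms): a follower's log holds terms
+    // [s5: 2, s6: 2, s7: 3] and AppendEntries arrives with prev_slot = 5,
+    // prev_term = 2, and entries for s6..=s8 at term 4. Slot 6 conflicts
+    // (2 != 4), so the durable log is truncated at s6's log_offset, the
+    // in-mem log is truncated to match, first_new = 6, and all three
+    // incoming entries are appended and submitted for logging.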
+
+ /// Handler of AppendEntries reply from follower.
+ fn handle_msg_append_entries_reply(
+ &mut self,
+ peer: ReplicaId,
+ term: Term,
+ end_slot: usize,
+ success: bool,
+ ) -> Result<(), SummersetError> {
+ if !success || self.match_slot[&peer] != end_slot {
+ pf_trace!(self.id; "received AcceptEntriesReply <- {} for term {} {}",
+ peer, term, if success { "ok" } else { "fail" });
+ }
+ if self.check_term(peer, term)? || self.role != Role::Leader {
+ return Ok(());
+ }
+ self.heard_heartbeat(peer, term)?;
+
+ if success {
+ // success: update next_slot and match_slot for follower
+ *self.next_slot.get_mut(&peer).unwrap() = end_slot + 1;
+ *self.match_slot.get_mut(&peer).unwrap() = end_slot;
+
+ // since we updated some match_slot here, check if any additional
+ // entries are now considered committed
+ let mut new_commit = self.last_commit;
+ for slot in
+ (self.last_commit + 1)..(self.start_slot + self.log.len())
+ {
+ let entry = &self.log[slot - self.start_slot];
+ if entry.term != self.curr_term {
+ continue; // cannot decide commit using non-latest term
+ }
+
+ let match_cnt = 1 + self
+ .match_slot
+ .values()
+ .filter(|&&s| s >= slot)
+ .count() as u8;
+ if match_cnt >= self.quorum_cnt {
+ // quorum size reached, set new_commit to here
+ new_commit = slot;
+ }
+ }
+
+ // submit newly committed commands, if any, for execution
+ for slot in (self.last_commit + 1)..=new_commit {
+ let entry = &self.log[slot - self.start_slot];
+ for (cmd_idx, (_, req)) in entry.reqs.iter().enumerate() {
+ if let ApiRequest::Req { cmd, .. } = req {
+ self.state_machine.submit_cmd(
+ Self::make_command_id(slot, cmd_idx),
+ cmd.clone(),
+ )?;
+ } else {
+ continue; // ignore other types of requests
+ }
+ }
+ }
+
+ self.last_commit = new_commit;
+
+ // also check if any additional entries are safe to snapshot
+ for slot in (self.last_snap + 1)..=end_slot {
+ let match_cnt = 1 + self
+ .match_slot
+ .values()
+ .filter(|&&s| s >= slot)
+ .count() as u8;
+ if match_cnt == self.population {
+ // all servers have durably stored this entry
+ self.last_snap = slot;
+ }
+ }
+ } else {
+ // failed: decrement next_slot for follower and retry
+ // NOTE: the optimization of fast-backward bypassing (instead of
+ // always decrementing by 1) not implemented
+ if self.next_slot[&peer] == 1 {
+ return Ok(()); // cannot move backward any more
+ }
+ *self.next_slot.get_mut(&peer).unwrap() -= 1;
+
+ let prev_slot = self.next_slot[&peer] - 1;
+ if prev_slot < self.start_slot {
+ *self.next_slot.get_mut(&peer).unwrap() += 1;
+ return logged_err!(self.id; "snapshotted slot {} queried", prev_slot);
+ }
+ let prev_term = self.log[prev_slot - self.start_slot].term;
+ let entries = self
+ .log
+ .iter()
+ .skip(self.next_slot[&peer] - self.start_slot)
+ .cloned()
+ .collect();
+
+ self.transport_hub.send_msg(
+ PeerMsg::AppendEntries {
+ term: self.curr_term,
+ prev_slot,
+ prev_term,
+ entries,
+ leader_commit: self.last_commit,
+ last_snap: self.last_snap,
+ },
+ peer,
+ )?;
+ pf_trace!(self.id; "sent AppendEntries -> {} with slots {} - {}",
+ peer, self.next_slot[&peer],
+ self.start_slot + self.log.len() - 1);
+ }
+
+ Ok(())
+ }
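+
+    // Illustrative commit advancement (population = 5, quorum_cnt = 3):
+    // with hypothetical match_slot = {1: 8, 2: 7, 3: 4, 4: 2}, slot 7 counts
+    // the leader plus peers 1 and 2 (3 >= 3) and becomes committed, while
+    // slot 8 counts only the leader plus peer 1 (2 < 3) and must wait; per
+    // the Raft commit rule, only entries of curr_term are counted directly.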
+
+ /// Handler of RequestVote message from candidate.
+ fn handle_msg_request_vote(
+ &mut self,
+ candidate: ReplicaId,
+ term: Term,
+ last_slot: usize,
+ last_term: Term,
+ ) -> Result<(), SummersetError> {
+ pf_trace!(self.id; "received RequestVote <- {} with term {} last {} term {}",
+ candidate, term, last_slot, last_term);
+ self.check_term(candidate, term)?;
+
+ // if the given term is smaller than mine, reply false
+ if term < self.curr_term {
+ self.transport_hub.send_msg(
+ PeerMsg::RequestVoteReply {
+ term: self.curr_term,
+ granted: false,
+ },
+ candidate,
+ )?;
+ pf_trace!(self.id; "sent RequestVoteReply -> {} term {} false",
+ candidate, self.curr_term);
+ return Ok(());
+ }
+
+        // if I have not voted for anyone else in my current term, and the
+        // candidate's log is at least as up-to-date as mine, grant vote
+ #[allow(clippy::collapsible_if)]
+ if self.voted_for.is_none() || (self.voted_for.unwrap() == candidate) {
+            if last_term > self.log.last().unwrap().term
+                || (last_term == self.log.last().unwrap().term
+                    && last_slot + 1 >= self.start_slot + self.log.len())
+ {
+ self.transport_hub.send_msg(
+ PeerMsg::RequestVoteReply {
+ term: self.curr_term,
+ granted: true,
+ },
+ candidate,
+ )?;
+ pf_trace!(self.id; "sent RequestVoteReply -> {} term {} granted",
+ candidate, self.curr_term);
+
+ // hear a heartbeat here to prevent me from starting an
+ // election soon
+ self.heard_heartbeat(candidate, term)?;
+ }
+ }
+
+ Ok(())
+ }
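+
+    // Illustrative up-to-date comparison, with my log ending at slot 10 in
+    // term 3:
+    //     candidate last_slot = 9,  last_term = 4 -> grant (higher last term)
+    //     candidate last_slot = 10, last_term = 3 -> grant (same term, log
+    //                                                not shorter than mine)
+    //     candidate last_slot = 8,  last_term = 3 -> reject (shorter log)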
+
+ /// Handler of RequestVote reply from peer.
+ fn handle_msg_request_vote_reply(
+ &mut self,
+ peer: ReplicaId,
+ term: Term,
+ granted: bool,
+ ) -> Result<(), SummersetError> {
+ pf_trace!(self.id; "received RequestVoteReply <- {} with term {} {}",
+ peer, term, if granted { "granted" } else { "false" });
+ if self.check_term(peer, term)? || self.role != Role::Candidate {
+ return Ok(());
+ }
+
+ // bookkeep this vote
+ self.votes_granted.insert(peer);
+
+ // if a majority of servers have voted for me, become the leader
+ if self.votes_granted.len() as u8 >= self.quorum_cnt {
+ self.become_the_leader()?;
+ }
+
+ Ok(())
+ }
+
+ /// Synthesized handler of receiving message from peer.
+ async fn handle_msg_recv(
+ &mut self,
+ peer: ReplicaId,
+ msg: PeerMsg,
+ ) -> Result<(), SummersetError> {
+ match msg {
+ PeerMsg::AppendEntries {
+ term,
+ prev_slot,
+ prev_term,
+ entries,
+ leader_commit,
+ last_snap,
+ } => {
+ self.handle_msg_append_entries(
+ peer,
+ term,
+ prev_slot,
+ prev_term,
+ entries,
+ leader_commit,
+ last_snap,
+ )
+ .await
+ }
+ PeerMsg::AppendEntriesReply {
+ term,
+ end_slot,
+ success,
+ } => self
+ .handle_msg_append_entries_reply(peer, term, end_slot, success),
+ PeerMsg::RequestVote {
+ term,
+ last_slot,
+ last_term,
+ } => self.handle_msg_request_vote(peer, term, last_slot, last_term),
+ PeerMsg::RequestVoteReply { term, granted } => {
+ self.handle_msg_request_vote_reply(peer, term, granted)
+ }
+ }
+ }
+}
+
+// RaftReplica state machine execution
+impl RaftReplica {
+ /// Handler of state machine exec result chan recv.
+ fn handle_cmd_result(
+ &mut self,
+ cmd_id: CommandId,
+ cmd_result: CommandResult,
+ ) -> Result<(), SummersetError> {
+ let (slot, cmd_idx) = Self::split_command_id(cmd_id);
+ if slot < self.start_slot {
+ return Ok(()); // ignore if slot index outdated
+ }
+ assert!(slot < self.start_slot + self.log.len());
+ pf_trace!(self.id; "executed cmd in entry at slot {} idx {}",
+ slot, cmd_idx);
+
+ let entry = &mut self.log[slot - self.start_slot];
+ assert!(cmd_idx < entry.reqs.len());
+ let (client, ref req) = entry.reqs[cmd_idx];
+
+ // reply command result back to client
+ if let ApiRequest::Req { id: req_id, .. } = req {
+ if entry.external && self.external_api.has_client(client) {
+ self.external_api.send_reply(
+ ApiReply::Reply {
+ id: *req_id,
+ result: Some(cmd_result),
+ redirect: None,
+ },
+ client,
+ )?;
+ pf_trace!(self.id; "replied -> client {} for slot {} idx {}",
+ client, slot, cmd_idx);
+ }
+ } else {
+ return logged_err!(self.id; "unexpected API request type");
+ }
+
+ // if all commands in this entry have been executed, update last_exec
+ if cmd_idx == entry.reqs.len() - 1 {
+ pf_debug!(self.id; "executed all cmds in entry at slot {}", slot);
+ self.last_exec = slot;
+ }
+
+ Ok(())
+ }
+}
+
+// RaftReplica leader election timeout logic
+impl RaftReplica {
+ /// Becomes a candidate and starts the election procedure.
+ async fn become_a_candidate(&mut self) -> Result<(), SummersetError> {
+ if self.role != Role::Follower {
+ return Ok(());
+ } else if let Some(peer) = self.leader {
+ // mark old leader as dead
+ if self.peer_alive.get(peer)? {
+ self.peer_alive.set(peer, false)?;
+ pf_debug!(self.id; "peer_alive updated: {:?}", self.peer_alive);
+ }
+ }
+
+ self.role = Role::Candidate;
+
+ // increment current term and vote for myself
+ self.curr_term += 1;
+ self.voted_for = Some(self.id);
+ self.votes_granted = HashSet::from([self.id]);
+ pf_info!(self.id; "starting election with term {}...", self.curr_term);
+
+ // also make the two critical fields durable, synchronously
+ self.storage_hub.submit_action(
+ 0,
+ LogAction::Write {
+ entry: DurEntry::Metadata {
+ curr_term: self.curr_term,
+ voted_for: self.voted_for,
+ },
+ offset: 0,
+ sync: self.config.logger_sync,
+ },
+ )?;
+ loop {
+ let (action_id, log_result) = self.storage_hub.get_result().await?;
+ if action_id != 0 {
+ // normal log action previously in queue; process it
+ self.handle_log_result(action_id, log_result)?;
+ } else {
+ if let LogResult::Write {
+ offset_ok: true, ..
+ } = log_result
+ {
+ } else {
+ return logged_err!(self.id; "unexpected log result type or failed write");
+ }
+ break;
+ }
+ }
+
+ // reset election timeout timer
+ self.heard_heartbeat(self.id, self.curr_term)?;
+
+ // send RequestVote messages to all other peers
+ let last_slot = self.start_slot + self.log.len() - 1;
+ assert!(last_slot >= self.start_slot);
+ let last_term = self.log[last_slot - self.start_slot].term;
+ self.transport_hub.bcast_msg(
+ PeerMsg::RequestVote {
+ term: self.curr_term,
+ last_slot,
+ last_term,
+ },
+ None,
+ )?;
+ pf_trace!(self.id; "broadcast RequestVote with term {} last {} term {}",
+ self.curr_term, last_slot, last_term);
+
+ Ok(())
+ }
+
+ /// Becomes the leader after enough votes granted for me.
+ fn become_the_leader(&mut self) -> Result<(), SummersetError> {
+ pf_info!(self.id; "elected to be leader with term {}", self.curr_term);
+ self.role = Role::Leader;
+
+ // clear peers' heartbeat reply counters, and broadcast a heartbeat now
+ for cnts in self.hb_reply_cnts.values_mut() {
+ *cnts = (1, 0, 0);
+ }
+ self.bcast_heartbeats()?;
+
+ // re-initialize next_slot and match_slot information
+ for slot in self.next_slot.values_mut() {
+ *slot = self.start_slot + self.log.len();
+ }
+ for slot in self.match_slot.values_mut() {
+ *slot = 0;
+ }
+
+ Ok(())
+ }
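+
+    // Per the Raft paper, a new leader initializes next_slot[p] to just past
+    // its own last entry and match_slot[p] to 0; e.g., with a log ending at
+    // slot 14, next_slot[p] = 15 for every peer p (illustrative values).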
+
+ /// Broadcasts empty AppendEntries messages as heartbeats to all peers.
+ fn bcast_heartbeats(&mut self) -> Result<(), SummersetError> {
+ let prev_slot = self.start_slot + self.log.len() - 1;
+ assert!(prev_slot >= self.start_slot);
+ let prev_term = self.log[prev_slot - self.start_slot].term;
+ self.transport_hub.bcast_msg(
+ PeerMsg::AppendEntries {
+ term: self.curr_term,
+ prev_slot,
+ prev_term,
+ entries: vec![],
+ leader_commit: self.last_commit,
+ last_snap: self.last_snap,
+ },
+ None,
+ )?;
+
+ // update max heartbeat reply counters and their repetitions seen
+ for (&peer, cnts) in self.hb_reply_cnts.iter_mut() {
+ if cnts.0 > cnts.1 {
+ // more hb replies have been received from this peer; it is
+ // probably alive
+ cnts.1 = cnts.0;
+ cnts.2 = 0;
+ } else {
+ // did not receive hb reply from this peer at least for the
+ // last sent hb from me; increment repetition count
+ cnts.2 += 1;
+ let repeat_threshold = (self.config.hb_hear_timeout_min
+ / self.config.hb_send_interval_ms)
+ as u8;
+ if cnts.2 > repeat_threshold {
+ // did not receive hb reply from this peer for too many
+ // past hbs sent from me; this peer is probably dead
+ if self.peer_alive.get(peer)? {
+ self.peer_alive.set(peer, false)?;
+ pf_debug!(self.id; "peer_alive updated: {:?}", self.peer_alive);
+ }
+ }
+ }
+ }
+
+ // I also heard this heartbeat from myself
+ self.heard_heartbeat(self.id, self.curr_term)?;
+
+ // pf_trace!(self.id; "broadcast heartbeats term {}", self.curr_term);
+ Ok(())
+ }
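+
+    // With the default config (hb_send_interval_ms = 50, hb_hear_timeout_min
+    // = 600), repeat_threshold = 600 / 50 = 12: a peer whose replies stall
+    // for more than 12 consecutive heartbeat rounds (roughly 600 ms) gets
+    // marked dead in peer_alive until it is heard from again.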
+
+ /// Chooses a random hb_hear_timeout from the min-max range and kicks off
+ /// the hb_hear_timer.
+ fn kickoff_hb_hear_timer(&mut self) -> Result<(), SummersetError> {
+ self.hb_hear_timer.cancel()?;
+
+ let timeout_ms = thread_rng().gen_range(
+ self.config.hb_hear_timeout_min..=self.config.hb_hear_timeout_max,
+ );
+
+ // pf_trace!(self.id; "kickoff hb_hear_timer @ {} ms", timeout_ms);
+ self.hb_hear_timer
+ .kickoff(Duration::from_millis(timeout_ms))?;
+ Ok(())
+ }
+
+ /// Heard a heartbeat from some other replica. Resets election timer.
+ fn heard_heartbeat(
+ &mut self,
+ peer: ReplicaId,
+ _term: Term,
+ ) -> Result<(), SummersetError> {
+ if peer != self.id {
+ self.hb_reply_cnts.get_mut(&peer).unwrap().0 += 1;
+ if !self.peer_alive.get(peer)? {
+ self.peer_alive.set(peer, true)?;
+ pf_debug!(self.id; "peer_alive updated: {:?}", self.peer_alive);
+ }
+ }
+
+ // reset hearing timer
+ self.kickoff_hb_hear_timer()?;
+
+ // pf_trace!(self.id; "heard heartbeat <- {} term {}", peer, term);
+ Ok(())
+ }
+}
+
+// RaftReplica control messages handling
+impl RaftReplica {
+ /// Handler of ResetState control message.
+ async fn handle_ctrl_reset_state(
+ &mut self,
+ durable: bool,
+ ) -> Result<(), SummersetError> {
+ pf_warn!(self.id; "server got restart req");
+
+ // send leave notification to peers and wait for their replies
+ self.transport_hub.leave().await?;
+
+ // send leave notification to manager and wait for its reply
+ self.control_hub.send_ctrl(CtrlMsg::Leave)?;
+ while self.control_hub.recv_ctrl().await? != CtrlMsg::LeaveReply {}
+
+ // if `durable` is false, truncate backer file
+ if !durable {
+ // use 0 as a special log action ID here
+ self.storage_hub
+ .submit_action(0, LogAction::Truncate { offset: 0 })?;
+ loop {
+ let (action_id, log_result) =
+ self.storage_hub.get_result().await?;
+ if action_id == 0 {
+ if log_result
+ != (LogResult::Truncate {
+ offset_ok: true,
+ now_size: 0,
+ })
+ {
+ return logged_err!(self.id; "failed to truncate log to 0");
+ } else {
+ return Ok(());
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Handler of Pause control message.
+ fn handle_ctrl_pause(
+ &mut self,
+ paused: &mut bool,
+ ) -> Result<(), SummersetError> {
+ pf_warn!(self.id; "server got pause req");
+ *paused = true;
+ self.control_hub.send_ctrl(CtrlMsg::PauseReply)?;
+ Ok(())
+ }
+
+ /// Handler of Resume control message.
+ fn handle_ctrl_resume(
+ &mut self,
+ paused: &mut bool,
+ ) -> Result<(), SummersetError> {
+ pf_warn!(self.id; "server got resume req");
+
+ // reset leader heartbeat timer
+ self.kickoff_hb_hear_timer()?;
+
+ *paused = false;
+ self.control_hub.send_ctrl(CtrlMsg::ResumeReply)?;
+ Ok(())
+ }
+
+ /// Handler of TakeSnapshot control message.
+ async fn handle_ctrl_take_snapshot(
+ &mut self,
+ ) -> Result<(), SummersetError> {
+ pf_warn!(self.id; "server told to take snapshot");
+ self.take_new_snapshot().await?;
+
+ self.control_hub.send_ctrl(CtrlMsg::SnapshotUpTo {
+ new_start: self.start_slot,
+ })?;
+ Ok(())
+ }
+
+ /// Synthesized handler of manager control messages. If ok, returns
+ /// `Some(true)` if decides to terminate and reboot, `Some(false)` if
+ /// decides to shutdown completely, and `None` if not terminating.
+ async fn handle_ctrl_msg(
+ &mut self,
+ msg: CtrlMsg,
+ paused: &mut bool,
+    ) -> Result<Option<bool>, SummersetError> {