Skip to content

Commit

Permalink
Add implementation of snapshotting (#18)
Browse files Browse the repository at this point in the history
* minor changes to benchmarking scripts

* minor updates to README

* add very basic crossword impl

* updates to benchmark scripts

* make Bitmap a general u8-indexed map

* fix crossword ack pattern bug

* minor updates to bench script

* minor updates to bench script

* minor updates to bench script

* fixing scripts address already in use

* fixing scripts address already in use

* fixing scripts address already in use

* fixing scripts address already in use

* add proper termination signals handling

* fixing scripts address already in use

* fix wrong NewServerJoin message send timing

* minor updates to README

* staging progress on reset control message

* add tcp_connect wrapper; better server ID assign logic

* huge updates adding server leave and reset support

* minor updates to README

* add performance delay simulation support

* minor updates to bench script

* minor updates to bench script

* fixing bench script hanging issue

* fixing bench script hanging issue

* fixing bench script hanging issue

* fixing bench script hanging issue

* fixing bench script hanging issue

* fixing bench script hanging issue

* fixing bench script hanging issue

* staging progress on log recovery

* fixing bench script hanging issue

* add crash restart test

* add log recovery logic to SimplePush

* add log recovery logic to Paxos variants

* minor changes to ResetServers control message

* minor updates to README

* staging progress on leader timeouts

* better client driver side API

* better client driver side API

* add kill procs helper script

* minor updates to tester client

* add autonomous leader timeouts and step-up

* add leader info to manager for testing

* add pause & resume control requests

* minor updates to tester client

* fix MultiPaxos prepare reply voted bug

* fix prepare reply voted bug for other Paxos variants

* add recovery read msgs & fix sharding bugs

* add scripted tests to github workflow beside unit tests

* minor updates to workflow job names

* staging progress on snapshotting

* staging progress on snapshotting

* finished snapshotting impl for MultiPaxos

* finished snapshotting impl for RSPaxos

* finished snapshotting impl for RSPaxos

* staging progress on snapshotting

* finish implementation of snapshotting

* merge private main

* merge private main

---------

Co-authored-by: josehu <[email protected]>
  • Loading branch information
josehu07 and josehu authored Sep 25, 2023
1 parent f5f51ba commit 043a157
Show file tree
Hide file tree
Showing 15 changed files with 1,234 additions and 251 deletions.
32 changes: 16 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,19 @@ Complete cluster management and benchmarking scripts are available in another re
- [x] event-based programming structure
- [x] cluster manager oracle impl.
- [x] implementation of MultiPaxos
- [x] client-side timeout/retry logic
- [x] state persistence & restart check
- [x] automatic leader election, backoffs
- [x] snapshotting & garbage collection
- [ ] specialize read-only commands?
- [ ] separate commit vs. exec responses?
- [ ] membership discovery & view changes?
- [ ] implementation of Raft
- [x] client-side utilities
- [x] REPL-style client
- [x] random benchmarking client
- [x] testing client
- [ ] benchmarking with YCSB input
- [ ] YCSB-driven benchmarking
- [ ] better README & documentation

---
Expand Down
7 changes: 5 additions & 2 deletions scripts/local_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def do_cargo_build(release):
if release:
cmd.append("-r")
proc = subprocess.Popen(cmd)
proc.wait()
return proc.wait()


def run_process(cmd):
Expand Down Expand Up @@ -124,7 +124,10 @@ def run_client(protocol, utility, params, release, config):
args = parser.parse_args()

# build everything
do_cargo_build(args.release)
rc = do_cargo_build(args.release)
if rc != 0:
print("ERROR: cargo build failed")
sys.exit(rc)

# run client executable
client_proc = run_client(
Expand Down
35 changes: 31 additions & 4 deletions scripts/local_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def do_cargo_build(release):
if release:
cmd.append("-r")
proc = subprocess.Popen(cmd)
proc.wait()
return proc.wait()


def run_process(cmd, capture_stderr=False):
Expand Down Expand Up @@ -47,6 +47,28 @@ def kill_all_matching(name, force=False):
"RSPaxos": lambda r: f"backer_path='/tmp/summerset.rs_paxos.{r}.wal'",
}

PROTOCOL_SNAPSHOT_PATH = {
"MultiPaxos": lambda r: f"snapshot_path='/tmp/summerset.multipaxos.{r}.snap'",
"RSPaxos": lambda r: f"snapshot_path='/tmp/summerset.rs_paxos.{r}.snap'",
}


def config_with_file_paths(protocol, config, replica):
result_config = PROTOCOL_BACKER_PATH[protocol](replica)
if protocol in PROTOCOL_SNAPSHOT_PATH:
result_config += "+"
result_config += PROTOCOL_SNAPSHOT_PATH[protocol](replica)

if config is not None and len(config) > 0:
if "backer_path" in config or "snapshot_path" in config:
result_config = config # use user-supplied path
# NOTE: ignores the other one
else:
result_config += "+"
result_config += config

return result_config


def config_with_backer_path(protocol, config, replica):
result_config = PROTOCOL_BACKER_PATH[protocol](replica)
Expand Down Expand Up @@ -131,7 +153,7 @@ def launch_servers(protocol, num_replicas, release, config):
SERVER_API_PORT(replica),
SERVER_P2P_PORT(replica),
f"127.0.0.1:{MANAGER_SRV_PORT}",
config_with_backer_path(protocol, config, replica),
config_with_file_paths(protocol, config, replica),
release,
)
proc = run_process(cmd)
Expand Down Expand Up @@ -160,12 +182,17 @@ def launch_servers(protocol, num_replicas, release, config):
kill_all_matching("summerset_server", force=True)
kill_all_matching("summerset_manager", force=True)

# remove all existing wal files
# remove all existing wal log & snapshot files
for path in Path("/tmp").glob("summerset.*.wal"):
path.unlink()
for path in Path("/tmp").glob("summerset.*.snap"):
path.unlink()

# build everything
do_cargo_build(args.release)
rc = do_cargo_build(args.release)
if rc != 0:
print("ERROR: cargo build failed")
sys.exit(rc)

# launch cluster manager oracle first
manager_proc = launch_manager(args.protocol, args.num_replicas, args.release)
Expand Down
93 changes: 87 additions & 6 deletions src/manager/clusman.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::client::ClientId;
use crate::protocols::SmrProtocol;

use tokio::sync::{mpsc, watch};
use tokio::time::{self, Duration};

/// Information about an active server.
#[derive(Debug, Clone)]
Expand All @@ -27,6 +28,9 @@ struct ServerInfo {

/// This server is currently paused?
is_paused: bool,

/// In-mem log start index after latest snapshot.
start_slot: usize,
}

/// Standalone cluster manager oracle.
Expand Down Expand Up @@ -204,6 +208,7 @@ impl ClusterManager {
p2p_addr,
is_leader: false,
is_paused: false,
start_slot: 0,
},
);
Ok(())
Expand Down Expand Up @@ -231,6 +236,28 @@ impl ClusterManager {
}
}

/// Handler of autonomous SnapshotUpTo message.
fn handle_snapshot_up_to(
&mut self,
server: ReplicaId,
new_start: usize,
) -> Result<(), SummersetError> {
if !self.server_info.contains_key(&server) {
return logged_err!("m"; "snapshot up to got unknown ID: {}", server);
}

// update this server's info
let info = self.server_info.get_mut(&server).unwrap();
if new_start < info.start_slot {
logged_err!("m"; "server {} snapshot up to {} < {}",
server, new_start,
self.server_info[&server].start_slot)
} else {
info.start_slot = new_start;
Ok(())
}
}

/// Synthesized handler of server-initiated control messages.
async fn handle_ctrl_msg(
&mut self,
Expand Down Expand Up @@ -258,6 +285,10 @@ impl ClusterManager {
self.handle_leader_status(server, step_up)?;
}

CtrlMsg::SnapshotUpTo { new_start } => {
self.handle_snapshot_up_to(server, new_start)?;
}

_ => {} // ignore all other types
}

Expand Down Expand Up @@ -328,6 +359,9 @@ impl ClusterManager {
return logged_err!("m"; "error assigning new server ID: {}", e);
}

// wait a while to ensure the server's transport hub is setup
time::sleep(Duration::from_millis(500)).await;

reset_done.insert(s);
}

Expand Down Expand Up @@ -364,7 +398,7 @@ impl ClusterManager {
// pause specified server(s)
let mut pause_done = HashSet::new();
while let Some(s) = servers.pop() {
// send puase server control message to server
// send pause server control message to server
self.server_reigner.send_ctrl(CtrlMsg::Pause, s)?;

// set the is_paused flag
Expand Down Expand Up @@ -404,19 +438,19 @@ impl ClusterManager {
// resume specified server(s)
let mut resume_done = HashSet::new();
while let Some(s) = servers.pop() {
// send puase server control message to server
// send resume server control message to server
self.server_reigner.send_ctrl(CtrlMsg::Resume, s)?;

// clear the is_paused flag
assert!(self.server_info.contains_key(&s));
self.server_info.get_mut(&s).unwrap().is_paused = false;

// wait for dummy reply
let (_, reply) = self.server_reigner.recv_ctrl().await?;
if reply != CtrlMsg::ResumeReply {
return logged_err!("m"; "unexpected reply type received");
}

// clear the is_paused flag
assert!(self.server_info.contains_key(&s));
self.server_info.get_mut(&s).unwrap().is_paused = false;

resume_done.insert(s);
}

Expand All @@ -428,6 +462,49 @@ impl ClusterManager {
)
}

/// Handler of client TakeSnapshot rquest.
async fn handle_client_take_snapshot(
&mut self,
client: ClientId,
servers: HashSet<ReplicaId>,
) -> Result<(), SummersetError> {
let mut servers: Vec<ReplicaId> = if servers.is_empty() {
// all active servers
self.server_info.keys().copied().collect()
} else {
servers.into_iter().collect()
};

// tell specified server(s)
let mut snapshot_up_to = HashMap::new();
while let Some(s) = servers.pop() {
// send take snapshot control message to server
self.server_reigner.send_ctrl(CtrlMsg::TakeSnapshot, s)?;

// wait for reply
let (_, reply) = self.server_reigner.recv_ctrl().await?;
if let CtrlMsg::SnapshotUpTo { new_start } = reply {
// update the log start index info
assert!(self.server_info.contains_key(&s));
if new_start < self.server_info[&s].start_slot {
return logged_err!("m"; "server {} snapshot up to {} < {}",
s, new_start,
self.server_info[&s].start_slot);
} else {
self.server_info.get_mut(&s).unwrap().start_slot =
new_start;
}

snapshot_up_to.insert(s, new_start);
} else {
return logged_err!("m"; "unexpected reply type received");
}
}

self.client_reactor
.send_reply(CtrlReply::TakeSnapshot { snapshot_up_to }, client)
}

/// Synthesized handler of client-initiated control requests.
async fn handle_ctrl_req(
&mut self,
Expand All @@ -453,6 +530,10 @@ impl ClusterManager {
self.handle_client_resume_servers(client, servers).await?;
}

CtrlRequest::TakeSnapshot { servers } => {
self.handle_client_take_snapshot(client, servers).await?;
}

_ => {} // ignore all other types
}

Expand Down
Loading

0 comments on commit 043a157

Please sign in to comment.