From ea426c1671d7610c939afe0057838f642f8c98b3 Mon Sep 17 00:00:00 2001 From: David Chu Date: Tue, 27 Jun 2023 10:40:15 -0700 Subject: [PATCH] Use lattices and persist_mut to prevent spinning. --- hydroflow/examples/paxos/proposer.rs | 100 ++++++++++++++++----------- hydroflow/examples/paxos/protocol.rs | 2 +- 2 files changed, 61 insertions(+), 41 deletions(-) diff --git a/hydroflow/examples/paxos/proposer.rs b/hydroflow/examples/paxos/proposer.rs index 55ba152fe271..7b3693141e71 100644 --- a/hydroflow/examples/paxos/proposer.rs +++ b/hydroflow/examples/paxos/proposer.rs @@ -1,12 +1,14 @@ use std::cmp; use std::net::SocketAddr; use std::path::Path; +use std::time::{SystemTime, UNIX_EPOCH}; use futures::join; use hydroflow::compiled::pull::HalfMultisetJoinState; use hydroflow::hydroflow_syntax; use hydroflow::scheduled::graph::Hydroflow; -use hydroflow::util::{UdpSink, UdpStream, bind_udp_bytes}; +use hydroflow::util::{UdpSink, UdpStream, bind_udp_bytes, Persistence::*}; +use hydroflow::lattices::{Max, DomPair}; use crate::helpers::{periodic, get_phase_1_addr, get_phase_2_addr, get_config}; use crate::protocol::*; @@ -93,9 +95,10 @@ pub(crate) async fn run_proposer( p2b_in = source_stream_serde(p2b_src) -> map(Result::unwrap) -> inspect(|(m, a)| println!("Received {:?} from {:?}", m, a)) - -> map(|(m, _)| m) + -> map(|(m, _)| Persist(m)) -> [0]p2b; p2b = union() + -> persist_mut() -> tee(); client_in = source_stdin() @@ -103,7 +106,10 @@ pub(crate) async fn run_proposer( -> inspect(|m| println!("Client input: {:?}", m)); // compute ballot (default ballot num = 0) - ballot_num = union() -> persist() -> reduce::<'tick>(cmp::max); + ballot_num = union() + -> map(Max::new) + -> lattice_merge::<'static, Max>() + -> map(|lattice: Max| lattice.0); source_iter(vec![start_ballot_num]) -> [0]ballot_num; ballot = ballot_num -> map(|num| Ballot {id, num}) -> tee(); @@ -113,7 +119,11 @@ pub(crate) async fn run_proposer( leader_recv[0] -> map(|m: IAmLeader| m.ballot) -> received_ballots[0]; p1b[0] -> map(|m: P1b| m.max_ballot) -> received_ballots[1]; p2b[0] -> map(|m: P2b| m.max_ballot) -> received_ballots[2]; - received_ballots = union() -> persist() -> reduce::<'tick>(cmp::max) -> tee(); + received_ballots = union() + -> map(Max::new) + -> lattice_merge::<'static, Max>() + -> map(|lattice: Max| lattice.0) + -> tee(); received_ballots[0] -> [0]has_largest_ballot; ballot[0] -> [1]has_largest_ballot; has_largest_ballot = cross_join::<'tick>() -> filter_map(|(max_received_ballot, ballot): (Ballot, Ballot)| { @@ -133,19 +143,30 @@ pub(crate) async fn run_proposer( ballot[1] -> [1]leader_and_resend_timeout_ballot; leader_and_resend_timeout_ballot = cross_join::<'tick>() -> map(|(_, b): ((), Ballot)| IAmLeader {ballot: b}) -> [0]leader_send; - i_am_leader_check = source_stream(i_am_leader_check_trigger) -> map(|_| ()) -> inspect(|_| println!("I am leader check triggered")) -> tee(); - - // track previous heartbeat, clear whenever i_am_leader_check_trigger triggers (LRU clock style) - prev_heartbeat = union() -> reduce::<'tick>(|_,_| ()) -> [pos]latest_heartbeat; - i_am_leader_check[0] -> [neg]latest_heartbeat; - latest_heartbeat = difference::<'tick, 'tick>() -> tee(); - latest_heartbeat[0] -> next_tick() -> [0]prev_heartbeat; - leader_recv[1] -> map(|_| ()) -> [1]prev_heartbeat; + // track latest heartbeat + source_iter(vec![0]) -> [0]latest_heartbeat; + latest_heartbeat = union() + -> map(Max::new) + -> lattice_merge::<'static, Max>() + -> map(|lattice: Max| lattice.0); + leader_recv[1] -> map(|_| SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()) -> [1]latest_heartbeat; // if there was no previous heartbeat when the i_am_leader_check triggers again, send p1a - i_am_leader_check[1] -> [pos]p1a_trigger_and_no_heartbeat; - latest_heartbeat[1] -> [neg]p1a_trigger_and_no_heartbeat; - p1a_trigger_and_no_heartbeat = difference::<'tick, 'tick>() -> inspect(|_| println!("p1a trigger and no heartbeat")); + i_am_leader_check = source_stream(i_am_leader_check_trigger) + -> map(|_| SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()) + -> inspect(|_| println!("I am leader check triggered")) + -> [0]p1a_trigger_and_no_heartbeat; + latest_heartbeat -> [1]p1a_trigger_and_no_heartbeat; + p1a_trigger_and_no_heartbeat = cross_join::<'tick>() + -> filter_map(|(check_time, heartbeat): (u64, u64)| { + if heartbeat + u64::from(i_am_leader_check_timeout) < check_time { + Some(()) + } + else { + None + } + }) + -> inspect(|_| println!("p1a trigger and no heartbeat")); p1a_trigger_and_no_heartbeat -> [pos]leader_expired; is_leader[1] -> [neg]leader_expired; leader_expired = difference::<'tick, 'tick>() -> inspect(|_| println!("leader expired")); @@ -231,23 +252,34 @@ pub(crate) async fn run_proposer( leader_and_reproposing_and_max_slot = cross_join::<'tick>() -> map(|((), s): ((), u16)| s + 1) -> next_tick() + -> [0]new_slots; + ballot[5] -> [0]ballot_and_next_slot; + new_slots = union() -> [1]ballot_and_next_slot; + // store the next slot for each ballot. Can discard slots for older ballots, so use DomPair lattice + ballot_and_next_slot = cross_join::<'tick>() + -> map(|(ballot, slot): (Ballot, u16)| DomPair::new(Max::new(ballot), Max::new(slot))) + -> lattice_merge::<'static, DomPair, Max>>() + -> map(|lattice: DomPair, Max>| (lattice.key.0, lattice.val.0)) -> [0]next_slot; - next_slot = union() -> tee(); + ballot[6] -> map(|b: Ballot| (b, ())) -> [1]next_slot; + // find the next slot for the current ballot + next_slot = join::<'tick>() + -> map(|(ballot, (slot, ())): (Ballot, (u16, ()))| slot) + -> tee(); /////////////////////////////////////////////////////////////////////// send p2as // assign a slot - indexed_payloads = client_in -> enumerate::<'tick>() -> inspect(|(i, m)| println!("Enumerated input: {:?} {:?}", i, m)); + indexed_payloads = client_in -> enumerate::<'tick>(); is_leader[3] -> [0]leader_and_slot; next_slot[1] -> [1]leader_and_slot; leader_and_slot = cross_join::<'tick>() -> map(|((), s): ((), u16)| s); leader_and_slot -> [0]leader_and_slot_and_ballot; - ballot[5] -> [1]leader_and_slot_and_ballot; + ballot[7] -> [1]leader_and_slot_and_ballot; leader_and_slot_and_ballot = cross_join::<'tick>(); // type: (slot, ballot) leader_and_slot_and_ballot -> [0]leader_and_slot_and_ballot_and_payload; indexed_payloads -> [1]leader_and_slot_and_ballot_and_payload; leader_and_slot_and_ballot_and_payload = cross_join::<'tick>() // type: ((slot, ballot), (index, payload)) - -> inspect(|v| println!("Cross joined: {:?}", v)) -> map(|((slot, ballot), (index, payload)): ((u16, Ballot), (u16, String))| P2a { entry: Entry { payload, @@ -258,27 +290,20 @@ pub(crate) async fn run_proposer( leader_and_slot_and_ballot_and_payload[0] -> [2]p2a; // increment the slot if a payload was chosen - num_payloads = leader_and_slot_and_ballot_and_payload[1] -> fold::<'tick>(0, |mut sum: u16, _| sum + 1) -> tee(); + num_payloads = leader_and_slot_and_ballot_and_payload[1] + -> fold::<'tick>(0, |mut sum: u16, _| sum + 1) + -> filter(|s: &u16| *s > 0); // avoid emitting 0, even if it's correct, since it will trigger the calculation of a new slot each tick next_slot[2] -> [0]slot_plus_num_indexed; - num_payloads[0] -> [1]slot_plus_num_indexed; + num_payloads -> [1]slot_plus_num_indexed; slot_plus_num_indexed = cross_join::<'tick>() -> map(|(slot, num): (u16, u16)| slot + num) -> next_tick() - -> [1]next_slot; - - // don't increment the slot if no payload was chosen, but we are still the leader - next_slot[3] -> map(|s| ((), s)) -> [pos]slot_plus_none_indexed; - num_payloads[1] -> filter_map(|num: u16| if num > 0 {Some(())} else {None}) -> [neg]slot_plus_none_indexed; - slot_plus_none_indexed = anti_join() - -> map(|((), s): ((), u16)| s) - -> next_tick() - -> [2]next_slot; - + -> [1]new_slots; /////////////////////////////////////////////////////////////////////// process p2bs - all_commit = p2b[1] -> map(|m: P2b| (m.entry, 1)) + all_commit = p2b[1] -> map(|m: P2b| (m, 1)) -> reduce_keyed::<'tick>(|sum: &mut u16, _| *sum += 1) - -> filter_map(|(m, num): (Entry, u16)| { + -> filter_map(|(m, num): (P2b, u16)| { if num == 2*f+1 { // only count when all acceptors have accepted, so we avoid duplicate outputs (when f+1, then f+2, etc accept) Some(m) } @@ -286,13 +311,8 @@ pub(crate) async fn run_proposer( None } }) - -> tee(); - all_commit[0] -> for_each(|e: Entry| println!("Committed slot {:?}: {:?}", e.slot, e.payload)); - p2b[2] -> map(|m: P2b| (m.entry.slot, m)) -> [pos]uncommitted_slots; - all_commit[1] -> map(|e: Entry| e.slot) -> [neg]uncommitted_slots; - // persist p2bs for uncommitted slots - uncommitted_slots = anti_join() - -> map(|(_, m): (u16, P2b)| m) + -> inspect(|m: &P2b| println!("Committed slot {:?}: {:?}", m.entry.slot, m.entry.payload)) + -> map(|m: P2b| Delete(m)) // delete p2bs for committed slots -> next_tick() -> [1]p2b; }; diff --git a/hydroflow/examples/paxos/protocol.rs b/hydroflow/examples/paxos/protocol.rs index 0165d0641398..327db581a178 100644 --- a/hydroflow/examples/paxos/protocol.rs +++ b/hydroflow/examples/paxos/protocol.rs @@ -42,7 +42,7 @@ pub struct P2a { pub entry: Entry, } -#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)] +#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug, Hash)] pub struct P2b { pub entry: Entry, pub max_ballot: Ballot,