Skip to content

Commit

Permalink
Use lattices and persist_mut to prevent spinning.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidchuyaya committed Jun 27, 2023
1 parent ef1c100 commit ea426c1
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 41 deletions.
100 changes: 60 additions & 40 deletions hydroflow/examples/paxos/proposer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::cmp;
use std::net::SocketAddr;
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};

use futures::join;
use hydroflow::compiled::pull::HalfMultisetJoinState;
use hydroflow::hydroflow_syntax;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::util::{UdpSink, UdpStream, bind_udp_bytes};
use hydroflow::util::{UdpSink, UdpStream, bind_udp_bytes, Persistence::*};

Check warning on line 10 in hydroflow/examples/paxos/proposer.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unused imports: `UdpSink`, `UdpStream`

Check warning on line 10 in hydroflow/examples/paxos/proposer.rs

View workflow job for this annotation

GitHub Actions / Test Suite (pinned-nightly)

unused imports: `UdpSink`, `UdpStream`
use hydroflow::lattices::{Max, DomPair};

use crate::helpers::{periodic, get_phase_1_addr, get_phase_2_addr, get_config};
use crate::protocol::*;
Expand Down Expand Up @@ -93,17 +95,21 @@ pub(crate) async fn run_proposer(
p2b_in = source_stream_serde(p2b_src)
-> map(Result::unwrap)
-> inspect(|(m, a)| println!("Received {:?} from {:?}", m, a))
-> map(|(m, _)| m)
-> map(|(m, _)| Persist(m))
-> [0]p2b;
p2b = union()
-> persist_mut()
-> tee();

client_in = source_stdin()
-> filter_map(|m: Result<String, std::io::Error>| m.ok())
-> inspect(|m| println!("Client input: {:?}", m));

// compute ballot (default ballot num = 0)
ballot_num = union() -> persist() -> reduce::<'tick>(cmp::max);
ballot_num = union()
-> map(Max::new)
-> lattice_merge::<'static, Max<u16>>()
-> map(|lattice: Max<u16>| lattice.0);
source_iter(vec![start_ballot_num]) -> [0]ballot_num;
ballot = ballot_num -> map(|num| Ballot {id, num}) -> tee();

Expand All @@ -113,7 +119,11 @@ pub(crate) async fn run_proposer(
leader_recv[0] -> map(|m: IAmLeader| m.ballot) -> received_ballots[0];
p1b[0] -> map(|m: P1b| m.max_ballot) -> received_ballots[1];
p2b[0] -> map(|m: P2b| m.max_ballot) -> received_ballots[2];
received_ballots = union() -> persist() -> reduce::<'tick>(cmp::max) -> tee();
received_ballots = union()
-> map(Max::new)
-> lattice_merge::<'static, Max<Ballot>>()
-> map(|lattice: Max<Ballot>| lattice.0)
-> tee();
received_ballots[0] -> [0]has_largest_ballot;
ballot[0] -> [1]has_largest_ballot;
has_largest_ballot = cross_join::<'tick>() -> filter_map(|(max_received_ballot, ballot): (Ballot, Ballot)| {
Expand All @@ -133,19 +143,30 @@ pub(crate) async fn run_proposer(
ballot[1] -> [1]leader_and_resend_timeout_ballot;
leader_and_resend_timeout_ballot = cross_join::<'tick>() -> map(|(_, b): ((), Ballot)| IAmLeader {ballot: b}) -> [0]leader_send;

i_am_leader_check = source_stream(i_am_leader_check_trigger) -> map(|_| ()) -> inspect(|_| println!("I am leader check triggered")) -> tee();

// track previous heartbeat, clear whenever i_am_leader_check_trigger triggers (LRU clock style)
prev_heartbeat = union() -> reduce::<'tick>(|_,_| ()) -> [pos]latest_heartbeat;
i_am_leader_check[0] -> [neg]latest_heartbeat;
latest_heartbeat = difference::<'tick, 'tick>() -> tee();
latest_heartbeat[0] -> next_tick() -> [0]prev_heartbeat;
leader_recv[1] -> map(|_| ()) -> [1]prev_heartbeat;
// track latest heartbeat
source_iter(vec![0]) -> [0]latest_heartbeat;
latest_heartbeat = union()
-> map(Max::new)
-> lattice_merge::<'static, Max<u64>>()
-> map(|lattice: Max<u64>| lattice.0);
leader_recv[1] -> map(|_| SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()) -> [1]latest_heartbeat;

// if there was no previous heartbeat when the i_am_leader_check triggers again, send p1a
i_am_leader_check[1] -> [pos]p1a_trigger_and_no_heartbeat;
latest_heartbeat[1] -> [neg]p1a_trigger_and_no_heartbeat;
p1a_trigger_and_no_heartbeat = difference::<'tick, 'tick>() -> inspect(|_| println!("p1a trigger and no heartbeat"));
i_am_leader_check = source_stream(i_am_leader_check_trigger)
-> map(|_| SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs())
-> inspect(|_| println!("I am leader check triggered"))
-> [0]p1a_trigger_and_no_heartbeat;
latest_heartbeat -> [1]p1a_trigger_and_no_heartbeat;
p1a_trigger_and_no_heartbeat = cross_join::<'tick>()
-> filter_map(|(check_time, heartbeat): (u64, u64)| {
if heartbeat + u64::from(i_am_leader_check_timeout) < check_time {
Some(())
}
else {
None
}
})
-> inspect(|_| println!("p1a trigger and no heartbeat"));
p1a_trigger_and_no_heartbeat -> [pos]leader_expired;
is_leader[1] -> [neg]leader_expired;
leader_expired = difference::<'tick, 'tick>() -> inspect(|_| println!("leader expired"));
Expand Down Expand Up @@ -231,23 +252,34 @@ pub(crate) async fn run_proposer(
leader_and_reproposing_and_max_slot = cross_join::<'tick>()
-> map(|((), s): ((), u16)| s + 1)
-> next_tick()
-> [0]new_slots;
ballot[5] -> [0]ballot_and_next_slot;
new_slots = union() -> [1]ballot_and_next_slot;
// store the next slot for each ballot. Can discard slots for older ballots, so use DomPair lattice
ballot_and_next_slot = cross_join::<'tick>()
-> map(|(ballot, slot): (Ballot, u16)| DomPair::new(Max::new(ballot), Max::new(slot)))
-> lattice_merge::<'static, DomPair<Max<Ballot>, Max<u16>>>()
-> map(|lattice: DomPair<Max<Ballot>, Max<u16>>| (lattice.key.0, lattice.val.0))
-> [0]next_slot;
next_slot = union() -> tee();
ballot[6] -> map(|b: Ballot| (b, ())) -> [1]next_slot;
// find the next slot for the current ballot
next_slot = join::<'tick>()
-> map(|(ballot, (slot, ())): (Ballot, (u16, ()))| slot)

Check warning on line 267 in hydroflow/examples/paxos/proposer.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unused variable: `ballot`

Check warning on line 267 in hydroflow/examples/paxos/proposer.rs

View workflow job for this annotation

GitHub Actions / Test Suite (pinned-nightly)

unused variable: `ballot`
-> tee();


/////////////////////////////////////////////////////////////////////// send p2as
// assign a slot
indexed_payloads = client_in -> enumerate::<'tick>() -> inspect(|(i, m)| println!("Enumerated input: {:?} {:?}", i, m));
indexed_payloads = client_in -> enumerate::<'tick>();
is_leader[3] -> [0]leader_and_slot;
next_slot[1] -> [1]leader_and_slot;
leader_and_slot = cross_join::<'tick>() -> map(|((), s): ((), u16)| s);
leader_and_slot -> [0]leader_and_slot_and_ballot;
ballot[5] -> [1]leader_and_slot_and_ballot;
ballot[7] -> [1]leader_and_slot_and_ballot;
leader_and_slot_and_ballot = cross_join::<'tick>(); // type: (slot, ballot)
leader_and_slot_and_ballot -> [0]leader_and_slot_and_ballot_and_payload;
indexed_payloads -> [1]leader_and_slot_and_ballot_and_payload;
leader_and_slot_and_ballot_and_payload = cross_join::<'tick>() // type: ((slot, ballot), (index, payload))
-> inspect(|v| println!("Cross joined: {:?}", v))
-> map(|((slot, ballot), (index, payload)): ((u16, Ballot), (u16, String))| P2a {
entry: Entry {
payload,
Expand All @@ -258,41 +290,29 @@ pub(crate) async fn run_proposer(
leader_and_slot_and_ballot_and_payload[0] -> [2]p2a;

// increment the slot if a payload was chosen
num_payloads = leader_and_slot_and_ballot_and_payload[1] -> fold::<'tick>(0, |mut sum: u16, _| sum + 1) -> tee();
num_payloads = leader_and_slot_and_ballot_and_payload[1]
-> fold::<'tick>(0, |mut sum: u16, _| sum + 1)
-> filter(|s: &u16| *s > 0); // avoid emitting 0, even if it's correct, since it will trigger the calculation of a new slot each tick
next_slot[2] -> [0]slot_plus_num_indexed;
num_payloads[0] -> [1]slot_plus_num_indexed;
num_payloads -> [1]slot_plus_num_indexed;
slot_plus_num_indexed = cross_join::<'tick>()
-> map(|(slot, num): (u16, u16)| slot + num)
-> next_tick()
-> [1]next_slot;

// don't increment the slot if no payload was chosen, but we are still the leader
next_slot[3] -> map(|s| ((), s)) -> [pos]slot_plus_none_indexed;
num_payloads[1] -> filter_map(|num: u16| if num > 0 {Some(())} else {None}) -> [neg]slot_plus_none_indexed;
slot_plus_none_indexed = anti_join()
-> map(|((), s): ((), u16)| s)
-> next_tick()
-> [2]next_slot;

-> [1]new_slots;

/////////////////////////////////////////////////////////////////////// process p2bs
all_commit = p2b[1] -> map(|m: P2b| (m.entry, 1))
all_commit = p2b[1] -> map(|m: P2b| (m, 1))
-> reduce_keyed::<'tick>(|sum: &mut u16, _| *sum += 1)
-> filter_map(|(m, num): (Entry, u16)| {
-> filter_map(|(m, num): (P2b, u16)| {
if num == 2*f+1 { // only count when all acceptors have accepted, so we avoid duplicate outputs (when f+1, then f+2, etc accept)
Some(m)
}
else {
None
}
})
-> tee();
all_commit[0] -> for_each(|e: Entry| println!("Committed slot {:?}: {:?}", e.slot, e.payload));
p2b[2] -> map(|m: P2b| (m.entry.slot, m)) -> [pos]uncommitted_slots;
all_commit[1] -> map(|e: Entry| e.slot) -> [neg]uncommitted_slots;
// persist p2bs for uncommitted slots
uncommitted_slots = anti_join()
-> map(|(_, m): (u16, P2b)| m)
-> inspect(|m: &P2b| println!("Committed slot {:?}: {:?}", m.entry.slot, m.entry.payload))
-> map(|m: P2b| Delete(m)) // delete p2bs for committed slots
-> next_tick()
-> [1]p2b;
};
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/paxos/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct P2a {
pub entry: Entry,
}

#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug)]
#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug, Hash)]
pub struct P2b {
pub entry: Entry,
pub max_ballot: Ballot,
Expand Down

0 comments on commit ea426c1

Please sign in to comment.