Skip to content

Commit

Permalink
Format
Browse files Browse the repository at this point in the history
  • Loading branch information
davidchuyaya committed Jun 27, 2023
1 parent ea426c1 commit 34eb1ea
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 51 deletions.
17 changes: 6 additions & 11 deletions hydroflow/examples/paxos/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,19 @@ use std::net::SocketAddr;
use futures::join;
use hydroflow::hydroflow_syntax;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::util::{bind_udp_bytes};
use hydroflow::util::bind_udp_bytes;

use crate::helpers::{get_phase_1_addr, get_phase_2_addr};
use crate::protocol::*;
use crate::{GraphType};
use crate::GraphType;

pub(crate) async fn run_acceptor(
addr: SocketAddr,
graph: Option<GraphType>,
) {
pub(crate) async fn run_acceptor(addr: SocketAddr, graph: Option<GraphType>) {
let phase_1_addr = get_phase_1_addr(addr);
let phase_2_addr = get_phase_2_addr(addr);

let p1a_future = bind_udp_bytes(phase_1_addr);
let p2a_future = bind_udp_bytes(phase_2_addr);
let ((p1b_sink, p1a_src, _),
(p2b_sink, p2a_src, _))
= join!(p1a_future, p2a_future);
let ((p1b_sink, p1a_src, _), (p2b_sink, p2a_src, _)) = join!(p1a_future, p2a_future);

let mut df: Hydroflow = hydroflow_syntax! {
// define inputs/outputs
Expand All @@ -30,7 +25,7 @@ pub(crate) async fn run_acceptor(
-> inspect(|(m, a)| println!("Received {:?} from {:?}", m, a))
-> tee();
p1b = cross_join::<'tick>()
-> map(|(((p1a, proposer), log), max_ballot): (((P1a, SocketAddr), HashSet<Entry>), Ballot)|
-> map(|(((p1a, proposer), log), max_ballot): (((P1a, SocketAddr), HashSet<Entry>), Ballot)|
(P1b {
ballot: p1a.ballot,
max_ballot,
Expand Down Expand Up @@ -74,7 +69,7 @@ pub(crate) async fn run_acceptor(
log.insert(e);
log
});

// reply with p1b
p1a_and_log = cross_join::<'tick>() -> [0]p1b;
p1a[1] -> [0]p1a_and_log;
Expand Down
7 changes: 5 additions & 2 deletions hydroflow/examples/paxos/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{net::SocketAddr, path::Path, fs::File, io::BufReader};
use std::fs::File;
use std::io::BufReader;
use std::net::SocketAddr;
use std::path::Path;

use tokio_stream::wrappers::IntervalStream;

Expand Down Expand Up @@ -30,4 +33,4 @@ pub fn get_config(path: impl AsRef<Path>) -> Config {
let file = File::open(path).unwrap();
let reader = BufReader::new(file);
serde_json::from_reader(reader).unwrap()
}
}
10 changes: 5 additions & 5 deletions hydroflow/examples/paxos/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use std::net::SocketAddr;
use std::path::Path;

use acceptor::run_acceptor;
use clap::{Parser, ValueEnum};
use proposer::run_proposer;
use hydroflow::tokio;
use hydroflow::util::{ipv4_resolve};
use hydroflow::util::ipv4_resolve;
use proposer::run_proposer;
use serde::Deserialize;
use acceptor::run_acceptor;

mod proposer;
mod acceptor;
mod helpers;
mod proposer;
mod protocol;
mod acceptor;

#[derive(Clone, ValueEnum, Debug)]
enum Role {
Expand Down
32 changes: 19 additions & 13 deletions hydroflow/examples/paxos/proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use std::time::{SystemTime, UNIX_EPOCH};
use futures::join;
use hydroflow::compiled::pull::HalfMultisetJoinState;
use hydroflow::hydroflow_syntax;
use hydroflow::lattices::{DomPair, Max};
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::util::{UdpSink, UdpStream, bind_udp_bytes, Persistence::*};
use hydroflow::lattices::{Max, DomPair};
use hydroflow::util::Persistence::*;
use hydroflow::util::{bind_udp_bytes, UdpSink, UdpStream};

use crate::helpers::{periodic, get_phase_1_addr, get_phase_2_addr, get_config};
use crate::helpers::{get_config, get_phase_1_addr, get_phase_2_addr, periodic};
use crate::protocol::*;
use crate::{Config, GraphType};

Check warning on line 16 in hydroflow/examples/paxos/proposer.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unused import: `Config`

Check warning on line 16 in hydroflow/examples/paxos/proposer.rs

View workflow job for this annotation

GitHub Actions / Test Suite (pinned-nightly)

unused import: `Config`

Expand All @@ -31,20 +32,25 @@ pub(crate) async fn run_proposer(
let leader_future = bind_udp_bytes(stable_leader_addr);
let p1a_future = bind_udp_bytes(phase_1_addr);
let p2a_future = bind_udp_bytes(phase_2_addr);
let ((stable_leader_sink, stable_leader_src, _),
let (
(stable_leader_sink, stable_leader_src, _),
(p1a_sink, p1b_src, _),
(p2a_sink, p2b_src, _))
= join!(leader_future, p1a_future, p2a_future);
(p2a_sink, p2b_src, _),
) = join!(leader_future, p1a_future, p2a_future);

let id = addr.port(); //TODO assumes that each proposer has a different port. True locally, but may not be true in distributed setting
let id = addr.port(); // TODO assumes that each proposer has a different port. True locally, but may not be true in distributed setting
let config = get_config(path);
let f = config.f;

// create timeout triggers
let is_node_0 = addr == config.proposers[0].parse::<SocketAddr>().unwrap();
println!("is_node_0: {:?}", is_node_0);
let i_am_leader_resend_trigger = periodic(config.i_am_leader_resend_timeout);
let i_am_leader_check_timeout = if is_node_0 { config.i_am_leader_check_timeout_node_0} else { config.i_am_leader_check_timeout_other_nodes};
let i_am_leader_check_timeout = if is_node_0 {
config.i_am_leader_check_timeout_node_0
} else {
config.i_am_leader_check_timeout_other_nodes
};
let i_am_leader_check_trigger = periodic(i_am_leader_check_timeout);

let mut df: Hydroflow = hydroflow_syntax! {
Expand All @@ -61,7 +67,7 @@ pub(crate) async fn run_proposer(
}
})
-> inspect(|p| println!("Proposer: {:?}", p))
-> persist();
-> persist();
leader_recv = source_stream_serde(stable_leader_src)
-> map(Result::unwrap)
-> inspect(|(m, a)| println!("Received {:?} from {:?}", m, a))
Expand Down Expand Up @@ -134,7 +140,7 @@ pub(crate) async fn run_proposer(
None
}
}) -> tee();

// send heartbeat if we're the leader
source_stream(i_am_leader_resend_trigger) -> [0]leader_and_resend_timeout;
is_leader[0] -> [1]leader_and_resend_timeout;
Expand All @@ -150,7 +156,7 @@ pub(crate) async fn run_proposer(
-> lattice_merge::<'static, Max<u64>>()
-> map(|lattice: Max<u64>| lattice.0);
leader_recv[1] -> map(|_| SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()) -> [1]latest_heartbeat;

// if there was no previous heartbeat when the i_am_leader_check triggers again, send p1a
i_am_leader_check = source_stream(i_am_leader_check_trigger)
-> map(|_| SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs())
Expand Down Expand Up @@ -246,7 +252,7 @@ pub(crate) async fn run_proposer(
ballot: b,
},
}) -> [1]p2a;

leader_and_reproposing[1] -> [0]leader_and_reproposing_and_max_slot;
max_proposed_slot[1] -> [1]leader_and_reproposing_and_max_slot;
leader_and_reproposing_and_max_slot = cross_join::<'tick>()
Expand Down Expand Up @@ -299,7 +305,7 @@ pub(crate) async fn run_proposer(
-> map(|(slot, num): (u16, u16)| slot + num)
-> next_tick()
-> [1]new_slots;

/////////////////////////////////////////////////////////////////////// process p2bs
all_commit = p2b[1] -> map(|m: P2b| (m, 1))
-> reduce_keyed::<'tick>(|sum: &mut u16, _| *sum += 1)
Expand Down
9 changes: 5 additions & 4 deletions hydroflow/examples/paxos/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::{net::SocketAddr, collections::HashSet};
use std::collections::HashSet;
use std::net::SocketAddr;

Check warning on line 2 in hydroflow/examples/paxos/protocol.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unused import: `std::net::SocketAddr`

Check warning on line 2 in hydroflow/examples/paxos/protocol.rs

View workflow job for this annotation

GitHub Actions / Test Suite (pinned-nightly)

unused import: `std::net::SocketAddr`

use serde::{Deserialize, Serialize};

#[derive(Ord, PartialEq, Eq, Clone, Serialize, Deserialize, Debug, Hash, Copy)]
pub struct Ballot {
pub id: u16,
pub num: u16
pub num: u16,
}

impl PartialOrd for Ballot {
Expand Down Expand Up @@ -50,5 +51,5 @@ pub struct P2b {

#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug, Hash)]
pub struct IAmLeader {
pub ballot: Ballot
}
pub ballot: Ballot,
}
2 changes: 1 addition & 1 deletion hydroflow/examples/two_pc/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub(crate) async fn run_coordinator(
-> fold_keyed::<'static, u16, u32>(|| 0, |acc: &mut _, val| *acc += val);

// If ack_votes for this xid is the same as subord_total, send a End message
acked = join() -> map(|(_c, (xid, ()))| xid) -> tee();
acked = join() -> map(|(_c, (xid, ()))| xid) -> tee();
ack_votes -> map(|(xid, c)| (c, xid)) -> [0]acked;
subord_total[1] -> map(|c| (c, ())) -> [1]acked;
// Presumed abort: log ends (don't need to flush)
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/two_pc/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ pub fn decide(odds: u8) -> bool {
use std::net::SocketAddr;
pub fn get_output_file(addr: SocketAddr) -> String {
format!("{:?}.txt", addr.port())
}
}
24 changes: 11 additions & 13 deletions hydroflow/examples/two_pc/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
use serde::{Deserialize, Serialize};

/**
* Message pattern:
* User input on stdin
* Phase 1
* Coordinator sends Prepare
* Subordinate responds with Commit or Abort
* Coordinator collects, wait for all responses, logs Commit if all commit
* Phase 2
* Coordinator sends Commit after flushing log, or just sends Abort
* Subordinate responds with AckP2
* Coordinator collects, wait for all responses, sends End
* Subordinate responds with Ended
*/
/// Message pattern:
/// User input on stdin
/// Phase 1
/// Coordinator sends Prepare
/// Subordinate responds with Commit or Abort
/// Coordinator collects, wait for all responses, logs Commit if all commit
/// Phase 2
/// Coordinator sends Commit after flushing log, or just sends Abort
/// Subordinate responds with AckP2
/// Coordinator collects, wait for all responses, sends End
/// Subordinate responds with Ended
#[derive(PartialEq, Eq, Clone, Serialize, Deserialize, Debug, Hash, Copy)]
pub enum MsgType {
Prepare,
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/two_pc/subordinate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub(crate) async fn run_subordinate(
// handle p2 message: acknowledge (and print)
ack_p2_chan = msgs[p2] -> tee();
// Presumed abort: log commits/aborts (reply only after flushing to disk)
ack_p2_chan[0] -> map(|m:CoordMsg| format!("Phase 2 {:?}, {:?}", m.xid, m.mtype)) -> log_to_disk[1];
ack_p2_chan[0] -> map(|m:CoordMsg| format!("Phase 2 {:?}, {:?}", m.xid, m.mtype)) -> log_to_disk[1];
ack_p2_chan[1] -> map(|m:CoordMsg| SubordResponse {
xid: m.xid,
mtype: MsgType::AckP2,
Expand Down

0 comments on commit 34eb1ea

Please sign in to comment.