Skip to content

Commit

Permalink
refactor(hydroflow_plus)!: move self_id and members to be APIs on…
Browse files Browse the repository at this point in the history
… cluster instead of builder (#1468)
  • Loading branch information
shadaj authored Sep 30, 2024
1 parent 08c2af5 commit 8ad997b
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 42 deletions.
4 changes: 2 additions & 2 deletions docs/docs/hydroflow_plus/clusters.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ Elements in a cluster are identified by a **cluster ID** (a `u32`). To get the I

This can then be passed into `source_iter` to load the IDs into the graph.
```rust
let stream = flow.source_iter(&process, flow.cluster_members(&cluster)).cloned();
let stream = flow.source_iter(&process, cluster.members()).cloned();
```

### One-to-Many
When sending data from a process to a cluster, the source must be a stream of tuples of the form `(u32, T)` and sends each `T` element to the instance with the matching `u32` ID.

This is useful for partitioning data across instances. For example, we can partition a stream of elements in a round-robin fashion by using `enumerate` to add a sequence number to each element, then using `send_bincode` to send each element to the instance with the matching sequence number:
```rust
let cluster_ids = flow.cluster_members(&cluster);
let cluster_ids = cluster.members();
let stream = flow.source_iter(&process, q!(vec![123, 456, 789]))
.enumerate()
.map(q!(|(i, x)| (
Expand Down
23 changes: 3 additions & 20 deletions hydroflow_plus/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ impl<'a> FreeVariable<&'a Vec<u32>> for ClusterIds<'a> {
impl<'a> Quoted<'a, &'a Vec<u32>> for ClusterIds<'a> {}

#[derive(Copy, Clone)]
struct ClusterSelfId<'a> {
id: usize,
_phantom: PhantomData<&'a mut &'a u32>,
pub(crate) struct ClusterSelfId<'a> {
pub(crate) id: usize,
pub(crate) _phantom: PhantomData<&'a mut &'a u32>,
}

impl<'a> FreeVariable<u32> for ClusterSelfId<'a> {
Expand Down Expand Up @@ -200,23 +200,6 @@ impl<'a> FlowBuilder<'a> {
}
}

pub fn cluster_members<C>(
&self,
cluster: &Cluster<C>,
) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
ClusterIds {
id: cluster.id,
_phantom: PhantomData,
}
}

pub fn cluster_self_id<C>(&self, cluster: &Cluster<C>) -> impl Quoted<'a, u32> + Copy + 'a {
ClusterSelfId {
id: cluster.id,
_phantom: PhantomData,
}
}

pub fn spin<L: Location>(&self, on: &L) -> Stream<'a, (), Unbounded, NoTick, L> {
Stream::new(
on.id(),
Expand Down
19 changes: 19 additions & 0 deletions hydroflow_plus/src/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ use std::marker::PhantomData;

use serde::de::DeserializeOwned;
use serde::Serialize;
use stageleft::Quoted;

use super::builder::{ClusterIds, ClusterSelfId};

#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum LocationId {
Expand Down Expand Up @@ -76,6 +79,22 @@ pub struct Cluster<C> {
pub(crate) _phantom: PhantomData<C>,
}

impl<C> Cluster<C> {
pub fn self_id<'a>(&self) -> impl Quoted<'a, u32> + Copy + 'a {
ClusterSelfId {
id: self.id,
_phantom: PhantomData,
}
}

pub fn members<'a>(&self) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
ClusterIds {
id: self.id,
_phantom: PhantomData,
}
}
}

impl<C> Clone for Cluster<C> {
fn clone(&self) -> Self {
Cluster {
Expand Down
12 changes: 3 additions & 9 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use serde::Serialize;
use stageleft::{q, IntoQuotedMut, Quoted};
use syn::parse_quote;

use crate::builder::{ClusterIds, FlowLeaves};
use crate::builder::FlowLeaves;
use crate::cycle::{CycleCollection, CycleComplete};
use crate::ir::{DebugInstantiate, HfPlusLeaf, HfPlusNode, HfPlusSource};
use crate::location::{
Expand Down Expand Up @@ -856,10 +856,7 @@ impl<'a, T, W, N: Location> Stream<'a, T, W, NoTick, N> {
N: CanSend<Cluster<C2>, In<T> = (u32, T)>,
T: Clone + Serialize + DeserializeOwned,
{
let ids = ClusterIds::<'a> {
id: other.id,
_phantom: PhantomData,
};
let ids = other.members();

self.flat_map(q!(|b| ids.iter().map(move |id| (
::std::clone::Clone::clone(id),
Expand Down Expand Up @@ -887,10 +884,7 @@ impl<'a, T, W, N: Location> Stream<'a, T, W, NoTick, N> {
N: CanSend<Cluster<C2>, In<Bytes> = (u32, T)> + 'a,
T: Clone,
{
let ids = ClusterIds::<'a> {
id: other.id,
_phantom: PhantomData,
};
let ids = other.members();

self.flat_map(q!(|b| ids.iter().map(move |id| (
::std::clone::Clone::clone(id),
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus_test/src/cluster/map_reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub fn map_reduce(flow: &FlowBuilder) -> (Process<Leader>, Cluster<Worker>) {
.source_iter(&process, q!(vec!["abc", "abc", "xyz", "abc"]))
.map(q!(|s| s.to_string()));

let all_ids_vec = flow.cluster_members(&cluster);
let all_ids_vec = cluster.members();
let words_partitioned = words
.tick_batch()
.enumerate()
Expand Down
12 changes: 5 additions & 7 deletions hydroflow_plus_test/src/cluster/paxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub fn paxos_core<'a, P: PaxosPayload>(
// Proposers.
flow.source_iter(&proposers, q!(["Proposers say hello"]))
.for_each(q!(|s| println!("{}", s)));
let p_id = flow.cluster_self_id(&proposers);
let p_id = proposers.self_id();

let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader) =
flow.cycle::<Stream<_, _, _, _>>(&proposers);
Expand All @@ -119,7 +119,6 @@ pub fn paxos_core<'a, P: PaxosPayload>(
p_ballot_calc(flow, &proposers, p_received_max_ballot.latest_tick());

let (p_is_leader, p_log_to_try_commit, p_max_slot, p_log_holes) = p_p1b(
flow,
&proposers,
a_to_proposers_p1b.inspect(q!(|(_, p1b)| println!("Proposer received P1b: {:?}", p1b))),
p_ballot_num.clone(),
Expand Down Expand Up @@ -413,7 +412,7 @@ fn p_p2a<'a, P: PaxosPayload>(
Optional<'a, i32, Bounded, Tick, Cluster<Proposer>>,
Stream<'a, P2a<P>, Unbounded, NoTick, Cluster<Acceptor>>,
) {
let p_id = flow.cluster_self_id(proposers);
let p_id = proposers.self_id();
let (p_next_slot_complete_cycle, p_next_slot) =
flow.tick_cycle::<Optional<i32, _, _, _>>(proposers);
let p_next_slot_after_reconciling_p1bs = p_max_slot
Expand Down Expand Up @@ -473,7 +472,6 @@ fn p_p2a<'a, P: PaxosPayload>(
// Proposer logic for processing p1bs, determining if the proposer is now the leader, which uncommitted messages to commit, what the maximum slot is in the p1bs, and which no-ops to commit to fill log holes.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn p_p1b<'a, P: PaxosPayload>(
flow: &FlowBuilder<'a>,
proposers: &Cluster<Proposer>,
a_to_proposers_p1b: Stream<'a, (u32, P1b<P>), Unbounded, NoTick, Cluster<Proposer>>,
p_ballot_num: Singleton<'a, u32, Bounded, Tick, Cluster<Proposer>>,
Expand All @@ -485,7 +483,7 @@ fn p_p1b<'a, P: PaxosPayload>(
Optional<'a, i32, Bounded, Tick, Cluster<Proposer>>,
Stream<'a, P2a<P>, Bounded, Tick, Cluster<Proposer>>,
) {
let p_id = flow.cluster_self_id(proposers);
let p_id = proposers.self_id();
let p_relevant_p1bs = a_to_proposers_p1b
.clone()
.tick_prefix()
Expand Down Expand Up @@ -598,7 +596,7 @@ fn p_ballot_calc<'a>(
Singleton<'a, u32, Bounded, Tick, Cluster<Proposer>>,
Optional<'a, (Ballot, u32), Bounded, Tick, Cluster<Proposer>>,
) {
let p_id = flow.cluster_self_id(proposers);
let p_id = proposers.self_id();
let (p_ballot_num_complete_cycle, p_ballot_num) =
flow.tick_cycle_with_initial(proposers, flow.singleton(proposers, q!(0)).latest_tick());

Expand Down Expand Up @@ -654,7 +652,7 @@ fn p_p1a<'a>(
Stream<'a, Ballot, Unbounded, NoTick, Cluster<Proposer>>,
Stream<'a, P1a, Unbounded, NoTick, Cluster<Acceptor>>,
) {
let p_id = flow.cluster_self_id(proposers);
let p_id = proposers.self_id();
let p_to_proposers_i_am_leader_new = p_ballot_num
.clone()
.continue_if(
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus_test/src/cluster/paxos_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ fn bench_client<'a, B: LeaderElected + std::fmt::Debug>(
median_latency_window_size: usize,
f: usize,
) -> Stream<'a, (u32, ClientPayload), Unbounded, NoTick, Cluster<Client>> {
let c_id = flow.cluster_self_id(clients);
let c_id = clients.self_id();
// r_to_clients_payload_applied.clone().inspect(q!(|payload: &(u32, ReplicaPayload)| println!("Client received payload: {:?}", payload)));
// Only keep the latest leader
let c_max_leader_ballot = p_to_clients_leader_elected
Expand Down
4 changes: 2 additions & 2 deletions hydroflow_plus_test/src/cluster/simple_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ pub fn simple_cluster(flow: &FlowBuilder) -> (Process<()>, Cluster<()>) {

let numbers = flow.source_iter(&process, q!(0..5));
let ids = flow
.source_iter(&process, flow.cluster_members(&cluster))
.source_iter(&process, cluster.members())
.map(q!(|&id| id));

let cluster_self_id = flow.cluster_self_id(&cluster);
let cluster_self_id = cluster.self_id();

ids.cross_product(numbers)
.map(q!(|(id, n)| (id, (id, n))))
Expand Down

0 comments on commit 8ad997b

Please sign in to comment.