
refactor(hydroflow_plus)!: move input APIs back to being on locations
shadaj committed Sep 30, 2024
1 parent 8ad997b commit d5558f5
Showing 38 changed files with 1,226 additions and 1,202 deletions.
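The pattern across the changed files below is mechanical: stream inputs and cycles are now constructed on a `Process` or `Cluster` handle instead of on the flow builder, and the networking operators take their destination location by reference. A minimal before/after sketch of the migration, assuming `process` and `cluster` handles obtained from a `FlowBuilder` as in the docs below:

```rust
// Old API (pre-commit), shown as comments for contrast:
//     let numbers = flow.source_iter(&process, q!(vec![1, 2, 3]));
//     let (handle, placeholder) = flow.cycle(&process);
//     let on_cluster = numbers.broadcast_bincode(cluster);

// New API: inputs and cycles live on the location handle, and
// send/broadcast operators take the destination by reference.
let numbers = process.source_iter(q!(vec![1, 2, 3]));
let (handle, placeholder) = process.cycle();
let on_cluster = numbers.broadcast_bincode(&cluster);
```
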
docs/docs/hydroflow_plus/aggregations.mdx (2 changes: 1 addition & 1 deletion)
@@ -21,7 +21,7 @@ To specify this **window**, Hydroflow+ offers two operators, `tick_batch()` and
For example, consider a pipelined aggregation across two processes. We can sum up elements on the first process in a batched manner using `tick_batch()`, then sum up the results on the second process in an unbounded manner using `all_ticks()`:

```rust
-let root_stream = flow.source_stream(&process, q!(1..=10));
+let root_stream = process.source_stream(q!(1..=10));
root_stream
.tick_batch()
.fold(q!(|| 0), q!(|acc, x| *acc += x))
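For context, a fuller sketch of the pipelined aggregation this excerpt describes, written against the new location-based API. The second process (`process2`) and the operators after `fold` are illustrative assumptions; only the lines shown in the diff above come from the actual docs.

```rust
// Hypothetical completion of the truncated example above.
process
    .source_stream(q!(1..=10))
    .tick_batch()                              // window the input by tick on the first process
    .fold(q!(|| 0), q!(|acc, x| *acc += x))    // per-tick partial sum
    .all_ticks()                               // release the partial sums across ticks
    .send_bincode(&process2)                   // ship them to the (assumed) second process
    .for_each(q!(|partial| println!("partial sum: {}", partial)));
```
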
docs/docs/hydroflow_plus/clusters.mdx (24 changes: 12 additions & 12 deletions)
@@ -23,7 +23,7 @@ This API follows the same pattern as processes, where a cluster spec represents
Instantiating streams on clusters uses the same APIs as streams: `source_iter` and `source_stream` are both available. But when using these APIs, the root streams will be instantiated on _all_ instances in the cluster.

```rust
-let stream = flow.source_iter(&cluster, q!(vec![1, 2, 3]));
+let stream = cluster.source_iter(q!(vec![1, 2, 3]));

stream.for_each(q!(|x| println!("{}", x)))
// will print 1, 2, 3 on **each** instance
@@ -36,7 +36,7 @@ Elements in a cluster are identified by a **cluster ID** (a `u32`). To get the I

This can then be passed into `source_iter` to load the IDs into the graph.
```rust
-let stream = flow.source_iter(&process, cluster.members()).cloned();
+let stream = process.source_iter(cluster.members()).cloned();
```

### One-to-Many
@@ -45,36 +45,36 @@ When sending data from a process to a cluster, the source must be a stream of tu
This is useful for partitioning data across instances. For example, we can partition a stream of elements in a round-robin fashion by using `enumerate` to add a sequence number to each element, then using `send_bincode` to send each element to the instance with the matching sequence number:
```rust
let cluster_ids = cluster.members();
-let stream = flow.source_iter(&process, q!(vec![123, 456, 789]))
+let stream = process.source_iter(q!(vec![123, 456, 789]))
.enumerate()
.map(q!(|(i, x)| (
i % cluster_ids.len() as u32,
x
)))
-.send_bincode(cluster);
+.send_bincode(&cluster);
```

To broadcast data to all instances in a cluster, use `broadcast_{bincode,bytes}`, which acts as a shortcut for the cross product.

```rust
-let stream = flow.source_iter(&process, q!(vec![123, 456, 789]))
-.broadcast_bincode(cluster);
+let stream = process.source_iter(q!(vec![123, 456, 789]))
+.broadcast_bincode(&cluster);
```

### Many-to-One
In the other direction, sending data from a cluster to a process, we have a stream of elements of type `T` at the sender but on the recipient side we get a stream of tuples of the form `(u32, T)`, where the `u32` is the ID of the instance that sent the element. The elements received from different instances will be interleaved.

This is useful for aggregating data from multiple instances into a single stream. For example, we can use `send_bincode` to send data from all instances to a single process, and then print them all out:
```rust
-let stream = flow.source_iter(&cluster, q!(vec![123, 456, 789]))
-.send_bincode(process)
+let stream = cluster.source_iter(q!(vec![123, 456, 789]))
+.send_bincode(&process)
.for_each(q!(|(id, x)| println!("{}: {}", id, x)));
```

If you don't care which instance sent the data, you can use `send_{bincode,bytes}_interleaved`, where the recipient receives a stream of `T` elements, but the elements received from different instances will be interleaved.
```rust
-let stream = flow.source_iter(&cluster, q!(vec![123, 456, 789]))
-.send_bincode_interleaved(process)
+let stream = cluster.source_iter(q!(vec![123, 456, 789]))
+.send_bincode_interleaved(&process)
.for_each(q!(|x| println!("{}", x)));
```

@@ -83,7 +83,7 @@ Finally, when sending data from one cluster to another (or to itself as in distr

We can use the same shortcuts as before. For example, we can use `broadcast_bincode_interleaved` to send data from all instances in a cluster to all instances in another cluster, and then print them all out:
```rust
-let stream = flow.source_iter(&cluster1, q!(vec![123, 456, 789]))
-.broadcast_bincode_interleaved(cluster2)
+let stream = cluster1.source_iter(q!(vec![123, 456, 789]))
+.broadcast_bincode_interleaved(&cluster2)
.for_each(q!(|x| println!("{}", x)));
```
docs/docs/hydroflow_plus/cycles.mdx (8 changes: 4 additions & 4 deletions)
@@ -10,18 +10,18 @@ Because streams are represented as values when constructing a Hydroflow+ graph,
We can create a cycle by using the `cycle` method on a process or cluster. This returns a tuple of two values: an `HfCycle` value that can be used to complete the cycle later and the placeholder stream.

```rust
-let (complete_cycle, cycle_placeholder) = flow.cycle(&process);
+let (complete_cycle, cycle_placeholder) = process.cycle();
```

For example, consider the classic graph reachability problem, which computes the nodes reachable from a given set of roots in a directed graph. This can be modeled as an iterative fixpoint computation where we start with the roots, then repeatedly add the children of each node to the set of reachable nodes until we reach a fixpoint.

In Hydroflow+, we can implement this using cycles:

```rust
-let roots = flow.source_stream(&process, roots);
-let edges = flow.source_stream(&process, edges);
+let roots = process.source_stream(roots);
+let edges = process.source_stream(edges);

-let (complete_reached_nodes, reached_nodes) = flow.cycle(&process);
+let (complete_reached_nodes, reached_nodes) = process.cycle();

let reach_iteration = roots
.union(&reached_nodes)
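The excerpt cuts off before the cycle is closed. A hedged sketch of how the loop might be completed with the new API; the `complete` method name, the join-based expansion, and the `(from, to)` shape of `edges` are assumptions, not part of this diff:

```rust
// Assumed continuation of the reachability example above.
let reach_iteration = roots
    .union(&reached_nodes)            // known roots plus everything reached so far
    .map(q!(|r| (r, ())))             // key by node id so we can join against edges
    .join(&edges)                     // `edges` assumed to be a stream of (from, to) pairs
    .map(q!(|(_from, ((), to))| to)); // the next frontier of reachable nodes

// Feed the frontier back into the placeholder to close the cycle.
complete_reached_nodes.complete(reach_iteration);
```
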
docs/docs/hydroflow_plus/process_streams.mdx (12 changes: 6 additions & 6 deletions)
@@ -41,7 +41,7 @@ Root streams are created using methods available on an instantiated process.
To create a stream from a Rust iterator, use `source_iter`. This is useful for loading static data into the graph. Each element of the iterator will be emitted _exactly once_ in the _first tick_ of execution (see [Aggregations and Ticks](./aggregations.mdx)).

```rust
-let stream = flow.source_iter(&process, q!(vec![1, 2, 3]));
+let stream = process.source_iter(q!(vec![1, 2, 3]));
```

#### `source_stream`
@@ -52,7 +52,7 @@ pub fn my_flow<'a, D: Deploy<'a>>(
...,
my_stream: RuntimeData<impl Stream<Item = i32>>
) {
-let stream = flow.source_stream(&process, my_stream);
+let stream = process.source_stream(my_stream);
...
}
```
@@ -66,13 +66,13 @@ If sending a type that supports serialization using `serde`, use `send_bincode`,
let process0 = flow.process(process_spec);
let process1 = flow.process(process_spec);

-let stream0 = flow.source_iter(&process0, ...);
-let stream1 = stream0.send_bincode(process1);
+let stream0 = process0.source_iter(...);
+let stream1 = stream0.send_bincode(&process1);
```

To use custom serializers, you can use the `send_bytes` method to send a stream of `Bytes` values.

```rust
-let stream0 = flow.source_iter(&process0, ...);
-let stream1 = stream0.send_bytes(process1);
+let stream0 = process0.source_iter(...);
+let stream1 = stream0.send_bytes(&process1);
```
docs/docs/hydroflow_plus/quickstart/clusters.mdx (2 changes: 1 addition & 1 deletion)
@@ -44,7 +44,7 @@ pub fn broadcast(
When sending data between individual processes, we used the `send_bincode` operator. When sending data from a process to a cluster, we can use the `broadcast_bincode` operator instead.

```rust
-let data = flow.source_iter(&leader, q!(0..10));
+let data = leader.source_iter(q!(0..10));
data
.broadcast_bincode(&workers)
.for_each(q!(|n| println!("{}", n)));
hydroflow_plus/src/builder/built.rs (6 changes: 3 additions & 3 deletions)
@@ -10,7 +10,7 @@ use crate::location::{Cluster, ExternalProcess, Process};
use crate::HfCompiled;

pub struct BuiltFlow<'a> {
-pub(super) ir: Vec<HfPlusLeaf<'a>>,
+pub(super) ir: Vec<HfPlusLeaf>,
pub(super) processes: Vec<usize>,
pub(super) clusters: Vec<usize>,
pub(super) used: bool,
@@ -27,13 +27,13 @@ impl<'a> Drop for BuiltFlow<'a> {
}

impl<'a> BuiltFlow<'a> {
-pub fn ir(&self) -> &Vec<HfPlusLeaf<'a>> {
+pub fn ir(&self) -> &Vec<HfPlusLeaf> {
&self.ir
}

pub fn optimize_with(
mut self,
-f: impl FnOnce(Vec<HfPlusLeaf<'a>>) -> Vec<HfPlusLeaf<'a>>,
+f: impl FnOnce(Vec<HfPlusLeaf>) -> Vec<HfPlusLeaf>,
) -> BuiltFlow<'a> {
self.used = true;
BuiltFlow {
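A small hypothetical call site for the updated `optimize_with` signature; the `built` binding (a `BuiltFlow` produced elsewhere) is an assumption for illustration:

```rust
// Identity pass over the IR, matching the new `Vec<HfPlusLeaf>` (no lifetime) signature.
let optimized = built.optimize_with(|leaves: Vec<HfPlusLeaf>| {
    // A real pass would rewrite or prune leaves here; this one returns them unchanged.
    leaves
});
```
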
hydroflow_plus/src/builder/deploy.rs (14 changes: 7 additions & 7 deletions)
@@ -20,7 +20,7 @@ use crate::location::{
use crate::{Cluster, ClusterSpec, Deploy, HfCompiled, Process, ProcessSpec};

pub struct DeployFlow<'a, D: LocalDeploy<'a>> {
-pub(super) ir: Vec<HfPlusLeaf<'a>>,
+pub(super) ir: Vec<HfPlusLeaf>,
pub(super) nodes: HashMap<usize, D::Process>,
pub(super) externals: HashMap<usize, D::ExternalProcess>,
pub(super) clusters: HashMap<usize, D::Cluster>,
@@ -69,7 +69,7 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
self.used = true;

let mut seen_tees: HashMap<_, _> = HashMap::new();
-let mut ir_leaves_networked: Vec<HfPlusLeaf> = std::mem::take(&mut self.ir)
+let mut flow_state_networked: Vec<HfPlusLeaf> = std::mem::take(&mut self.ir)
.into_iter()
.map(|leaf| {
leaf.compile_network::<D>(
@@ -85,7 +85,7 @@ impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
let extra_stmts = self.extra_stmts(env);

HfCompiled {
-hydroflow_ir: build_inner(&mut ir_leaves_networked),
+hydroflow_ir: build_inner(&mut flow_state_networked),
extra_stmts,
_phantom: PhantomData,
}
@@ -132,7 +132,7 @@ impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> {
self.used = true;

let mut seen_tees_instantiate: HashMap<_, _> = HashMap::new();
-let mut ir_leaves_networked: Vec<HfPlusLeaf> = std::mem::take(&mut self.ir)
+let mut flow_state_networked: Vec<HfPlusLeaf> = std::mem::take(&mut self.ir)
.into_iter()
.map(|leaf| {
leaf.compile_network::<D>(
Expand All @@ -145,7 +145,7 @@ impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> {
})
.collect();

-let mut compiled = build_inner(&mut ir_leaves_networked);
+let mut compiled = build_inner(&mut flow_state_networked);
let mut extra_stmts = self.extra_stmts(&());
let mut meta = D::Meta::default();

@@ -201,7 +201,7 @@ impl<'a, D: Deploy<'a, CompileEnv = ()>> DeployFlow<'a, D> {
}

let mut seen_tees_connect = HashMap::new();
-for leaf in ir_leaves_networked {
+for leaf in flow_state_networked {
leaf.connect_network(&mut seen_tees_connect);
}

@@ -229,7 +229,7 @@ impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
self.processes.get(&id).unwrap()
}

-pub fn get_cluster<C>(&self, c: &Cluster<C>) -> &D::Cluster {
+pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
let id = match c.id() {
LocationId::Cluster(id) => id,
_ => panic!("Cluster ID expected"),
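A hypothetical lookup against the updated `get_cluster` signature; `deployed` (a `DeployResult`) and the `workers` cluster handle are assumptions for illustration:

```rust
// `get_cluster` now takes the lifetime-carrying handle type `Cluster<'a, C>`.
let workers_deployment = deployed.get_cluster(&workers);
```
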
