Skip to content

Commit

Permalink
docs(hydroflow_plus): update docs for hydroflow plus, pull code from …
Browse files Browse the repository at this point in the history
…template for first two examples #1012 (#1318)

Also fix a bug in `docs/src/util.ts:getLines()` that would cut off the beginning of lines
  • Loading branch information
MingweiSamuel authored Jun 25, 2024
1 parent 2270ea9 commit 8eb8aa1
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 125 deletions.
99 changes: 17 additions & 82 deletions docs/docs/hydroflow_plus/quickstart/distributed.mdx
Original file line number Diff line number Diff line change
@@ -1,114 +1,49 @@
---
sidebar_position: 2
---
import CodeBlock from '@theme/CodeBlock';
import firstTenSrc from '!!raw-loader!../../../../template/hydroflow_plus/flow/src/first_ten.rs';
import firstTenDistSrc from '!!raw-loader!../../../../template/hydroflow_plus/flow/src/first_ten_distributed.rs';
import firstTenDistBin from '!!raw-loader!../../../../template/hydroflow_plus/flow/src/bin/first_ten_distributed.rs';
import firstTenDistExample from '!!raw-loader!../../../../template/hydroflow_plus/flow/examples/first_ten_distributed.rs';
import { getLines, extractOutput } from '../../../src/util';

# Adding Distribution
Continuing from our previous example, we will now look at how to extend our program to run on multiple processes. Recall that our previous flow graph looked like this:

```rust title="flow/src/first_ten.rs"
use hydroflow_plus::*;
use stageleft::*;

pub fn first_ten<'a, D: LocalDeploy<'a>>(
flow: &FlowBuilder<'a, D>,
process_spec: &impl ProcessSpec<'a, D>
) {
let process = flow.process(process_spec);
let numbers = flow.source_iter(&process, q!(0..10));
numbers.for_each(q!(|n| println!("{}", n)));
}
```
<CodeBlock language="rust" title="flow/src/first_ten.rs">{getLines(firstTenSrc, 3, 14)}</CodeBlock>

## The Flow Graph
Let's extend this example to print the numbers on a separate process. First, we need to specify that our flow graph will involve the network. We do this by replacing the `LocalDeploy<'a>` trait bound with the general `Deploy<'a>`. Then, we can use the `process_spec` to create a second process:
```rust title="flow/src/first_ten_distributed.rs"
use hydroflow_plus::*;
use stageleft::*;

pub fn first_ten_distributed<'a, D: Deploy<'a>>(
flow: &FlowBuilder<'a, D>,
process_spec: &impl ProcessSpec<'a, D>
) {
let process = flow.process(process_spec);
let second_process = flow.process(process_spec);
}
```

<CodeBlock language="rust" title="flow/src/first_ten_distributed.rs">{getLines(firstTenDistSrc, 1, 9)}</CodeBlock>

Now, we can distribute our dataflow by using the `send_bincode` operator to mark where the data should be sent using bincode serialization.

```rust
let numbers = flow.source_iter(&process, q!(0..10));
numbers
.send_bincode(&second_process)
.for_each(q!(|n| println!("{}", n)));
```
<CodeBlock language="rust">{getLines(firstTenDistSrc, 11, 14)}</CodeBlock>

## The Runtime
Now that our graph spans multiple processes, our runtime entrypoint will involve multiple subgraphs. This means we can't get away with just the optimized dataflow. Instead, we must take the subgraph ID as a runtime parameter through `with_dynamic_id` to select the appropriate graph. In addition, our dataflow involves the network, so we take a `HydroCLI` runtime parameter (`cli`) so that processes can look up their network connections and instantiate the flow graph with access to it.

```rust title="flow/src/first_ten_distributed.rs"
use hydroflow_plus::util::cli::HydroCLI;
use hydroflow_plus_cli_integration::{CLIRuntime, HydroflowPlusMeta};

#[stageleft::entry]
pub fn first_ten_distributed_runtime<'a>(
flow: FlowBuilder<'a, CLIRuntime>,
cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
) -> impl Quoted<'a, Hydroflow<'a>> {
first_ten_distributed(&flow, &cli);
flow.extract()
.optimize_default()
.with_dynamic_id(q!(cli.meta.subgraph_id))
}
```
In a new file:

<CodeBlock language="rust" title="flow/src/first_ten_distributed.rs">{getLines(firstTenDistSrc, 19, 31)}</CodeBlock>

The corresponding binary in `src/bin/first_ten_distributed.rs` then instantiates the CLI and reads the process ID from the command line arguments:

```rust title="flow/src/bin/first_ten_distributed.rs"
#[tokio::main]
async fn main() {
hydroflow_plus::util::cli::launch!(
|ports| flow::first_ten_distributed_runtime!(ports)
).await;
}
```
<CodeBlock language="rust" title="flow/src/bin/first_ten_distributed.rs">{firstTenDistBin}</CodeBlock>

## The Deployment
Finally, we need to deploy our dataflow with the appropriate network topology. We achieve this by using [Hydro Deploy](../../deploy/index.md). Hydroflow+ integrates with Hydro Deploy to automatically construct the topology based on the flow graph. We can create a new file `examples/first_ten_distributed.rs` with the following contents:

```rust title="flow/examples/first_ten_distributed.rs"
use hydro_deploy::{Deployment, HydroflowCrate};
use hydroflow_plus_cli_integration::DeployProcessSpec;

#[tokio::main]
async fn main() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let builder = hydroflow_plus::FlowBuilder::new();
flow::first_ten::first_ten_distributed(
&builder,
&DeployProcessSpec::new(|| {
deployment.add_service(
HydroflowCrate::new(".", localhost.clone())
.bin("first_ten_distributed")
.profile("dev"),
)
}),
);

deployment.deploy().await.unwrap();

deployment.start().await.unwrap();

tokio::signal::ctrl_c().await.unwrap()
}
```
<CodeBlock language="rust" title="flow/src/examples/first_ten_distributed.rs">{firstTenDistExample}</CodeBlock>

Most importantly, we specify a `DeployProcessSpec`, which takes a closure that constructs a Hydro Deploy service for each process in the flow graph. In our case, we use the `HydroflowCrate` service type, which deploys a Hydroflow+ binary. We also specify the process ID as a command line argument, which is read by our runtime binary.

We can then run our distributed dataflow with:

<>{/* TODO(mingwei): grab this output from a tested snapshot file */}</>

```bash
#shell-command-next-line
cargo run -p flow --example first_ten_distributed
Expand Down
52 changes: 13 additions & 39 deletions docs/docs/hydroflow_plus/quickstart/structure.mdx
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
---
sidebar_position: 1
---
import CodeBlock from '@theme/CodeBlock';
import firstTenSrc from '!!raw-loader!../../../../template/hydroflow_plus/flow/src/first_ten.rs';
import firstTenBin from '!!raw-loader!../../../../template/hydroflow_plus/flow/src/bin/first_ten.rs';
import { getLines, extractOutput } from '../../../src/util';

# Your First Dataflow
Hydroflow+ programs require special structure to support code generation and distributed deployments. There are three main components of a Hydroflow+ program:
Expand All @@ -26,40 +30,22 @@ Let's look a minimal example of a Hydroflow+ program. We'll start with a simple


## The Flow Graph
```rust title="flow/src/first_ten.rs"
use hydroflow_plus::*;
use stageleft::*;

pub fn first_ten<'a, D: LocalDeploy<'a>>(
flow: &FlowBuilder<'a, D>,
process_spec: &impl ProcessSpec<'a, D>
) {}
```

<CodeBlock language="rust" title="flow/src/first_ten.rs">{getLines(firstTenSrc, 3, 9)}</CodeBlock>

To build a Hydroflow+ application, we need to define a dataflow that spans multiple processes. The `FlowBuilder` parameter captures the global dataflow, while the `process_spec` variable defines how to construct the processes where the dataflow will be executed. For now, we will only use the `ProcessSpec` once, to add a single process to our global dataflow.

```rust
pub fn first_ten<'a, D: LocalDeploy<'a>>(
flow: &FlowBuilder<'a, D>,
process_spec: &impl ProcessSpec<'a, D>
) {
let process = flow.process(process_spec);
}
```
<CodeBlock language="rust">{getLines(firstTenSrc, 10)}</CodeBlock>

Now, we can build out the dataflow to run on this process. Every dataflow starts at a source that is bound to a specific process. First, we instantiate a stream that emits the first 10 natural numbers.

```rust
let numbers = flow.source_iter(&process, q!(0..10)); // : Stream<_, i32, _, _>
```
<CodeBlock language="rust">{getLines(firstTenSrc, 12)}</CodeBlock>

In Hydroflow+, whenever there are snippets of Rust code passed to operators (like `source_iter`, `map`, or `for_each`), we use the `q!` macro to mark them. For example, we may use Rust snippets to define static sources of data or closures that transform them.

To print out these numbers, we can use the `for_each` operator (note that the body of `for_each` is a closure wrapped in `q!`):

```rust
numbers.for_each(q!(|n| println!("{}", n)));
```
<CodeBlock language="rust">{getLines(firstTenSrc, 13)}</CodeBlock>

## The Runtime
Next, we need to instantiate our dataflow into a runnable Rust binary. We do this by defining a [Stageleft entrypoint](../stageleft.mdx) for the graph, and then invoking the entrypoint inside a separate Rust binary.
Expand All @@ -68,29 +54,17 @@ To define the entrypoint, we use the `#[stageleft::entry]` macro, which takes th

Having done that, we can use some simple defaults for "distributing" this single-process deployment. First, we use `()` as an argument to `first_ten` to choose the default process spec. Then we use the `optimize_default` method to generate the Hydroflow program with default optimizations.


```rust title="flow/src/first_ten.rs"
#[stageleft::entry]
pub fn first_ten_runtime<'a>(
flow: FlowBuilder<'a, SingleProcessGraph>
) -> impl Quoted<'a, Hydroflow<'a>> {
first_ten(&flow, &() /* for a single process graph */);
flow.extract().optimize_default() // : impl Quoted<'a, Hydroflow<'a>>
}
```
<CodeBlock language="rust" title="flow/src/first_ten.rs">{getLines(firstTenSrc, 16, 22)}</CodeBlock>

Finally, it's time to write our `main` function.
Stageleft entries are usable as macros from other programs. In our case, we will instantiate our entrypoint from the Rust binary for our dataflow. We can create a new file `src/bin/first_ten.rs` with the following contents. Note that Hydroflow+ requires that we use `tokio` and its `async` function specification:

```rust title="flow/src/bin/first_ten.rs"
#[tokio::main]
async fn main() {
flow::first_ten::first_ten_runtime!().run_async().await;
}
```
<CodeBlock language="rust" title="flow/src/bin/first_ten.rs">{firstTenBin}</CodeBlock>

We can now run this binary to see the output of our dataflow:

<>{/* TODO(mingwei): grab this output from a tested snapshot file */}</>

```bash
#shell-command-next-line
cargo run -p flow --bin first_ten
Expand Down
2 changes: 1 addition & 1 deletion docs/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/// Lines are one-indexed (start with `1`). Both `lineStart` and `lineEnd` are inclusive.
export function getLines(str: string, lineStart: number, lineEnd?: number): string {
let lines = str.split('\n').slice(lineStart - 1, lineEnd || lineStart);
const leadingWhitespace = Math.min(...lines.map(line => line.search(/\S/)).map(Number).filter(n => 0 !== n));
const leadingWhitespace = Math.min(...lines.filter(line => 0 !== line.length).map(line => line.search(/\S/)).map(Number));
if (0 < leadingWhitespace) {
lines = lines.map(line => line.slice(leadingWhitespace));
}
Expand Down
6 changes: 3 additions & 3 deletions template/hydroflow_plus/flow/src/first_ten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ pub fn first_ten<'a, D: LocalDeploy<'a>>(
) {
let process = flow.process(process_spec);

let numbers = flow.source_iter(&process, q!(0..10));
let numbers = flow.source_iter(&process, q!(0..10)); // : Stream<_, i32, _, _>
numbers.for_each(q!(|n| println!("{}", n)));
}

#[stageleft::entry]
pub fn first_ten_runtime<'a>(
flow: FlowBuilder<'a, SingleProcessGraph>,
) -> impl Quoted<'a, Hydroflow<'a>> {
first_ten(&flow, &());
flow.extract().optimize_default()
first_ten(&flow, &()); // &() for a single process graph.
flow.extract().optimize_default() // : impl Quoted<'a, Hydroflow<'a>>
}

0 comments on commit 8eb8aa1

Please sign in to comment.