Skip to content

Commit

Permalink
Merge remote-tracking branch 'ht/move-hf' into ht for #1314 (#1316)
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel committed Jun 24, 2024
2 parents c1a70f8 + a75ace3 commit 2270ea9
Show file tree
Hide file tree
Showing 11 changed files with 442 additions and 3 deletions.
83 changes: 80 additions & 3 deletions .github/workflows/template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ jobs:
with:
cancel_others: "true"

test:
name: Test
test_hydroflow:
name: Test hydroflow
if: ${{ needs.pre_job.outputs.should_skip != 'true' || github.event_name != 'pull_request' }}
timeout-minutes: 10
needs: pre_job
Expand All @@ -36,10 +36,87 @@ jobs:
toolchain: nightly
components: rustfmt, clippy

- name: Install Action cargo-generate
- name: Action cargo-generate
uses: cargo-generate/[email protected]
with:
name: generated
template: template/hydroflow
- name: Move generated project
run: |
mv generated ${{ runner.temp }}/
- name: Run cargo check
uses: actions-rs/cargo@v1
with:
command: check
args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all-targets

- name: Run cargo fmt
uses: actions-rs/cargo@v1
with:
command: fmt
args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all -- --check

- name: Run cargo clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all --all-targets -- -D warnings

- name: Run cargo test
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all-targets --no-fail-fast

- name: Run cargo build
uses: actions-rs/cargo@v1
with:
command: build
args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all-targets

- name: test template example
run: |
ECHO_STRING='hello this is a test'
trap 'rm client-input && rm client-output && kill $(jobs -p)' EXIT
echo "$ECHO_STRING" >client-input
"${{ runner.temp }}/generated/target/debug/hydroflow-template" --role server --address 127.0.0.100:2048 &
"${{ runner.temp }}/generated/target/debug/hydroflow-template" --role client --address 127.0.0.100:2048 <client-input >client-output &
sleep 1
if cat client-output | grep -q "$ECHO_STRING"; then
exit 0
else
exit -1
fi
test_hydroflow_plus:
name: Test hydroflow_plus
if: ${{ needs.pre_job.outputs.should_skip != 'true' || github.event_name != 'pull_request' }}
timeout-minutes: 10
needs: pre_job
runs-on: ubuntu-latest

steps:
- name: Checkout sources
uses: actions/checkout@v2

- name: Install nightly toolchain
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly
components: rustfmt, clippy

- name: Action cargo-generate
uses: cargo-generate/[email protected]
with:
name: generated
template: template/hydroflow_plus
- name: Move generated project
run: |
mv generated ${{ runner.temp }}/
Expand Down
2 changes: 2 additions & 0 deletions template/hydroflow/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target
Cargo.lock
12 changes: 12 additions & 0 deletions template/hydroflow/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "hydroflow-template"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
clap = { version = "4.0.29", features = [ "derive" ] }
hydroflow = { git = "https://github.com/hydro-project/hydroflow" }
serde = { version = "1", features = [ "derive" ] }
chrono = { version = "0.4.20", features = [ "serde" ], default-features = true }
91 changes: 91 additions & 0 deletions template/hydroflow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
## Getting Started
This is a template for a Rust project that uses [Hydroflow](http://github.com/hydro-project/hydroflow) for
distributed services. It implements a simple echo server and client over UDP.

## Using the Template
```bash
cargo generate hydro-project/hydroflow-template
```

You will be prompted to name your project. Once the command completes, you can `cd` into the project and build the
template.

```bash
cd <myproject>
cargo build
```

## Running the Template
The server can be run in one terminal and one or more clients can be run in separate terminals.
### Server
```console
% cargo run -- --role server
```

### Client
You can run multiple instances of the client by running the following command:
```console
% cargo run -- --role client
```

## Viewing Help
```console
cargo run -- --help
```

## Template Project Structure
The `src` directory contains the following files:

| File | Description |
|---------------|--------------------------------------------------------------------------------------------------------------------------------------|
| `main.rs` | Contains `main` entry-point function for both client and server. Performs command-line argument parsing. |
| `protocol.rs` | Contains the `Message` enum that defines the messages that can be sent between instances. |
| `<role>.rs` | Contains the service for the given role. Example implementations and skeletal hydroflow spec are provided for `server` and `client`. |
| `helpers.rs` | Contains helper functions that are invoked from Hydroflow code in multiple services. |

## Communication Patterns
No particular communication pattern is assumed by Hydroflow. The unmodified template application is designed to be used in a "star topology":
multiple independent clients talking to a single server. However, the template can be easily modified to support other topologies.
Additional examples are provided in the [hydroflow](https://github.com/hydro-project/hydroflow) repository in the `hydroflow/examples` directory.

## Where do you go from here?
This template is intended to be a starting point for your own project. You'll undoubtedly want to change it.

In our experience, when starting a Hydroflow project we recommend a four-step approach:

1. **Roles**: Identify the roles that your services will play (in the `Opts` struct in `src/main.rs`)
2. **Messages**: Define the basic message types that services will send to each other (in the `Message` enum in `src/protocol.rs`).
3. **Print Received Messages**: Utilize the template logic at each service that prints out messages received.
4. **Exercise Sending Patterns**: Make sure the right messages get to the right recipients! Write simple logic to send out messages in all the message patterns you expect to see (in the `src/<role>.rs` files).
5. **Service Programming**: Begin writing the actual logic for each service, with plenty of `inspect(|m| println!("{:?}", m))` operators
peppered throughout!

Have fun!

## Print a Dataflow Graph
The client and server can optionally print out a dataflow graph of their hydroflow code.

### Mermaid
#### Server
Run the following command and view the messages received by the server on stdout.
```console
% cargo run -- --role server --graph mermaid
```

#### Client
Run the following command and type in the messages to send to the server. When the server responds, the echoed message
will be printed on stdout.
```console
% cargo run -- --role client --graph mermaid
```

### Dot
#### Server
```console
% cargo run -- --role server --graph dot
```

#### Client
```console
% cargo run -- --role client --graph dot
```
4 changes: 4 additions & 0 deletions template/hydroflow/cargo-generate.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[template]
ignore = [
"Cargo.lock",
]
2 changes: 2 additions & 0 deletions template/hydroflow/rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[toolchain]
channel = "nightly-2024-06-17"
69 changes: 69 additions & 0 deletions template/hydroflow/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use crate::helpers::print_graph;
use crate::protocol::Message;
use crate::{Opts, DEFAULT_SERVER_ADDRESS};
use chrono::prelude::*;
use hydroflow::hydroflow_syntax;
use hydroflow::util::{bind_udp_bytes, ipv4_resolve};
use std::net::SocketAddr;

/// Runs the client. The client is a long-running process that reads stdin, and sends messages that
/// it receives to the server. The client also prints any messages it receives to stdout.
pub(crate) async fn run_client(opts: Opts) {
// Client listens on a port picked by the OS.
let client_addr = ipv4_resolve("localhost:0").unwrap();

// Use the server address that was provided in the command-line arguments, or use the default
// if one was not provided.
let server_addr = opts
.address
.unwrap_or_else(|| ipv4_resolve(DEFAULT_SERVER_ADDRESS).unwrap());

// Bind a client-side socket to the requested address and port. The OS will allocate a port and
// the actual port used will be available in `actual_client_addr`.
//
// `outbound` is a `UdpSink`, we use it to send messages. `inbound` is `UdpStream`, we use it
// to receive messages.
//
// bind_udp_bytes is an async function, so we need to await it.
let (outbound, inbound, allocated_client_addr) = bind_udp_bytes(client_addr).await;

println!(
"Client is live! Listening on {:?} and talking to server on {:?}",
allocated_client_addr, server_addr
);

// The skeletal hydroflow spec for a client.
let mut flow = hydroflow_syntax! {

// Whenever a serialized message is received by the application from a particular address,
// a (serialized_payload, address_of_sender) pair is emitted by the `inbound` stream.
//
// `source_stream_serde` deserializes the payload into a
// (deserialized_payload, address_of_sender) pair.
inbound_chan = source_stream_serde(inbound)
-> map(Result::unwrap); // If the deserialization was unsuccessful, this line will panic.

// Mirrors the inbound process on the outbound side.
// `dest_sink_serde` accepts a (`Message`, `SocketAddr`) pair and serializes the `Message`
// using `serde`, converting it to a (serialized_payload, address_of_receiver) pair.
// `outbound` transmits the serialized_payload to the address.
outbound_chan = dest_sink_serde(outbound);

// Print all messages for debugging purposes.
inbound_chan
-> for_each(|(m, a): (Message, SocketAddr)| println!("{}: Got {:?} from {:?}", Utc::now(), m, a));

// Consume input from stdin and send to server as Message::Echo
source_stdin() // A stream of lines from stdin.
-> map(|l| (Message::Echo{ payload: l.unwrap(), ts: Utc::now(), }, server_addr) )
-> outbound_chan; // Send it to the server
};

// If a graph was requested to be printed, print it.
if let Some(graph) = opts.graph {
print_graph(&flow, graph, opts.write_config);
}

// Run the client. This is an async function, so we need to await it.
flow.run_async().await;
}
9 changes: 9 additions & 0 deletions template/hydroflow/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use hydroflow::lang::graph::{WriteConfig, WriteGraphType};
use hydroflow::scheduled::graph::Hydroflow;

pub fn print_graph(flow: &Hydroflow, graph: WriteGraphType, write_config: Option<WriteConfig>) {
let serde_graph = flow
.meta_graph()
.expect("No graph found, maybe failed to parse.");
serde_graph.open_graph(graph, write_config).unwrap();
}
72 changes: 72 additions & 0 deletions template/hydroflow/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use clap::{Parser, ValueEnum};
use client::run_client;
use hydroflow::lang::graph::{WriteConfig, WriteGraphType};
use hydroflow::tokio;
use hydroflow::util::ipv4_resolve;
use server::run_server;
use std::net::SocketAddr;

mod client;
mod helpers;
mod protocol;
mod server;

/// A simple echo server & client generated using the Hydroflow template. The lines starting with
/// `///` contain the message that appears when you run the compiled binary with the '--help'
/// arguments, so feel free to change it to whatever makes sense for your application.
///
/// See https://docs.rs/clap/latest/clap/ for more information.
#[derive(Parser, Debug)]
struct Opts {
// The `Opts` structure contains the command line arguments accepted by the application and can
// be modified to suit your requirements. Refer to the clap crate documentation for more
// information.
/// The role this application process should assume. The example in the template provides two
/// roles: server and client. The server echoes whatever message the clients send to it.
#[clap(value_enum, long)] // value_enum => parse as enum. long => "--role" instead of "-r".
role: Role, // This is a mandatory argument.

/// The server's network address. The server listens on this address. The client sends messages
/// to this address.
#[clap(long, value_parser = ipv4_resolve)]
// value_parser => parse "ip:port" using ipv4_resolve
address: Option<SocketAddr>, // Since this is an Option<T>, it is an optional argument.

/// If specified, a graph representation of the Hydroflow flow used by the program will be
/// printed to the console in the specified format. This parameter can be removed if your
/// application doesn't need this functionality.
#[clap(long)]
graph: Option<WriteGraphType>,

#[clap(flatten)]
write_config: Option<WriteConfig>,
}

#[hydroflow::main]
/// This is the main entry-point for both `Client` and `Server`.
async fn main() {
// Parse command line arguments
let opts = Opts::parse();

// Run the server or the client based on the role provided in the command-line arguments.
match opts.role {
Role::Server => {
run_server(opts).await;
}
Role::Client => {
run_client(opts).await;
}
}
}

/// A running application can assume one of these roles. The launched application process assumes
/// one of these roles, based on the `--role` parameter passed in as a command line argument.
#[derive(Clone, ValueEnum, Debug)]
enum Role {
Client,
Server,
}

/// The default server address & port on which the server listens for incoming messages. Clients
/// send message to this address & port.
pub const DEFAULT_SERVER_ADDRESS: &str = "localhost:54321";
18 changes: 18 additions & 0 deletions template/hydroflow/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use chrono::prelude::*;
use serde::{Deserialize, Serialize};

/// Contains all the messages that can be exchanged between application instances. The `Serialize`
/// and `Deserialize` traits allow for serialization by the `serde` crate.
#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
pub enum Message {
/// Echo message contains a string payload, and a timestamp at which the message was
/// constructed.
Echo {
payload: String,
ts: DateTime<Utc>,
},

/// Heartbeat messages carry no information other than their type.
Heartbeat,
HeartbeatAck,
}
Loading

0 comments on commit 2270ea9

Please sign in to comment.