Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge hydro-project/hydroflow-template into this repo, for #1314 #1316

Merged
merged 20 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 80 additions & 3 deletions .github/workflows/template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ jobs:
with:
cancel_others: "true"

test:
name: Test
test_hydroflow:
name: Test hydroflow
if: ${{ needs.pre_job.outputs.should_skip != 'true' || github.event_name != 'pull_request' }}
timeout-minutes: 10
needs: pre_job
Expand All @@ -36,10 +36,87 @@ jobs:
toolchain: nightly
components: rustfmt, clippy

- name: Install Action cargo-generate
- name: Action cargo-generate
uses: cargo-generate/[email protected]
with:
name: generated
template: template/hydroflow
- name: Move generated project
run: |
mv generated ${{ runner.temp }}/

- name: Run cargo check
uses: actions-rs/cargo@v1
with:
command: check
args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all-targets

- name: Run cargo fmt
uses: actions-rs/cargo@v1
with:
command: fmt
args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all -- --check

- name: Run cargo clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all --all-targets -- -D warnings

- name: Run cargo test
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all-targets --no-fail-fast

- name: Run cargo build
uses: actions-rs/cargo@v1
with:
command: build
args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all-targets

- name: test template example
run: |
ECHO_STRING='hello this is a test'

trap 'rm client-input && rm client-output && kill $(jobs -p)' EXIT

echo "$ECHO_STRING" >client-input

"${{ runner.temp }}/generated/target/debug/hydroflow-template" --role server --address 127.0.0.100:2048 &
"${{ runner.temp }}/generated/target/debug/hydroflow-template" --role client --address 127.0.0.100:2048 <client-input >client-output &

sleep 1

if cat client-output | grep -q "$ECHO_STRING"; then
exit 0
else
exit -1
fi

test_hydroflow_plus:
name: Test hydroflow_plus
if: ${{ needs.pre_job.outputs.should_skip != 'true' || github.event_name != 'pull_request' }}
timeout-minutes: 10
needs: pre_job
runs-on: ubuntu-latest

steps:
- name: Checkout sources
uses: actions/checkout@v2

- name: Install nightly toolchain
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly
components: rustfmt, clippy

- name: Action cargo-generate
uses: cargo-generate/[email protected]
with:
name: generated
template: template/hydroflow_plus
- name: Move generated project
run: |
mv generated ${{ runner.temp }}/
Expand Down
2 changes: 2 additions & 0 deletions template/hydroflow/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target
Cargo.lock
12 changes: 12 additions & 0 deletions template/hydroflow/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "hydroflow-template"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
clap = { version = "4.0.29", features = [ "derive" ] }
hydroflow = { git = "https://github.com/hydro-project/hydroflow" }
serde = { version = "1", features = [ "derive" ] }
chrono = { version = "0.4.20", features = [ "serde" ], default-features = true }
91 changes: 91 additions & 0 deletions template/hydroflow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
## Getting Started
This is a template for a Rust project that uses [Hydroflow](http://github.com/hydro-project/hydroflow) for
distributed services. It implements a simple echo server and client over UDP.

## Using the Template
```bash
cargo generate hydro-project/hydroflow-template
```

You will be prompted to name your project. Once the command completes, you can `cd` into the project and build the
template.

```bash
cd <myproject>
cargo build
```

## Running the Template
The server can be run in one terminal and one or more clients can be run in separate terminals.
### Server
```console
% cargo run -- --role server
```

### Client
You can run multiple instances of the client by running the following command:
```console
% cargo run -- --role client
```

## Viewing Help
```console
cargo run -- --help
```

## Template Project Structure
The `src` directory contains the following files:

| File | Description |
|---------------|--------------------------------------------------------------------------------------------------------------------------------------|
| `main.rs` | Contains `main` entry-point function for both client and server. Performs command-line argument parsing. |
| `protocol.rs` | Contains the `Message` enum that defines the messages that can be sent between instances. |
| `<role>.rs` | Contains the service for the given role. Example implementations and skeletal hydroflow spec are provided for `server` and `client`. |
| `helpers.rs` | Contains helper functions that are invoked from Hydroflow code in multiple services. |

## Communication Patterns
No particular communication pattern is assumed by Hydroflow. The unmodified template application is designed to be used in a "star topology":
multiple independent clients talking to a single server. However, the template can be easily modified to support other topologies.
Additional examples are provided in the [hydroflow](https://github.com/hydro-project/hydroflow) repository in the `hydroflow/examples` directory.

## Where do you go from here?
This template is intended to be a starting point for your own project. You'll undoubtedly want to change it.

In our experience, when starting a Hydroflow project we recommend a four-step approach:

1. **Roles**: Identify the roles that your services will play (in the `Opts` struct in `src/main.rs`)
2. **Messages**: Define the basic message types that services will send to each other (in the `Message` enum in `src/protocol.rs`).
3. **Print Received Messages**: Utilize the template logic at each service that prints out messages received.
4. **Exercise Sending Patterns**: Make sure the right messages get to the right recipients! Write simple logic to send out messages in all the message patterns you expect to see (in the `src/<role>.rs` files).
5. **Service Programming**: Begin writing the actual logic for each service, with plenty of `inspect(|m| println!("{:?}", m))` operators
peppered throughout!

Have fun!

## Print a Dataflow Graph
The client and server can optionally print out a dataflow graph of their hydroflow code.

### Mermaid
#### Server
Run the following command and view the messages received by the server on stdout.
```console
% cargo run -- --role server --graph mermaid
```

#### Client
Run the following command and type in the messages to send to the server. When the server responds, the echoed message
will be printed on stdout.
```console
% cargo run -- --role client --graph mermaid
```

### Dot
#### Server
```console
% cargo run -- --role server --graph dot
```

#### Client
```console
% cargo run -- --role client --graph dot
```
4 changes: 4 additions & 0 deletions template/hydroflow/cargo-generate.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[template]
ignore = [
"Cargo.lock",
]
2 changes: 2 additions & 0 deletions template/hydroflow/rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[toolchain]
channel = "nightly-2024-06-17"
69 changes: 69 additions & 0 deletions template/hydroflow/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use crate::helpers::print_graph;
use crate::protocol::Message;
use crate::{Opts, DEFAULT_SERVER_ADDRESS};
use chrono::prelude::*;
use hydroflow::hydroflow_syntax;
use hydroflow::util::{bind_udp_bytes, ipv4_resolve};
use std::net::SocketAddr;

/// Runs the client. The client is a long-running process that reads stdin, and sends messages that
/// it receives to the server. The client also prints any messages it receives to stdout.
pub(crate) async fn run_client(opts: Opts) {
// Client listens on a port picked by the OS.
let client_addr = ipv4_resolve("localhost:0").unwrap();

// Use the server address that was provided in the command-line arguments, or use the default
// if one was not provided.
let server_addr = opts
.address
.unwrap_or_else(|| ipv4_resolve(DEFAULT_SERVER_ADDRESS).unwrap());

// Bind a client-side socket to the requested address and port. The OS will allocate a port and
// the actual port used will be available in `actual_client_addr`.
//
// `outbound` is a `UdpSink`, we use it to send messages. `inbound` is `UdpStream`, we use it
// to receive messages.
//
// bind_udp_bytes is an async function, so we need to await it.
let (outbound, inbound, allocated_client_addr) = bind_udp_bytes(client_addr).await;

println!(
"Client is live! Listening on {:?} and talking to server on {:?}",
allocated_client_addr, server_addr
);

// The skeletal hydroflow spec for a client.
let mut flow = hydroflow_syntax! {

// Whenever a serialized message is received by the application from a particular address,
// a (serialized_payload, address_of_sender) pair is emitted by the `inbound` stream.
//
// `source_stream_serde` deserializes the payload into a
// (deserialized_payload, address_of_sender) pair.
inbound_chan = source_stream_serde(inbound)
-> map(Result::unwrap); // If the deserialization was unsuccessful, this line will panic.

// Mirrors the inbound process on the outbound side.
// `dest_sink_serde` accepts a (`Message`, `SocketAddr`) pair and serializes the `Message`
// using `serde`, converting it to a (serialized_payload, address_of_receiver) pair.
// `outbound` transmits the serialized_payload to the address.
outbound_chan = dest_sink_serde(outbound);

// Print all messages for debugging purposes.
inbound_chan
-> for_each(|(m, a): (Message, SocketAddr)| println!("{}: Got {:?} from {:?}", Utc::now(), m, a));

// Consume input from stdin and send to server as Message::Echo
source_stdin() // A stream of lines from stdin.
-> map(|l| (Message::Echo{ payload: l.unwrap(), ts: Utc::now(), }, server_addr) )
-> outbound_chan; // Send it to the server
};

// If a graph was requested to be printed, print it.
if let Some(graph) = opts.graph {
print_graph(&flow, graph, opts.write_config);
}

// Run the client. This is an async function, so we need to await it.
flow.run_async().await;
}
9 changes: 9 additions & 0 deletions template/hydroflow/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use hydroflow::lang::graph::{WriteConfig, WriteGraphType};
use hydroflow::scheduled::graph::Hydroflow;

pub fn print_graph(flow: &Hydroflow, graph: WriteGraphType, write_config: Option<WriteConfig>) {
let serde_graph = flow
.meta_graph()
.expect("No graph found, maybe failed to parse.");
serde_graph.open_graph(graph, write_config).unwrap();
}
72 changes: 72 additions & 0 deletions template/hydroflow/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use clap::{Parser, ValueEnum};
use client::run_client;
use hydroflow::lang::graph::{WriteConfig, WriteGraphType};
use hydroflow::tokio;
use hydroflow::util::ipv4_resolve;
use server::run_server;
use std::net::SocketAddr;

mod client;
mod helpers;
mod protocol;
mod server;

/// A simple echo server & client generated using the Hydroflow template. The lines starting with
/// `///` contain the message that appears when you run the compiled binary with the '--help'
/// arguments, so feel free to change it to whatever makes sense for your application.
///
/// See https://docs.rs/clap/latest/clap/ for more information.
#[derive(Parser, Debug)]
struct Opts {
// The `Opts` structure contains the command line arguments accepted by the application and can
// be modified to suit your requirements. Refer to the clap crate documentation for more
// information.
/// The role this application process should assume. The example in the template provides two
/// roles: server and client. The server echoes whatever message the clients send to it.
#[clap(value_enum, long)] // value_enum => parse as enum. long => "--role" instead of "-r".
role: Role, // This is a mandatory argument.

/// The server's network address. The server listens on this address. The client sends messages
/// to this address.
#[clap(long, value_parser = ipv4_resolve)]
// value_parser => parse "ip:port" using ipv4_resolve
address: Option<SocketAddr>, // Since this is an Option<T>, it is an optional argument.

/// If specified, a graph representation of the Hydroflow flow used by the program will be
/// printed to the console in the specified format. This parameter can be removed if your
/// application doesn't need this functionality.
#[clap(long)]
graph: Option<WriteGraphType>,

#[clap(flatten)]
write_config: Option<WriteConfig>,
}

#[hydroflow::main]
/// This is the main entry-point for both `Client` and `Server`.
async fn main() {
// Parse command line arguments
let opts = Opts::parse();

// Run the server or the client based on the role provided in the command-line arguments.
match opts.role {
Role::Server => {
run_server(opts).await;
}
Role::Client => {
run_client(opts).await;
}
}
}

/// A running application can assume one of these roles. The launched application process assumes
/// one of these roles, based on the `--role` parameter passed in as a command line argument.
#[derive(Clone, ValueEnum, Debug)]
enum Role {
Client,
Server,
}

/// The default server address & port on which the server listens for incoming messages. Clients
/// send message to this address & port.
pub const DEFAULT_SERVER_ADDRESS: &str = "localhost:54321";
18 changes: 18 additions & 0 deletions template/hydroflow/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use chrono::prelude::*;
use serde::{Deserialize, Serialize};

/// Contains all the messages that can be exchanged between application instances. The `Serialize`
/// and `Deserialize` traits allow for serialization by the `serde` crate.
#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
pub enum Message {
/// Echo message contains a string payload, and a timestamp at which the message was
/// constructed.
Echo {
payload: String,
ts: DateTime<Utc>,
},

/// Heartbeat messages carry no information other than their type.
Heartbeat,
HeartbeatAck,
}
Loading
Loading