diff --git a/.github/workflows/template.yml b/.github/workflows/template.yml index 2724502dfe8f..0fa17011d7dd 100644 --- a/.github/workflows/template.yml +++ b/.github/workflows/template.yml @@ -18,8 +18,8 @@ jobs: with: cancel_others: "true" - test: - name: Test + test_hydroflow: + name: Test hydroflow if: ${{ needs.pre_job.outputs.should_skip != 'true' || github.event_name != 'pull_request' }} timeout-minutes: 10 needs: pre_job @@ -36,10 +36,87 @@ jobs: toolchain: nightly components: rustfmt, clippy - - name: Install Action cargo-generate + - name: Action cargo-generate uses: cargo-generate/cargo-generate-action@v0.17.5 with: name: generated + template: template/hydroflow + - name: Move generated project + run: | + mv generated ${{ runner.temp }}/ + + - name: Run cargo check + uses: actions-rs/cargo@v1 + with: + command: check + args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all-targets + + - name: Run cargo fmt + uses: actions-rs/cargo@v1 + with: + command: fmt + args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all -- --check + + - name: Run cargo clippy + uses: actions-rs/cargo@v1 + with: + command: clippy + args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all --all-targets -- -D warnings + + - name: Run cargo test + uses: actions-rs/cargo@v1 + with: + command: test + args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all-targets --no-fail-fast + + - name: Run cargo build + uses: actions-rs/cargo@v1 + with: + command: build + args: --manifest-path "${{ runner.temp }}/generated/Cargo.toml" --all-targets + + - name: test template example + run: | + ECHO_STRING='hello this is a test' + + trap 'rm client-input && rm client-output && kill $(jobs -p)' EXIT + + echo "$ECHO_STRING" >client-input + + "${{ runner.temp }}/generated/target/debug/hydroflow-template" --role server --address 127.0.0.100:2048 & + "${{ runner.temp }}/generated/target/debug/hydroflow-template" --role client --address 127.0.0.100:2048 client-output & + + sleep 1 + + if cat client-output | grep -q "$ECHO_STRING"; then + exit 0 + else + exit -1 + fi + + test_hydroflow_plus: + name: Test hydroflow_plus + if: ${{ needs.pre_job.outputs.should_skip != 'true' || github.event_name != 'pull_request' }} + timeout-minutes: 10 + needs: pre_job + runs-on: ubuntu-latest + + steps: + - name: Checkout sources + uses: actions/checkout@v2 + + - name: Install nightly toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: nightly + components: rustfmt, clippy + + - name: Action cargo-generate + uses: cargo-generate/cargo-generate-action@v0.17.5 + with: + name: generated + template: template/hydroflow_plus - name: Move generated project run: | mv generated ${{ runner.temp }}/ diff --git a/template/hydroflow/.gitignore b/template/hydroflow/.gitignore new file mode 100644 index 000000000000..96ef6c0b944e --- /dev/null +++ b/template/hydroflow/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/template/hydroflow/Cargo.toml b/template/hydroflow/Cargo.toml new file mode 100644 index 000000000000..831a145e416d --- /dev/null +++ b/template/hydroflow/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "hydroflow-template" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +clap = { version = "4.0.29", features = [ "derive" ] } +hydroflow = { git = "https://github.com/hydro-project/hydroflow" } +serde = { version = "1", features = [ "derive" ] } +chrono = { version = "0.4.20", features = [ "serde" ], default-features = true } diff --git a/template/hydroflow/README.md b/template/hydroflow/README.md new file mode 100644 index 000000000000..6013a39bee71 --- /dev/null +++ b/template/hydroflow/README.md @@ -0,0 +1,91 @@ +## Getting Started +This is a template for a Rust project that uses [Hydroflow](http://github.com/hydro-project/hydroflow) for +distributed services. It implements a simple echo server and client over UDP. + +## Using the Template +```bash +cargo generate hydro-project/hydroflow-template +``` + +You will be prompted to name your project. Once the command completes, you can `cd` into the project and build the +template. + +```bash +cd +cargo build +``` + +## Running the Template +The server can be run in one terminal and one or more clients can be run in separate terminals. +### Server +```console +% cargo run -- --role server +``` + +### Client +You can run multiple instances of the client by running the following command: +```console +% cargo run -- --role client +``` + +## Viewing Help +```console +cargo run -- --help +``` + +## Template Project Structure +The `src` directory contains the following files: + +| File | Description | +|---------------|--------------------------------------------------------------------------------------------------------------------------------------| +| `main.rs` | Contains `main` entry-point function for both client and server. Performs command-line argument parsing. | +| `protocol.rs` | Contains the `Message` enum that defines the messages that can be sent between instances. | +| `.rs` | Contains the service for the given role. Example implementations and skeletal hydroflow spec are provided for `server` and `client`. | +| `helpers.rs` | Contains helper functions that are invoked from Hydroflow code in multiple services. | + +## Communication Patterns +No particular communication pattern is assumed by Hydroflow. The unmodified template application is designed to be used in a "star topology": +multiple independent clients talking to a single server. However, the template can be easily modified to support other topologies. +Additional examples are provided in the [hydroflow](https://github.com/hydro-project/hydroflow) repository in the `hydroflow/examples` directory. + +## Where do you go from here? +This template is intended to be a starting point for your own project. You'll undoubtedly want to change it. + +In our experience, when starting a Hydroflow project we recommend a four-step approach: + +1. **Roles**: Identify the roles that your services will play (in the `Opts` struct in `src/main.rs`) +2. **Messages**: Define the basic message types that services will send to each other (in the `Message` enum in `src/protocol.rs`). +3. **Print Received Messages**: Utilize the template logic at each service that prints out messages received. +4. **Exercise Sending Patterns**: Make sure the right messages get to the right recipients! Write simple logic to send out messages in all the message patterns you expect to see (in the `src/.rs` files). +5. **Service Programming**: Begin writing the actual logic for each service, with plenty of `inspect(|m| println!("{:?}", m))` operators +peppered throughout! + +Have fun! + +## Print a Dataflow Graph +The client and server can optionally print out a dataflow graph of their hydroflow code. + +### Mermaid +#### Server +Run the following command and view the messages received by the server on stdout. +```console +% cargo run -- --role server --graph mermaid +``` + +#### Client +Run the following command and type in the messages to send to the server. When the server responds, the echoed message +will be printed on stdout. +```console +% cargo run -- --role client --graph mermaid +``` + +### Dot +#### Server +```console +% cargo run -- --role server --graph dot +``` + +#### Client +```console +% cargo run -- --role client --graph dot +``` \ No newline at end of file diff --git a/template/hydroflow/cargo-generate.toml b/template/hydroflow/cargo-generate.toml new file mode 100644 index 000000000000..2d888be7dd48 --- /dev/null +++ b/template/hydroflow/cargo-generate.toml @@ -0,0 +1,4 @@ +[template] +ignore = [ + "Cargo.lock", +] \ No newline at end of file diff --git a/template/hydroflow/rust-toolchain.toml b/template/hydroflow/rust-toolchain.toml new file mode 100644 index 000000000000..84e8c1610335 --- /dev/null +++ b/template/hydroflow/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "nightly-2024-06-17" diff --git a/template/hydroflow/src/client.rs b/template/hydroflow/src/client.rs new file mode 100644 index 000000000000..ab5ea2aba20e --- /dev/null +++ b/template/hydroflow/src/client.rs @@ -0,0 +1,69 @@ +use crate::helpers::print_graph; +use crate::protocol::Message; +use crate::{Opts, DEFAULT_SERVER_ADDRESS}; +use chrono::prelude::*; +use hydroflow::hydroflow_syntax; +use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; +use std::net::SocketAddr; + +/// Runs the client. The client is a long-running process that reads stdin, and sends messages that +/// it receives to the server. The client also prints any messages it receives to stdout. +pub(crate) async fn run_client(opts: Opts) { + // Client listens on a port picked by the OS. + let client_addr = ipv4_resolve("localhost:0").unwrap(); + + // Use the server address that was provided in the command-line arguments, or use the default + // if one was not provided. + let server_addr = opts + .address + .unwrap_or_else(|| ipv4_resolve(DEFAULT_SERVER_ADDRESS).unwrap()); + + // Bind a client-side socket to the requested address and port. The OS will allocate a port and + // the actual port used will be available in `actual_client_addr`. + // + // `outbound` is a `UdpSink`, we use it to send messages. `inbound` is `UdpStream`, we use it + // to receive messages. + // + // bind_udp_bytes is an async function, so we need to await it. + let (outbound, inbound, allocated_client_addr) = bind_udp_bytes(client_addr).await; + + println!( + "Client is live! Listening on {:?} and talking to server on {:?}", + allocated_client_addr, server_addr + ); + + // The skeletal hydroflow spec for a client. + let mut flow = hydroflow_syntax! { + + // Whenever a serialized message is received by the application from a particular address, + // a (serialized_payload, address_of_sender) pair is emitted by the `inbound` stream. + // + // `source_stream_serde` deserializes the payload into a + // (deserialized_payload, address_of_sender) pair. + inbound_chan = source_stream_serde(inbound) + -> map(Result::unwrap); // If the deserialization was unsuccessful, this line will panic. + + // Mirrors the inbound process on the outbound side. + // `dest_sink_serde` accepts a (`Message`, `SocketAddr`) pair and serializes the `Message` + // using `serde`, converting it to a (serialized_payload, address_of_receiver) pair. + // `outbound` transmits the serialized_payload to the address. + outbound_chan = dest_sink_serde(outbound); + + // Print all messages for debugging purposes. + inbound_chan + -> for_each(|(m, a): (Message, SocketAddr)| println!("{}: Got {:?} from {:?}", Utc::now(), m, a)); + + // Consume input from stdin and send to server as Message::Echo + source_stdin() // A stream of lines from stdin. + -> map(|l| (Message::Echo{ payload: l.unwrap(), ts: Utc::now(), }, server_addr) ) + -> outbound_chan; // Send it to the server + }; + + // If a graph was requested to be printed, print it. + if let Some(graph) = opts.graph { + print_graph(&flow, graph, opts.write_config); + } + + // Run the client. This is an async function, so we need to await it. + flow.run_async().await; +} diff --git a/template/hydroflow/src/helpers.rs b/template/hydroflow/src/helpers.rs new file mode 100644 index 000000000000..58264872461b --- /dev/null +++ b/template/hydroflow/src/helpers.rs @@ -0,0 +1,9 @@ +use hydroflow::lang::graph::{WriteConfig, WriteGraphType}; +use hydroflow::scheduled::graph::Hydroflow; + +pub fn print_graph(flow: &Hydroflow, graph: WriteGraphType, write_config: Option) { + let serde_graph = flow + .meta_graph() + .expect("No graph found, maybe failed to parse."); + serde_graph.open_graph(graph, write_config).unwrap(); +} diff --git a/template/hydroflow/src/main.rs b/template/hydroflow/src/main.rs new file mode 100644 index 000000000000..17328ce06bbd --- /dev/null +++ b/template/hydroflow/src/main.rs @@ -0,0 +1,72 @@ +use clap::{Parser, ValueEnum}; +use client::run_client; +use hydroflow::lang::graph::{WriteConfig, WriteGraphType}; +use hydroflow::tokio; +use hydroflow::util::ipv4_resolve; +use server::run_server; +use std::net::SocketAddr; + +mod client; +mod helpers; +mod protocol; +mod server; + +/// A simple echo server & client generated using the Hydroflow template. The lines starting with +/// `///` contain the message that appears when you run the compiled binary with the '--help' +/// arguments, so feel free to change it to whatever makes sense for your application. +/// +/// See https://docs.rs/clap/latest/clap/ for more information. +#[derive(Parser, Debug)] +struct Opts { + // The `Opts` structure contains the command line arguments accepted by the application and can + // be modified to suit your requirements. Refer to the clap crate documentation for more + // information. + /// The role this application process should assume. The example in the template provides two + /// roles: server and client. The server echoes whatever message the clients send to it. + #[clap(value_enum, long)] // value_enum => parse as enum. long => "--role" instead of "-r". + role: Role, // This is a mandatory argument. + + /// The server's network address. The server listens on this address. The client sends messages + /// to this address. + #[clap(long, value_parser = ipv4_resolve)] + // value_parser => parse "ip:port" using ipv4_resolve + address: Option, // Since this is an Option, it is an optional argument. + + /// If specified, a graph representation of the Hydroflow flow used by the program will be + /// printed to the console in the specified format. This parameter can be removed if your + /// application doesn't need this functionality. + #[clap(long)] + graph: Option, + + #[clap(flatten)] + write_config: Option, +} + +#[hydroflow::main] +/// This is the main entry-point for both `Client` and `Server`. +async fn main() { + // Parse command line arguments + let opts = Opts::parse(); + + // Run the server or the client based on the role provided in the command-line arguments. + match opts.role { + Role::Server => { + run_server(opts).await; + } + Role::Client => { + run_client(opts).await; + } + } +} + +/// A running application can assume one of these roles. The launched application process assumes +/// one of these roles, based on the `--role` parameter passed in as a command line argument. +#[derive(Clone, ValueEnum, Debug)] +enum Role { + Client, + Server, +} + +/// The default server address & port on which the server listens for incoming messages. Clients +/// send message to this address & port. +pub const DEFAULT_SERVER_ADDRESS: &str = "localhost:54321"; diff --git a/template/hydroflow/src/protocol.rs b/template/hydroflow/src/protocol.rs new file mode 100644 index 000000000000..7d3052cc05a4 --- /dev/null +++ b/template/hydroflow/src/protocol.rs @@ -0,0 +1,18 @@ +use chrono::prelude::*; +use serde::{Deserialize, Serialize}; + +/// Contains all the messages that can be exchanged between application instances. The `Serialize` +/// and `Deserialize` traits allow for serialization by the `serde` crate. +#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)] +pub enum Message { + /// Echo message contains a string payload, and a timestamp at which the message was + /// constructed. + Echo { + payload: String, + ts: DateTime, + }, + + /// Heartbeat messages carry no information other than their type. + Heartbeat, + HeartbeatAck, +} diff --git a/template/hydroflow/src/server.rs b/template/hydroflow/src/server.rs new file mode 100644 index 000000000000..7624a614af3b --- /dev/null +++ b/template/hydroflow/src/server.rs @@ -0,0 +1,83 @@ +use crate::helpers::print_graph; +use crate::protocol::Message; +use crate::DEFAULT_SERVER_ADDRESS; +use chrono::prelude::*; +use hydroflow::hydroflow_syntax; +use hydroflow::scheduled::graph::Hydroflow; +use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; +use std::net::SocketAddr; + +/// Runs the server. The server is a long-running process that listens for messages and echoes +/// them back the client. +pub(crate) async fn run_server(opts: crate::Opts) { + // If a server address & port are provided as command-line inputs, use those, else use the + // default. + let server_address = opts + .address + .unwrap_or_else(|| ipv4_resolve(DEFAULT_SERVER_ADDRESS).unwrap()); + + println!("Starting server on {:?}", server_address); + + // Bind a server-side socket to requested address and port. If "0" was provided as the port, the + // OS will allocate a port and the actual port used will be available in `actual_server_addr`. + // + // `outbound` is a `UdpSink`, we use it to send messages. `inbound` is `UdpStream`, we use it + // to receive messages. + // + // This is an async function, so we need to await it. + let (outbound, inbound, actual_server_addr) = bind_udp_bytes(server_address).await; + + println!("Server is live! Listening on {:?}", actual_server_addr); + + // The skeletal hydroflow spec for a server. + let mut flow: Hydroflow = hydroflow_syntax! { + + // Whenever a serialized message is received by the application from a particular address, + // a (serialized_payload, address_of_sender) pair is emitted by the `inbound` stream. + // + // `source_stream_serde` deserializes the payload into a + // (deserialized_payload, address_of_sender) pair. + inbound_chan = source_stream_serde(inbound) // `source_stream_serde` deserializes the payload + -> map(Result::unwrap); // If the deserialization was unsuccessful, this line will panic. + + // Mirrors the inbound process on the outbound side. + // `dest_sink_serde` accepts a (`Message`, `SocketAddr`) pair and serializes the `Message` + // using `serde`, converting it to a (serialized_payload, address_of_receiver) pair. + // `outbound` transmits the serialized_payload to the address. + outbound_chan = union() -> dest_sink_serde(outbound); + + // Demux and destructure the inbound messages into separate streams + inbound_demuxed = inbound_chan + -> inspect(|(m, a): &(Message, SocketAddr)| println!("{}: Got {:?} from {:?}", Utc::now(), m, a)) // For debugging purposes. + -> demux(|(msg, addr), var_args!(echo, heartbeat, errs)| + match msg { + Message::Echo {payload, ..} => echo.give((payload, addr)), + Message::Heartbeat => heartbeat.give(addr), + _ => errs.give((msg, addr)), + } + ); + + // Echo a response back to the sender of the echo request. + inbound_demuxed[echo] + -> map(|(payload, sender_addr)| (Message::Echo { payload, ts: Utc::now() }, sender_addr) ) + -> [0]outbound_chan; + + // Respond to Heartbeat messages + inbound_demuxed[heartbeat] + -> map(|addr| (Message::HeartbeatAck, addr)) + -> [1]outbound_chan; + + // Print unexpected messages + inbound_demuxed[errs] + -> for_each(|(msg, addr)| println!("Received unexpected message type: {:?} from {:?}", msg, addr)); + + }; + + // If a graph was requested to be printed, print it. + if let Some(graph) = opts.graph { + print_graph(&flow, graph, opts.write_config); + } + + // Run the server. This is an async function, so we need to await it. + flow.run_async().await; +}