diff --git a/.idea/hydroflow.iml b/.idea/hydroflow.iml index a2889655274f..03944299ddf0 100644 --- a/.idea/hydroflow.iml +++ b/.idea/hydroflow.iml @@ -38,6 +38,7 @@ + diff --git a/Cargo.lock b/Cargo.lock index 667a0ba6e3be..75b39565fcf3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1236,6 +1236,7 @@ dependencies = [ "hostname", "hydroflow", "lattices", + "notify", "rand", "serde", "serde_json", @@ -1693,6 +1694,26 @@ dependencies = [ "str_stack", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "insta" version = "1.39.0" @@ -1788,6 +1809,26 @@ dependencies = [ "serde", ] +[[package]] +name = "kqueue" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lattices" version = "0.5.7" @@ -1959,6 +2000,18 @@ dependencies = [ "adler", ] +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "mio" version = "1.0.2" @@ -2033,6 +2086,23 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.6.0", + "filetime", + "inotify", + "kqueue", + "libc", + "log", + "mio 0.8.11", + "walkdir", + "windows-sys 0.48.0", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -3418,7 +3488,7 @@ dependencies = [ "backtrace", "bytes", "libc", - "mio", + "mio 1.0.2", "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", diff --git a/datastores/gossip_kv/Cargo.toml b/datastores/gossip_kv/Cargo.toml index 92e401f538d8..c466b8d48a1d 100644 --- a/datastores/gossip_kv/Cargo.toml +++ b/datastores/gossip_kv/Cargo.toml @@ -12,6 +12,9 @@ config = "0.14.0" hostname = "0.4.0" hydroflow = { path="../../hydroflow" } lattices = { path = '../../lattices'} +# The specific set of features for Notify are picked to disable the default cross-beam channels (cause problems with +# tokio) and use std channels. See docs for more information: https://docs.rs/notify/6.1.1/notify/ +notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] } rand = "0.8.5" serde = "1.0.203" serde_json = "1.0.117" diff --git a/datastores/gossip_kv/Makefile b/datastores/gossip_kv/Makefile new file mode 100644 index 000000000000..34f5e7edb9b6 --- /dev/null +++ b/datastores/gossip_kv/Makefile @@ -0,0 +1,42 @@ +# Makefile + +## Minikube Options. +MINIKUBE_DISK_SIZE=100g +MINIKUBE_CPUS=16 +MINIKUBE_MEMORY=32768 + +# Docker Image Tags +BASE_IMAGE_TAG=hydroflow/gossip-kv-server-base-image:latest +SERVER_IMAGE_TAG=hydroflow/gossip-kv-server:latest +CLI_IMAGE_TAG=hydroflow/gossip-kv-cli:latest + +# Default target when you run 'make' + +# Target to start Minikube with specific options +start_minikube: + minikube start --disk-size=$(MINIKUBE_DISK_SIZE) --cpus=$(MINIKUBE_CPUS) --memory=$(MINIKUBE_MEMORY) + eval $$(minikube docker-env) + +# Target to build the Docker images +build_docker_images: build_base_image build_server_image build_cli_image + + +build_base_image: + docker build -t "$(BASE_IMAGE_TAG)" -f ../../datastores/gossip_kv/server/baseimage.Dockerfile ../.. + +build_server_image: + docker build -t "$(SERVER_IMAGE_TAG)" -f ../../datastores/gossip_kv/server/Dockerfile ../.. + +build_cli_image: + docker build -t "$(CLI_IMAGE_TAG)" -f ../../datastores/gossip_kv/cli/Dockerfile ../.. + +# Target to clean up the Minikube cluster +clean_local: + minikube delete + +# Target to deploy the Gossip KV Server to the Minikube cluster +deploy_local: + kubectl apply -f ../../datastores/gossip_kv/server/local + +# Target to delete the Minikube cluster and build again +rebuild_local: clean_local start_minikube build_docker_images \ No newline at end of file diff --git a/datastores/gossip_kv/cli/Dockerfile b/datastores/gossip_kv/cli/Dockerfile new file mode 100644 index 000000000000..b9f58de6731b --- /dev/null +++ b/datastores/gossip_kv/cli/Dockerfile @@ -0,0 +1,8 @@ +FROM "hydroflow/gossip-kv-server-base-image:latest" AS builder +WORKDIR /usr/src/gossip-kv-server +COPY . . +RUN find . +RUN cargo build --release --workspace -p gossip_kv + +FROM rustlang/rust:nightly-slim +COPY --from=builder /usr/src/gossip-kv-server/target/release/gossip_cli /usr/local/bin/gossip_cli diff --git a/datastores/gossip_kv/cli/main.rs b/datastores/gossip_kv/cli/main.rs index 76ec43c1f1e3..93264bc72e64 100644 --- a/datastores/gossip_kv/cli/main.rs +++ b/datastores/gossip_kv/cli/main.rs @@ -10,7 +10,7 @@ use tracing::error; #[derive(Debug, Parser)] struct Opts { #[clap(short, long, help = "Server address to connect to.")] - server_address: Option, + server_address: Option, } /// Dummy app for using clap to process commands for interactive CLI. @@ -87,12 +87,13 @@ async fn main() { let opts = Opts::parse(); // Bind to OS-assigned port on localhost. - let address = ipv4_resolve("localhost:0").unwrap(); + let address = ipv4_resolve("0.0.0.0:0").unwrap(); // Default to localhost:3000 if not provided. - let server_address = opts - .server_address - .unwrap_or_else(|| ipv4_resolve("localhost:3001").unwrap()); + let server_address = opts.server_address.map_or_else( + || ipv4_resolve("localhost:3001").unwrap(), + |s| ipv4_resolve(&s).unwrap(), + ); // Setup UDP sockets for communication. let (outbound, inbound, _) = bind_udp_bytes(address).await; diff --git a/datastores/gossip_kv/server/Dockerfile b/datastores/gossip_kv/server/Dockerfile index af67697562fa..e4f592261ca1 100644 --- a/datastores/gossip_kv/server/Dockerfile +++ b/datastores/gossip_kv/server/Dockerfile @@ -7,7 +7,8 @@ RUN cargo build --release --workspace -p gossip_kv FROM rustlang/rust:nightly-slim COPY --from=builder /usr/src/gossip-kv-server/target/release/gossip_server /usr/local/bin/gossip_server +RUN mkdir -p /config/static # Don't skip the trailing slash in the destination directory -COPY datastores/gossip_kv/server/config/*.toml /config/ +COPY datastores/gossip_kv/server/config/static/*.toml /config/static/ CMD ["gossip_server"] # ENTRYPOINT ["tail", "-f", "/dev/null"] diff --git a/datastores/gossip_kv/server/README.md b/datastores/gossip_kv/server/README.md index 1108b30a37ca..9ed107aca2c0 100644 --- a/datastores/gossip_kv/server/README.md +++ b/datastores/gossip_kv/server/README.md @@ -22,11 +22,16 @@ Speeds up code changes by caching build dependencies. docker build -t "hydroflow/gossip-kv-server-base-image:latest" -f datastores/gossip_kv/server/baseimage.Dockerfile . ``` -## Build Docker Server Image +## Build Docker Image for Gossip Server ```shell docker build -t "hydroflow/gossip-kv-server:latest" -f datastores/gossip_kv/server/Dockerfile . ``` +## Build Docker Image for Gossip CLI +```shell +docker build -t "hydroflow/gossip-kv-cli:latest" -f datastores/gossip_kv/cli/Dockerfile . +``` + ## Check if minikube has the image You should see "hydroflow/gossip-kv" ```shell diff --git a/datastores/gossip_kv/server/config/default.toml b/datastores/gossip_kv/server/config/default.toml deleted file mode 100644 index 45f137a02760..000000000000 --- a/datastores/gossip_kv/server/config/default.toml +++ /dev/null @@ -1 +0,0 @@ -seed_nodes = [] \ No newline at end of file diff --git a/datastores/gossip_kv/server/config/mod.rs b/datastores/gossip_kv/server/config/mod.rs index 4bd5778c7325..13ff54873ad2 100644 --- a/datastores/gossip_kv/server/config/mod.rs +++ b/datastores/gossip_kv/server/config/mod.rs @@ -1,5 +1,12 @@ +use std::path::PathBuf; + use config::{Config, ConfigError, File}; +use hydroflow::futures::future::ready; +use hydroflow::futures::{Stream, StreamExt}; +use notify::{Event, EventHandler, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::UnboundedSender; +use tracing::trace; /// L0 Data Store settings. #[derive(Debug, Deserialize, Serialize)] @@ -9,6 +16,22 @@ pub struct ServerSettings { pub seed_nodes: Vec, } +const CONFIG_ROOT: &str = "config"; +const STATIC_CONFIG_PATH: &str = "static"; +const DYNAMIC_CONFIG_PATH: &str = "dynamic"; + +fn static_config_path(subpath: &str) -> PathBuf { + PathBuf::from(CONFIG_ROOT) + .join(STATIC_CONFIG_PATH) + .join(subpath) +} + +fn dynamic_config_path(subpath: &str) -> PathBuf { + PathBuf::from(CONFIG_ROOT) + .join(DYNAMIC_CONFIG_PATH) + .join(subpath) +} + impl ServerSettings { /// Load the settings from the configuration files. pub fn new() -> Result { @@ -16,14 +39,17 @@ impl ServerSettings { let settings = Config::builder() /* Load the default settings from the `config/default.toml` file. */ - .add_source(File::with_name("config/default")) + .add_source(File::from(static_config_path("default.toml")).required(false)) /* Load additional overrides based on context (alpha, beta, production, etc.), if they exist. */ - .add_source(File::with_name(&format!("config/{}", run_mode)).required(false)) + .add_source(File::from(static_config_path(&run_mode)).required(false)) /* Load the local settings, if they exist. These are .gitignored to prevent accidental check-in. */ - .add_source(File::with_name("config/local").required(false)) + .add_source(File::from(static_config_path("local")).required(false)) + + /* Load the dynamic settings, if they exist. These always override any static configuration*/ + .add_source(File::from(dynamic_config_path("dynamic.toml")).required(false)) .build()?; settings.try_deserialize() @@ -36,9 +62,69 @@ pub struct SeedNodeSettings { /// The ID of the seed node. pub id: String, - /// The IP address on which the seed node is listening for gossip messages. - pub ip: String, + /// The address on which the seed node is listening for gossip messages. + pub address: String, +} + +/// Setup a watcher for the settings files and return a stream of settings changes. +/// +/// Returns the watcher, the initial settings, and a stream of settings changes. The watcher is +/// returned so that it can be kept alive for the lifetime of the application. Also returns a +/// snapshot of the current settings. +pub fn setup_settings_watch() -> ( + RecommendedWatcher, + ServerSettings, + impl Stream, +) { + let (tx, rx) = hydroflow::util::unbounded_channel(); + + // Setup the watcher + let mut watcher = notify::RecommendedWatcher::new( + UnboundedSenderEventHandler::new(tx), + notify::Config::default(), + ) + .unwrap(); + watcher + .watch(&PathBuf::from(CONFIG_ROOT), RecursiveMode::Recursive) + .unwrap(); - /// The port on which the seed node is listening for gossip messages. - pub port: u16, + // Read initial settings + let initial_settings = ServerSettings::new().unwrap(); + + let change_stream = rx + .map(Result::unwrap) + .map(|event| { + trace!("Event: {:?}", event); + match event.kind { + EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) => { + Some(ServerSettings::new().unwrap()) + } + _ => { + trace!("Unhandled event: {:?}", event); + None + } + } + }) + .filter_map(ready); + + // If the watcher is dropped, the stream will stop producing events. So, returning the watcher. + (watcher, initial_settings, change_stream) +} + +/// Wraps an UnboundedSender to implement the notify::EventHandler trait. This allows sending +/// file notification evnts to UnboundedSender instances. +struct UnboundedSenderEventHandler { + tx: UnboundedSender>, +} + +impl UnboundedSenderEventHandler { + fn new(tx: UnboundedSender>) -> Self { + Self { tx } + } +} + +impl EventHandler for UnboundedSenderEventHandler { + fn handle_event(&mut self, event: notify::Result) { + self.tx.send(event).unwrap(); + } } diff --git a/datastores/gossip_kv/server/config/static/default.toml b/datastores/gossip_kv/server/config/static/default.toml new file mode 100644 index 000000000000..7e01cac0d136 --- /dev/null +++ b/datastores/gossip_kv/server/config/static/default.toml @@ -0,0 +1,5 @@ +seed_nodes = [] + +#[[seed_nodes]] +#id = "gossip-kv-seed-nodes-0" +#address = "gossip-kv-seed-nodes-0.gossip-kv-seed-nodes.default.svc.cluster.local:3001" \ No newline at end of file diff --git a/datastores/gossip_kv/server/config/development.toml b/datastores/gossip_kv/server/config/static/development.toml similarity index 100% rename from datastores/gossip_kv/server/config/development.toml rename to datastores/gossip_kv/server/config/static/development.toml diff --git a/datastores/gossip_kv/server/deployment/local/objects.yaml b/datastores/gossip_kv/server/deployment/local/objects.yaml new file mode 100644 index 000000000000..45e20d5b43de --- /dev/null +++ b/datastores/gossip_kv/server/deployment/local/objects.yaml @@ -0,0 +1,88 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: gossip-kv-seed-nodes + labels: + app: gossip-kv-seed-nodes +spec: + replicas: 1 + serviceName: gossip-kv-seed-nodes + selector: + matchLabels: + app: gossip-kv-seed-nodes + template: + metadata: + labels: + app: gossip-kv-seed-nodes + spec: + containers: + - name: gossip-kv-server + image: docker.io/hydroflow/gossip-kv-server:latest + imagePullPolicy: IfNotPresent +# Uncomment the following for debugging +# command: [ "/bin/sh" ] +# args: [ "-c", "while true; do sleep 3600; done" ] + env: + - name: RUST_LOG + value: "trace" + - name: RUST_BACKTRACE + value: "full" + ports: + - containerPort: 3001 + protocol: UDP + volumeMounts: + - name: gossip-kv-dynamic-config + mountPath: /config/dynamic + volumes: + - name: gossip-kv-dynamic-config + configMap: + name: gossip-kv-dynamic-config +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: gossip-kv-cli + labels: + app: gossip-kv-cli +spec: + replicas: 1 + selector: + matchLabels: + app: gossip-kv-cli + template: + metadata: + labels: + app: gossip-kv-cli + spec: + containers: + - name: gossip-kv-cli + image: docker.io/hydroflow/gossip-kv-cli:latest + imagePullPolicy: IfNotPresent + command: ["/bin/sh"] + args: ["-c", "while true; do sleep 3600; done"] + tty: true + env: + - name: RUST_LOG + value: "info" +--- +apiVersion: v1 +kind: Service +metadata: + name: gossip-kv-seed-nodes + labels: + app: gossip-kv-seed-nodes +spec: + ports: + - port: 3001 + targetPort: 3001 + protocol: UDP + clusterIP: None + selector: + app: gossip-kv-seed-nodes +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: gossip-kv-dynamic-config +data: + dynamic.toml: | \ No newline at end of file diff --git a/datastores/gossip_kv/server/main.rs b/datastores/gossip_kv/server/main.rs index be47552e44be..cf983d0e589c 100644 --- a/datastores/gossip_kv/server/main.rs +++ b/datastores/gossip_kv/server/main.rs @@ -9,10 +9,11 @@ use gossip_protocol::membership::{MemberDataBuilder, Protocol}; use gossip_protocol::{ClientRequest, GossipMessage}; use hydroflow::futures::{SinkExt, StreamExt}; use hydroflow::tokio_stream::wrappers::ReceiverStream; -use hydroflow::util::bind_udp_bytes; +use hydroflow::util::{bind_udp_bytes, ipv4_resolve}; use hydroflow::{bincode, tokio}; -use tracing::{error, info}; +use tracing::{error, info, trace}; +use crate::config::{setup_settings_watch, SeedNodeSettings}; use crate::membership::member_name; use crate::server::{server, SeedNode}; @@ -33,12 +34,18 @@ struct Opts { client_port: u16, } +fn make_seed_node(settings: &SeedNodeSettings) -> SeedNode { + SeedNode { + id: settings.id.clone(), + address: ipv4_resolve(&settings.address).unwrap(), + } +} + #[hydroflow::main] async fn main() { tracing_subscriber::fmt::init(); let opts: Opts = Opts::parse(); - let settings = config::ServerSettings::new().unwrap(); // Setup protocol information in the member metadata. let client_protocol_address = @@ -119,19 +126,21 @@ async fn main() { }; ready(mapped) }); + // TODO: Trigger gossip every X (configurable number of seconds) + let (_gossip_tx, gossip_rx) = tokio::sync::mpsc::channel::<()>(20 /* Configure size */); - // Configure seed nodes for gossip protocol. - let seed_nodes = settings + let (_watcher, server_settings, settings_stream) = setup_settings_watch(); + + let seed_nodes = server_settings .seed_nodes .iter() - .map(|node| SeedNode { - id: node.id.clone(), - address: SocketAddr::new(node.ip.parse().unwrap(), node.port), - }) - .collect(); + .map(make_seed_node) + .collect::>(); - // TODO: Trigger gossip every X (configurable number of seconds) - let (_gossip_tx, gossip_rx) = tokio::sync::mpsc::channel::<()>(20 /* Configure size */); + let seed_node_stream = settings_stream.map(|settings| { + trace!("Settings updated. Reloading seed nodes"); + settings.seed_nodes.iter().map(make_seed_node).collect() + }); // Create and run the server let mut server = server( @@ -142,6 +151,7 @@ async fn main() { ReceiverStream::new(gossip_rx), member_data, seed_nodes, + seed_node_stream, ); server.run_async().await; diff --git a/datastores/gossip_kv/server/server.rs b/datastores/gossip_kv/server/server.rs index 6d890fc3b1e0..fad98f1fc785 100644 --- a/datastores/gossip_kv/server/server.rs +++ b/datastores/gossip_kv/server/server.rs @@ -55,7 +55,17 @@ pub type MessageId = String; /// -- `client_outputs`: The output sink of client responses for the client protocol. /// -- `member_info`: The membership information of the server. /// -- `seed_nodes`: A list of seed nodes that can be used to bootstrap the gossip cluster. -pub fn server( +#[expect(clippy::too_many_arguments)] +pub fn server< + ClientInput, + ClientOutput, + GossipInput, + GossipOutput, + GossipTrigger, + SeedNodeStream, + Addr, + E, +>( client_inputs: ClientInput, client_outputs: ClientOutput, gossip_inputs: GossipInput, @@ -63,6 +73,7 @@ pub fn server, seed_nodes: Vec>, + seed_node_stream: SeedNodeStream, ) -> Hydroflow<'static> where ClientInput: Stream + Unpin + 'static, @@ -70,6 +81,7 @@ where GossipInput: Stream + Unpin + 'static, GossipOutput: Sink<(GossipMessage, Addr), Error = E> + Unpin + 'static, GossipTrigger: Stream + Unpin + 'static, + SeedNodeStream: Stream>> + Unpin + 'static, Addr: Address + DeserializeOwned + 'static, E: Debug + 'static, { @@ -85,6 +97,12 @@ where on_start = initialize() -> tee(); on_start -> for_each(|_| info!("{:?}: Transducer started.", context.current_tick())); + seed_nodes = source_stream(seed_node_stream) + -> fold::<'static>(|| Box::new(seed_nodes), |last_seed_nodes, new_seed_nodes: Vec>| { + **last_seed_nodes = new_seed_nodes; + info!("Updated seed nodes: {:?}", **last_seed_nodes); + }); + // Setup member metadata for this process. on_start -> map(|_| upsert_row(Clock::new(0), Namespace::System, "members".to_string(), my_member_id.clone(), serde_json::to_string(&member_info).unwrap())) -> writes; @@ -271,6 +289,7 @@ where peer_names.insert(row_key.clone()); }); + let seed_nodes = &#seed_nodes; seed_nodes.iter().for_each(|seed_node| { peer_names.insert(seed_node.id.clone()); }); @@ -311,6 +330,7 @@ mod tests { use std::collections::HashSet; use gossip_protocol::membership::{MemberDataBuilder, Protocol}; + use hydroflow::tokio_stream::empty; use hydroflow::util::simulation::{Address, Fleet, Hostname}; use super::*; @@ -353,6 +373,7 @@ mod tests { gossip_trigger_rx, member_data, vec![], + empty(), ) }); @@ -457,6 +478,7 @@ mod tests { gossip_trigger_rx, member_data, vec![], + empty(), ) }); @@ -607,6 +629,7 @@ mod tests { gossip_trigger_rx_a, member_data, seed_nodes_clone, + empty(), ) }); @@ -639,6 +662,7 @@ mod tests { gossip_trigger_rx_b, member_data, seed_nodes_clone, + empty(), ) });