Skip to content

Commit

Permalink
feat(anna): Hot reloadable dynamic config.
Browse files Browse the repository at this point in the history
  • Loading branch information
rohitkulshreshtha committed Oct 4, 2024
1 parent 8a5817c commit f373b77
Show file tree
Hide file tree
Showing 15 changed files with 372 additions and 29 deletions.
1 change: 1 addition & 0 deletions .idea/hydroflow.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 71 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datastores/gossip_kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ config = "0.14.0"
hostname = "0.4.0"
hydroflow = { path="../../hydroflow" }
lattices = { path = '../../lattices'}
# The specific set of features for Notify are picked to disable the default cross-beam channels (cause problems with
# tokio) and use std channels. See docs for more information: https://docs.rs/notify/6.1.1/notify/
notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] }
rand = "0.8.5"
serde = "1.0.203"
serde_json = "1.0.117"
Expand Down
42 changes: 42 additions & 0 deletions datastores/gossip_kv/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Makefile

## Minikube Options.
MINIKUBE_DISK_SIZE=100g
MINIKUBE_CPUS=16
MINIKUBE_MEMORY=32768

# Docker Image Tags
BASE_IMAGE_TAG=hydroflow/gossip-kv-server-base-image:latest
SERVER_IMAGE_TAG=hydroflow/gossip-kv-server:latest
CLI_IMAGE_TAG=hydroflow/gossip-kv-cli:latest

# Default target when you run 'make'

# Target to start Minikube with specific options
start_minikube:
minikube start --disk-size=$(MINIKUBE_DISK_SIZE) --cpus=$(MINIKUBE_CPUS) --memory=$(MINIKUBE_MEMORY)
eval $$(minikube docker-env)

# Target to build the Docker images
build_docker_images: build_base_image build_server_image build_cli_image


build_base_image:
docker build -t "$(BASE_IMAGE_TAG)" -f ../../datastores/gossip_kv/server/baseimage.Dockerfile ../..

build_server_image:
docker build -t "$(SERVER_IMAGE_TAG)" -f ../../datastores/gossip_kv/server/Dockerfile ../..

build_cli_image:
docker build -t "$(CLI_IMAGE_TAG)" -f ../../datastores/gossip_kv/cli/Dockerfile ../..

# Target to clean up the Minikube cluster
clean_local:
minikube delete

# Target to deploy the Gossip KV Server to the Minikube cluster
deploy_local:
kubectl apply -f ../../datastores/gossip_kv/server/local

# Target to delete the Minikube cluster and build again
rebuild_local: clean_local start_minikube build_docker_images
8 changes: 8 additions & 0 deletions datastores/gossip_kv/cli/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM "hydroflow/gossip-kv-server-base-image:latest" AS builder
WORKDIR /usr/src/gossip-kv-server
COPY . .
RUN find .
RUN cargo build --release --workspace -p gossip_kv

FROM rustlang/rust:nightly-slim
COPY --from=builder /usr/src/gossip-kv-server/target/release/gossip_cli /usr/local/bin/gossip_cli
11 changes: 6 additions & 5 deletions datastores/gossip_kv/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tracing::error;
#[derive(Debug, Parser)]
struct Opts {
#[clap(short, long, help = "Server address to connect to.")]
server_address: Option<SocketAddr>,
server_address: Option<String>,
}

/// Dummy app for using clap to process commands for interactive CLI.
Expand Down Expand Up @@ -87,12 +87,13 @@ async fn main() {
let opts = Opts::parse();

// Bind to OS-assigned port on localhost.
let address = ipv4_resolve("localhost:0").unwrap();
let address = ipv4_resolve("0.0.0.0:0").unwrap();

// Default to localhost:3000 if not provided.
let server_address = opts
.server_address
.unwrap_or_else(|| ipv4_resolve("localhost:3001").unwrap());
let server_address = opts.server_address.map_or_else(
|| ipv4_resolve("localhost:3001").unwrap(),
|s| ipv4_resolve(&s).unwrap(),
);

// Setup UDP sockets for communication.
let (outbound, inbound, _) = bind_udp_bytes(address).await;
Expand Down
3 changes: 2 additions & 1 deletion datastores/gossip_kv/server/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ RUN cargo build --release --workspace -p gossip_kv
FROM rustlang/rust:nightly-slim
COPY --from=builder /usr/src/gossip-kv-server/target/release/gossip_server /usr/local/bin/gossip_server

RUN mkdir -p /config/static
# Don't skip the trailing slash in the destination directory
COPY datastores/gossip_kv/server/config/*.toml /config/
COPY datastores/gossip_kv/server/config/static/*.toml /config/static/
CMD ["gossip_server"]
# ENTRYPOINT ["tail", "-f", "/dev/null"]
7 changes: 6 additions & 1 deletion datastores/gossip_kv/server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@ Speeds up code changes by caching build dependencies.
docker build -t "hydroflow/gossip-kv-server-base-image:latest" -f datastores/gossip_kv/server/baseimage.Dockerfile .
```

## Build Docker Server Image
## Build Docker Image for Gossip Server
```shell
docker build -t "hydroflow/gossip-kv-server:latest" -f datastores/gossip_kv/server/Dockerfile .
```

## Build Docker Image for Gossip CLI
```shell
docker build -t "hydroflow/gossip-kv-cli:latest" -f datastores/gossip_kv/cli/Dockerfile .
```

## Check if minikube has the image
You should see "hydroflow/gossip-kv"
```shell
Expand Down
1 change: 0 additions & 1 deletion datastores/gossip_kv/server/config/default.toml

This file was deleted.

100 changes: 93 additions & 7 deletions datastores/gossip_kv/server/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
use std::path::PathBuf;

use config::{Config, ConfigError, File};
use hydroflow::futures::future::ready;
use hydroflow::futures::{Stream, StreamExt};
use notify::{Event, EventHandler, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::UnboundedSender;
use tracing::trace;

/// L0 Data Store settings.
#[derive(Debug, Deserialize, Serialize)]
Expand All @@ -9,21 +16,40 @@ pub struct ServerSettings {
pub seed_nodes: Vec<SeedNodeSettings>,
}

const CONFIG_ROOT: &str = "config";
const STATIC_CONFIG_PATH: &str = "static";
const DYNAMIC_CONFIG_PATH: &str = "dynamic";

fn static_config_path(subpath: &str) -> PathBuf {
PathBuf::from(CONFIG_ROOT)
.join(STATIC_CONFIG_PATH)
.join(subpath)
}

fn dynamic_config_path(subpath: &str) -> PathBuf {
PathBuf::from(CONFIG_ROOT)
.join(DYNAMIC_CONFIG_PATH)
.join(subpath)
}

impl ServerSettings {
/// Load the settings from the configuration files.
pub fn new() -> Result<Self, ConfigError> {
let run_mode = std::env::var("RUN_MODE").unwrap_or_else(|_| "development".into());

let settings = Config::builder()
/* Load the default settings from the `config/default.toml` file. */
.add_source(File::with_name("config/default"))
.add_source(File::from(static_config_path("default.toml")).required(false))

/* Load additional overrides based on context (alpha, beta, production, etc.), if they exist. */
.add_source(File::with_name(&format!("config/{}", run_mode)).required(false))
.add_source(File::from(static_config_path(&run_mode)).required(false))

/* Load the local settings, if they exist. These are .gitignored to prevent accidental
check-in. */
.add_source(File::with_name("config/local").required(false))
.add_source(File::from(static_config_path("local")).required(false))

/* Load the dynamic settings, if they exist. These always override any static configuration*/
.add_source(File::from(dynamic_config_path("dynamic.toml")).required(false))
.build()?;

settings.try_deserialize()
Expand All @@ -36,9 +62,69 @@ pub struct SeedNodeSettings {
/// The ID of the seed node.
pub id: String,

/// The IP address on which the seed node is listening for gossip messages.
pub ip: String,
/// The address on which the seed node is listening for gossip messages.
pub address: String,
}

/// Setup a watcher for the settings files and return a stream of settings changes.
///
/// Returns the watcher, the initial settings, and a stream of settings changes. The watcher is
/// returned so that it can be kept alive for the lifetime of the application. Also returns a
/// snapshot of the current settings.
pub fn setup_settings_watch() -> (
RecommendedWatcher,
ServerSettings,
impl Stream<Item = ServerSettings>,
) {
let (tx, rx) = hydroflow::util::unbounded_channel();

// Setup the watcher
let mut watcher = notify::RecommendedWatcher::new(
UnboundedSenderEventHandler::new(tx),
notify::Config::default(),
)
.unwrap();
watcher
.watch(&PathBuf::from(CONFIG_ROOT), RecursiveMode::Recursive)
.unwrap();

/// The port on which the seed node is listening for gossip messages.
pub port: u16,
// Read initial settings
let initial_settings = ServerSettings::new().unwrap();

let change_stream = rx
.map(Result::unwrap)
.map(|event| {
trace!("Event: {:?}", event);
match event.kind {
EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) => {
Some(ServerSettings::new().unwrap())
}
_ => {
trace!("Unhandled event: {:?}", event);
None
}
}
})
.filter_map(ready);

// If the watcher is dropped, the stream will stop producing events. So, returning the watcher.
(watcher, initial_settings, change_stream)
}

/// Wraps an UnboundedSender to implement the notify::EventHandler trait. This allows sending
/// file notification evnts to UnboundedSender instances.
struct UnboundedSenderEventHandler {
tx: UnboundedSender<notify::Result<Event>>,
}

impl UnboundedSenderEventHandler {
fn new(tx: UnboundedSender<notify::Result<Event>>) -> Self {
Self { tx }
}
}

impl EventHandler for UnboundedSenderEventHandler {
fn handle_event(&mut self, event: notify::Result<Event>) {
self.tx.send(event).unwrap();
}
}
5 changes: 5 additions & 0 deletions datastores/gossip_kv/server/config/static/default.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
seed_nodes = []

#[[seed_nodes]]
#id = "gossip-kv-seed-nodes-0"
#address = "gossip-kv-seed-nodes-0.gossip-kv-seed-nodes.default.svc.cluster.local:3001"
Loading

0 comments on commit f373b77

Please sign in to comment.