Skip to content

Commit

Permalink
feat: add HTTP admin endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Jan 14, 2025
1 parent b167486 commit 109f40e
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 1 deletion.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ members = ["crates/*"]
anyhow = { workspace = true }
async-nats = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true, features = ["derive", "cargo", "env"] }
futures = { workspace = true }
http = { workspace = true, features = ["std"] }
http-body-util = { workspace = true }
hyper = { workspace = true }
hyper-util = { workspace = true, features = ["server"] }
nkeys = { workspace = true }
# One version back to avoid clashes with 0.10 of otlp
opentelemetry = { workspace = true, features = ["rt-tokio"] }
Expand Down Expand Up @@ -53,6 +58,10 @@ chrono = "0.4"
clap = { version = "4", features = ["derive", "cargo", "env"] }
cloudevents-sdk = "0.7"
futures = "0.3"
http = { version = "1", default-features = false }
http-body-util = { version = "0.1", default-features = false }
hyper = { version = "1", default-features = false }
hyper-util = { version = "0.1", default-features = false }
indexmap = { version = "2", features = ["serde"] }
jsonschema = "0.17"
lazy_static = "1"
Expand Down
67 changes: 66 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
use core::net::SocketAddr;

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context as _;
use async_nats::jetstream::{stream::Stream, Context};
use bytes::Bytes;
use clap::Parser;
use hyper_util::rt::{TokioExecutor, TokioIo};
use nats::StreamPersistence;
use tokio::net::TcpListener;
use tokio::spawn;
use tokio::sync::Semaphore;
use tracing::log::debug;
use tracing::{debug, error};
use wadm_types::api::DEFAULT_WADM_TOPIC_PREFIX;

use wadm::{
Expand Down Expand Up @@ -238,6 +245,10 @@ struct Args {
hide = true
)]
max_wasmbus_event_stream_bytes: i64,

#[clap(long = "http-admin", env = "WADM_HTTP_ADMIN")]
/// HTTP administration endpoint address
http_admin: Option<SocketAddr>,
}

#[tokio::main]
Expand Down Expand Up @@ -456,6 +467,60 @@ async fn main() -> anyhow::Result<()> {
ManifestNotifier::new(wadm_event_prefix, context),
)
.await?;

if let Some(addr) = args.http_admin {
let socket = TcpListener::bind(addr)
.await
.context("failed to bind on HTTP administation endpoint")?;
let svc = hyper::service::service_fn(move |req| {
const OK: &str = r#"{"status":"ok"}"#;
async move {
let (http::request::Parts { method, uri, .. }, _) = req.into_parts();
match (method.as_str(), uri.path()) {
("HEAD", "/livez") => Ok(http::Response::default()),
("GET", "/livez") => Ok(http::Response::new(http_body_util::Full::new(
Bytes::from(OK),
))),
(method, "/livez") => http::Response::builder()
.status(http::StatusCode::METHOD_NOT_ALLOWED)
.body(http_body_util::Full::new(Bytes::from(format!(
"method `{method}` not supported for path `/livez`"
)))),
("HEAD", "/readyz") => Ok(http::Response::default()),
("GET", "/readyz") => Ok(http::Response::new(http_body_util::Full::new(
Bytes::from(OK),
))),
(method, "/readyz") => http::Response::builder()
.status(http::StatusCode::METHOD_NOT_ALLOWED)
.body(http_body_util::Full::new(Bytes::from(format!(
"method `{method}` not supported for path `/readyz`"
)))),
(.., path) => http::Response::builder()
.status(http::StatusCode::NOT_FOUND)
.body(http_body_util::Full::new(Bytes::from(format!(
"unknown endpoint `{path}`"
)))),
}
}
});
let srv = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
spawn(async move {
loop {
let stream = match socket.accept().await {
Ok((stream, _)) => stream,
Err(err) => {
error!(?err, "failed to accept HTTP administration connection");
continue;
}
};
let svc = svc.clone();
if let Err(err) = srv.serve_connection(TokioIo::new(stream), svc).await {
error!(?err, "failed to serve HTTP administration connection");
}
}
});
}

tokio::select! {
res = server.serve() => {
res?
Expand Down

0 comments on commit 109f40e

Please sign in to comment.