Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
sargarass committed Dec 19, 2023
2 parents 115500a + a710c52 commit 742d7c3
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 87 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased] - ReleaseDate
### Changed
- deps: update `quanta` to v0.12 and `erased-serde` to v0.4.
- deps: update `quanta` to v0.12, `hyper` to v1 and `erased-serde` to v0.4.
- context: slightly improve performance of time measurements.
- utils: slightly improve performance of `RateLimiter`.
- telemeter: handle only `GET /metrics` requests.
- telemeter: rename the `address` config parameter to `listen` with alias to the old name.

### Fixed
- telemeter: avoid spawning tasks for a HTTP server, it improves common metrics.

## [0.2.0-alpha.11] - 2023-11-10
### Added
Expand Down
4 changes: 3 additions & 1 deletion elfo-telemeter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ unstable = []
elfo-core = { version = "0.2.0-alpha.11", path = "../elfo-core", features = ["unstable"] } # TODO: do not need

tokio = "1"
hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] }
hyper = { version = "1.0.1", features = ["server", "http1"] }
hyper-util = { version = "0.1.1", features = ["tokio"] }
pin-project-lite = "0.2"
serde = { version = "1.0.120", features = ["derive"] }
metrics = "0.17"
metrics-util = "0.10"
Expand Down
113 changes: 33 additions & 80 deletions elfo-telemeter/src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,33 @@
use std::{sync::Arc, time::Duration};

use metrics::gauge;
use tokio::task::JoinHandle;
use tracing::{error, info};

use elfo_core::{
message, messages::ConfigUpdated, msg, scope, time::Interval, tracing::TraceId, ActorGroup,
Blueprint, Context, MoveOwnership, RestartParams, RestartPolicy,
message, messages::ConfigUpdated, msg, stream::Stream, time::Interval, ActorGroup, Blueprint,
Context, RestartParams, RestartPolicy, SourceHandle,
};

use crate::{
config::{Config, Retention, Sink},
protocol::{GetSnapshot, Snapshot},
hyper,
protocol::{GetSnapshot, Render, Rendered, ServerFailed, Snapshot},
render::Renderer,
storage::Storage,
};

struct Telemeter {
ctx: Context<Config>,
interval: Interval<CompactionTick>,
server: Option<Stream<ServerFailed>>,
storage: Arc<Storage>,
snapshot: Arc<Snapshot>,
renderer: Renderer,
}

#[message(ret = Rendered)]
struct Render;

#[message]
struct Rendered(#[serde(serialize_with = "elfo_core::dumping::hide")] String);

#[message]
struct CompactionTick;

#[message]
struct ServerFailed(MoveOwnership<hyper::Error>);

pub(crate) fn new(storage: Arc<Storage>) -> Blueprint {
ActorGroup::new()
.config::<Config>()
Expand All @@ -54,6 +46,7 @@ impl Telemeter {

Self {
interval: ctx.attach(Interval::new(CompactionTick)),
server: None,
storage,
snapshot: Default::default(),
renderer,
Expand All @@ -65,8 +58,8 @@ impl Telemeter {
// Now only prometheus is supported.
assert_eq!(self.ctx.config().sink, Sink::Prometheus);

let mut address = self.ctx.config().address;
let mut server = start_server(&self.ctx);
let mut listen = self.ctx.config().listen;
self.start_server();

self.interval.start(self.ctx.config().compaction_interval);

Expand All @@ -75,14 +68,17 @@ impl Telemeter {
ConfigUpdated => {
let config = self.ctx.config();

if config.address != address {
info!("address changed, rerun the server");
server.abort();
address = config.address;
server = start_server(&self.ctx);
}

self.renderer.configure(config);

if config.listen != listen {
info!(
message = "listen address changed, rerun the server",
old = %listen,
new = %config.listen,
);
listen = config.listen;
self.start_server();
}
}
(GetSnapshot, token) => {
// Rendering includes compaction, skip extra compaction tick.
Expand All @@ -109,14 +105,12 @@ impl Telemeter {
CompactionTick => {
self.fill_snapshot(/* only_histograms = */ true);
}
ServerFailed(error) => {
error!(error = %&error.take().unwrap(), "server failed");
panic!("server failed");
ServerFailed(err) => {
error!(error = %err, "server failed");
panic!("server failed, cannot continue");
}
});
}

server.abort();
}

fn fill_snapshot(&mut self, only_histograms: bool) {
Expand All @@ -134,59 +128,18 @@ impl Telemeter {
let snapshot = Arc::make_mut(&mut self.snapshot);
snapshot.distributions_mut().for_each(|d| d.reset());
}
}

fn start_server(ctx: &Context<Config>) -> JoinHandle<()> {
use hyper::{
server::{conn::AddrStream, Server},
service::{make_service_fn, service_fn},
Body, Error as HyperError, Response,
};

let address = ctx.config().address;
let ctx = Arc::new(ctx.pruned());
let ctx1 = ctx.clone();

let scope = scope::expose();
let scope1 = scope.clone();

let serving = async move {
let server = Server::try_bind(&address)?;
let make_svc = make_service_fn(move |_socket: &AddrStream| {
let ctx = ctx.clone();
let scope = scope.clone();

async move {
Ok::<_, HyperError>(service_fn(move |_| {
let ctx = ctx.clone();
let scope = scope.clone();

let f = async move {
let Rendered(output) = ctx
.request_to(ctx.addr(), Render)
.resolve()
.await
.expect("failed to send to the telemeter");

Ok::<_, HyperError>(Response::new(Body::from(output)))
};

scope.set_trace_id(TraceId::generate());
scope.within(f)
}))
}
});
server.serve(make_svc).await
};

tokio::spawn(async move {
if let Err(err) = serving.await {
let f = async {
let _ = ctx1.send_to(ctx1.group(), ServerFailed(err.into())).await;
};

scope1.set_trace_id(TraceId::generate());
scope1.within(f).await;
fn start_server(&mut self) {
// Terminate a running server.
if let Some(source) = self.server.take() {
source.terminate();
}
})

// Start a new one.
let listen = self.ctx.config().listen;
let pruned_ctx = self.ctx.pruned();
let source = Stream::once(hyper::server(listen, pruned_ctx));

self.server = Some(self.ctx.attach(source));
}
}
3 changes: 2 additions & 1 deletion elfo-telemeter/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ pub(crate) struct Config {
/// The sink's type.
pub(crate) sink: Sink,
/// The address to expose for scraping.
pub(crate) address: SocketAddr,
#[serde(alias = "address")]
pub(crate) listen: SocketAddr,
/// How long samples should be considered in summaries.
#[serde(default)]
pub(crate) retention: Retention,
Expand Down
157 changes: 157 additions & 0 deletions elfo-telemeter/src/hyper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use std::{
convert::Infallible,
future::Future,
net::SocketAddr,
pin::Pin,
string::ToString,
task::Poll,
time::{Duration, Instant},
};

use hyper::{body::Body, rt, server::conn, service, Method, Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use pin_project_lite::pin_project;
use tokio::{net::TcpListener, time::timeout};
use tracing::{debug, info, warn};

use elfo_core::{scope, tracing::TraceId, Context};

use crate::protocol::{Render, Rendered, ServerFailed};

const HEADER_READ_TIMEOUT: Duration = Duration::from_secs(3);
const SERVE_TIMEOUT: Duration = Duration::from_secs(10);

/// Runs a simple HTTP server that responds to `GET /metrics` requests.
/// * It supports only HTTP/1.
/// * It doesn't support keep-alive connections.
/// * It doesn't support TLS.
/// * It doesn't support compression.
/// * It handles requests one by one with some reasonable timeouts.
pub(crate) async fn server(addr: SocketAddr, ctx: Context) -> ServerFailed {
let listener = match TcpListener::bind(addr).await {
Ok(listener) => listener,
Err(err) => return ServerFailed(format!("cannot bind a listener: {err}")),
};

info!(bind = %addr, "listening TCP connections");

loop {
let (stream, peer) = match listener.accept().await {
Ok(pair) => pair,
Err(err) => return ServerFailed(format!("cannot accept a connection: {err}")),
};

// The server doesn't support keep-alive connections, so every connection is a
// new request. Thus, we can start a new trace right here.
scope::set_trace_id(TraceId::generate());

debug!(peer = %peer, "accepted a TCP connection");
let ctx = ctx.clone();

let serving = conn::http1::Builder::new()
.timer(TokioTimer)
.keep_alive(false) // KA is meaningless for rare requests.
.header_read_timeout(HEADER_READ_TIMEOUT)
.serve_connection(
TokioIo::new(stream),
service::service_fn(move |req| handle(req, ctx.clone())),
);

match flat_error(timeout(SERVE_TIMEOUT, serving).await) {
Ok(()) => debug!(peer = %peer, "finished serving a HTTP connection"),
Err(err) => warn!(
message = "failed to serve a HTTP connection",
error = %err,
peer = %peer,
),
}
}
}

// Supports only `GET /metrics` requests.
async fn handle(req: Request<impl Body>, ctx: Context) -> Result<Response<String>, Infallible> {
if req.method() != Method::GET {
return Ok(Response::builder()
.status(StatusCode::METHOD_NOT_ALLOWED)
.body(String::new())
.unwrap());
}

if req.uri().path() != "/metrics" {
return Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(String::new())
.unwrap());
}

ctx.request_to(ctx.addr(), Render)
.resolve()
.await
.map(|Rendered(text)| Response::new(text))
.or_else(|err| {
warn!(error = %err, "failed to render metrics for HTTP response");

Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(String::new())
.unwrap())
})
}

fn flat_error(res: Result<Result<(), impl ToString>, impl ToString>) -> Result<(), String> {
match res {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err.to_string()),
Err(err) => Err(err.to_string()),
}
}

// === TokioTimer ===
// TODO: Replace once https://github.com/hyperium/hyper-util/pull/73 is released.
// Don't forget to remove `pin-project-lite` from `Cargo.toml`.

#[derive(Clone, Debug)]
struct TokioTimer;

impl rt::Timer for TokioTimer {
fn sleep(&self, duration: Duration) -> Pin<Box<dyn rt::Sleep>> {
Box::pin(TokioSleep {
inner: tokio::time::sleep(duration),
})
}

fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn rt::Sleep>> {
Box::pin(TokioSleep {
inner: tokio::time::sleep_until(deadline.into()),
})
}

fn reset(&self, sleep: &mut Pin<Box<dyn rt::Sleep>>, new_deadline: Instant) {
if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() {
sleep.reset(new_deadline)
}
}
}

pin_project! {
struct TokioSleep {
#[pin]
inner: tokio::time::Sleep,
}
}

impl Future for TokioSleep {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
}
}

impl rt::Sleep for TokioSleep {}

impl TokioSleep {
fn reset(self: Pin<&mut Self>, deadline: Instant) {
self.project().inner.as_mut().reset(deadline.into());
}
}
1 change: 1 addition & 0 deletions elfo-telemeter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod protocol;

mod actor;
mod config;
mod hyper;
mod recorder;
mod render;
mod storage;
Expand Down
9 changes: 9 additions & 0 deletions elfo-telemeter/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ use metrics_util::Summary;

use elfo_core::{message, ActorMeta, Local};

#[message(ret = Rendered)]
pub(crate) struct Render;

#[message]
pub(crate) struct Rendered(#[serde(serialize_with = "elfo_core::dumping::hide")] pub(crate) String);

#[message]
pub(crate) struct ServerFailed(pub(crate) String);

/// A command to get actual snapshot of all metrics.
/// The response is restricted to be local only for now.
#[message(ret = Local<Arc<Snapshot>>)]
Expand Down
Loading

0 comments on commit 742d7c3

Please sign in to comment.