feat: cyclotron metrics (#24531)
oliverb123 authored Aug 23, 2024
1 parent c25b571 commit 88003f8
Showing 20 changed files with 390 additions and 136 deletions.


2 changes: 0 additions & 2 deletions rust/Cargo.lock


87 changes: 87 additions & 0 deletions rust/common/metrics/src/lib.rs
@@ -80,3 +80,90 @@ pub fn get_current_timestamp_seconds() -> f64 {
.unwrap_or_default()
.as_secs() as f64
}

// Shorthand for common metric types
pub fn inc(name: &'static str, labels: &[(String, String)], value: u64) {
metrics::counter!(name, labels).increment(value);
}

pub fn gauge(name: &'static str, labels: &[(String, String)], value: f64) {
metrics::gauge!(name, labels).set(value);
}

// A guard to record the time between creation and drop as a histogram entry
pub struct TimingGuard<'a> {
name: &'static str,
labels: TimingGuardLabels<'a>,
start: Instant,
}

// Shorthand constructor for that guard
pub fn timing_guard<'a>(name: &'static str, labels: &'a [(String, String)]) -> TimingGuard<'a> {
TimingGuard {
name,
labels: TimingGuardLabels::new(labels),
start: Instant::now(),
}
}

// Timing guards start out cheap to construct, but if you want to push extra
// labels onto them, they'll need to allocate. This enum tracks that state.
enum TimingGuardLabels<'a> {
None,
Borrowed(&'a [(String, String)]),
Owned(Vec<(String, String)>),
}

impl<'a> TimingGuard<'a> {
// This consumes the guard, making "label this span and then immediately report the timing"
// a one-liner (simply don't re-bind the return value), but it's also a bit of a footgun.
pub fn label(mut self, key: &str, value: &str) -> Self {
self.labels.push_label(key, value);
self
}

// This is meant to be used with the above to make what's happening more obvious. I don't know
// if it's good enough, but it's an improvement.
pub fn fin(self) {}
}

impl<'a> Drop for TimingGuard<'a> {
fn drop(&mut self) {
let labels = self.labels.as_slice();
metrics::histogram!(self.name, labels).record(self.start.elapsed().as_millis() as f64);
}
}

impl<'a> TimingGuardLabels<'a> {
fn new(labels: &'a [(String, String)]) -> Self {
if labels.is_empty() {
TimingGuardLabels::None
} else {
TimingGuardLabels::Borrowed(labels)
}
}

fn as_slice(&self) -> &[(String, String)] {
match self {
TimingGuardLabels::None => &[],
TimingGuardLabels::Borrowed(labels) => labels,
TimingGuardLabels::Owned(labels) => labels,
}
}

fn push_label(&mut self, key: &str, value: &str) {
match self {
TimingGuardLabels::None => {
*self = TimingGuardLabels::Owned(vec![(key.to_string(), value.to_string())]);
}
TimingGuardLabels::Borrowed(labels) => {
let mut existing = labels.to_vec();
existing.push((key.to_string(), value.to_string()));
*self = TimingGuardLabels::Owned(existing);
}
TimingGuardLabels::Owned(labels) => {
labels.push((key.to_string(), value.to_string()));
}
};
}
}
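
A usage sketch (not part of the commit) of how these helpers compose, assuming the crate is imported as `common_metrics` (per the `common-metrics` path dependency used elsewhere in this change); the function, metric names, and label values are illustrative:

// Illustrative only: this function and its metric names are made up for the example.
fn report_batch(labels: &[(String, String)]) {
    // Count processed items and record the current queue depth.
    common_metrics::inc("jobs_processed", labels, 10);
    common_metrics::gauge("queue_depth", labels, 42.0);

    // Time a span of work: `label` consumes and returns the guard, so chaining it
    // with `fin` reports the elapsed milliseconds right away; otherwise the guard
    // records on drop.
    let timer = common_metrics::timing_guard("batch_duration_ms", labels);
    // ... the work being timed ...
    timer.label("outcome", "ok").fin();
}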


9 changes: 9 additions & 0 deletions rust/cyclotron-core/src/lib.rs
@@ -36,6 +36,15 @@ mod config;
pub use config::ManagerConfig;
pub use config::PoolConfig;

// Some data is shared between workers and janitors on a given shard via the
// metadata table. These keys are reserved for that purpose.

// The shard id is a fixed value set by the janitor when it starts up. Workers
// may use this value when reporting metrics. The `Worker` struct provides a
// method for fetching this value that caches it appropriately, so it's safe to
// call frequently while still staying up to date (even though it should "never"
// change).
pub const SHARD_ID_KEY: &str = "shard_id";

#[doc(hidden)]
pub mod test_support {
pub use crate::manager::Shard;
20 changes: 10 additions & 10 deletions rust/cyclotron-core/src/worker.rs
@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc};
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use sqlx::PgPool;
@@ -24,28 +24,28 @@ pub struct Worker {
// All dequeued job IDs that haven't been flushed yet. The idea is this lets us
// manage, on the rust side of any API boundary, the "pending" update of any given
// job, such that a user can progressively build up a full update, and then flush it,
// rather than having to track the update state on their side and submit it all at once
// TODO - we don't handle people "forgetting" to abort a job, because we expect that to
// only happen if a process dies (in which case the job queue janitor should handle
// it)... this is a memory leak, but I think it's ok.
// TRICKY - this is a sync mutex, because we never hold it across an await point, and that
// radically simplifies using this for FFI (because there's no message passing across runtimes)
pending: Arc<Mutex<HashMap<Uuid, JobUpdate>>>,
// rather than having to track the update state on their side and submit it all at once.
// This also lets us "hide" all the locking logic, which we're not totally settled on yet.

// TRICKY - this is a sync mutex, because that simplifies using the manager in an FFI
// context (since most functions below can be sync). We have to be careful never to
// hold a lock across an await point, though.
pending: Mutex<HashMap<Uuid, JobUpdate>>,
}

impl Worker {
pub async fn new(config: PoolConfig) -> Result<Self, QueueError> {
let pool = config.connect().await?;
Ok(Self {
pool,
pending: Arc::new(Mutex::new(HashMap::new())),
pending: Default::default(),
})
}

pub fn from_pool(pool: PgPool) -> Self {
Self {
pool,
pending: Arc::new(Mutex::new(HashMap::new())),
pending: Default::default(),
}
}

1 change: 0 additions & 1 deletion rust/cyclotron-fetch/Cargo.toml
@@ -15,7 +15,6 @@ uuid = { workspace = true }
envconfig = { workspace = true }
axum = { workspace = true }
thiserror = { workspace = true }
metrics = { workspace = true }
cyclotron-core = { path = "../cyclotron-core" }
common-metrics = { path = "../common/metrics" }
common-dns = { path = "../common/dns" }
5 changes: 5 additions & 0 deletions rust/cyclotron-fetch/src/config.rs
@@ -35,6 +35,9 @@ pub struct Config {
#[envconfig(default = "default_worker_id")]
pub worker_id: String,

#[envconfig(default = "default")]
pub shard_id: String,

#[envconfig(default = "1")]
pub job_poll_interval_seconds: i64,

@@ -70,6 +73,7 @@ pub struct AppConfig {
pub host: String,
pub port: u16,
pub worker_id: String,
pub shard_id: String,
pub job_poll_interval: Duration, // How long we wait to poll for new jobs, when we're at capacity or find no new jobs
pub concurrent_requests_limit: u32,
pub fetch_timeout: Duration,
@@ -87,6 +91,7 @@ impl Config {
host: self.host,
port: self.port,
worker_id: self.worker_id,
shard_id: self.shard_id,
job_poll_interval: Duration::seconds(self.job_poll_interval_seconds),
concurrent_requests_limit: self.concurrent_requests_limit,
fetch_timeout: Duration::seconds(self.fetch_timeout_seconds),
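
A configuration sketch (not part of the commit): assuming the envconfig crate's usual field-to-env-var mapping (`shard_id` reads `SHARD_ID`) and its `init_from_env` constructor, the new setting is picked up like any other field and falls back to the declared default of "default" when unset.

use envconfig::Envconfig;

fn load_config() -> Config {
    // Set SHARD_ID in the environment to override the
    // #[envconfig(default = "default")] value.
    Config::init_from_env().expect("config should load from env")
}
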
19 changes: 17 additions & 2 deletions rust/cyclotron-fetch/src/context.rs
@@ -1,6 +1,6 @@
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use cyclotron_core::{PoolConfig, Worker};
use cyclotron_core::{PoolConfig, Worker, SHARD_ID_KEY};
use health::HealthHandle;
use tokio::sync::Semaphore;

@@ -12,6 +12,7 @@ pub struct AppContext {
pub concurrency_limit: Arc<Semaphore>,
pub liveness: HealthHandle,
pub config: AppConfig,
pub metric_labels: RwLock<Vec<(String, String)>>,
}

impl AppContext {
@@ -44,12 +45,26 @@ impl AppContext {

let worker = Worker::new(pool_config).await?;

let labels = vec![
(SHARD_ID_KEY.to_string(), config.shard_id.clone()),
("worker_id".to_string(), config.worker_id.clone()),
("queue_served".to_string(), config.queue_served.clone()),
];

Ok(Self {
worker,
client,
concurrency_limit,
liveness,
config,
metric_labels: RwLock::new(labels),
})
}

// *Relatively* cheap compared to the update above, but still better to grab
// once at the top of your fn and then reuse.
pub fn metric_labels(&self) -> Vec<(String, String)> {
self.metric_labels.read().unwrap().clone()
}
}
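
A consumption sketch (not part of the commit) of the pattern the comment above suggests, with illustrative metric names:

// Grab the label set once at the top of the function, then reuse it for every
// metric emitted during the tick.
async fn tick(ctx: &AppContext) {
    let labels = ctx.metric_labels();

    common_metrics::inc("fetch_dequeue_attempts", &labels, 1);

    let timer = common_metrics::timing_guard("fetch_tick_duration_ms", &labels);
    // ... dequeue and process jobs ...
    timer.label("outcome", "ok").fin();
}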