Skip to content

Commit

Permalink
Merge pull request #101 from dschatzberg/openmetrics
Browse files Browse the repository at this point in the history
scx_layered: Add support for OpenMetrics format
  • Loading branch information
htejun authored Jan 25, 2024
2 parents 7117a22 + 7f9548e commit eb997a6
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 41 deletions.
1 change: 1 addition & 0 deletions scheds/rust/scx_layered/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ lazy_static = "1.4"
libbpf-rs = "0.22"
libc = "0.2"
log = "0.4"
prometheus-client = "0.22.0"
scx_utils = { path = "../../../rust/scx_utils", version = "0.5" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
261 changes: 220 additions & 41 deletions scheds/rust/scx_layered/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use std::io::Read;
use std::io::Write;
use std::ops::Sub;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -33,6 +35,10 @@ use libbpf_rs::skel::SkelBuilder as _;
use log::debug;
use log::info;
use log::trace;
use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;
use scx_utils::ravg::ravg_read;
use serde::Deserialize;
use serde::Serialize;
Expand Down Expand Up @@ -252,6 +258,12 @@ struct Opts {
#[clap(short = 'v', long, action = clap::ArgAction::Count)]
verbose: u8,

/// Enable output of stats in OpenMetrics format instead of via log macros.
/// This option is useful if you want to collect stats in some monitoring
/// database like prometheseus.
#[clap(short = 'o', long)]
open_metrics_format: bool,

/// Write example layer specifications into the file and exit.
#[clap(short = 'e', long)]
example: Option<String>,
Expand Down Expand Up @@ -1099,6 +1111,102 @@ impl Layer {
}
}

#[derive(Default)]
struct OpenMetricsStats {
registry: Registry,
total: Gauge<i64, AtomicI64>,
local: Gauge<f64, AtomicU64>,
open_idle: Gauge<f64, AtomicU64>,
affn_viol: Gauge<f64, AtomicU64>,
tctx_err: Gauge<i64, AtomicI64>,
proc_ms: Gauge<i64, AtomicI64>,
busy: Gauge<f64, AtomicU64>,
util: Gauge<f64, AtomicU64>,
load: Gauge<f64, AtomicU64>,
l_util: Family<Vec<(String, String)>, Gauge<f64, AtomicU64>>,
l_util_frac: Family<Vec<(String, String)>, Gauge<f64, AtomicU64>>,
l_load: Family<Vec<(String, String)>, Gauge<f64, AtomicU64>>,
l_load_frac: Family<Vec<(String, String)>, Gauge<f64, AtomicU64>>,
l_tasks: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
l_total: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
l_local: Family<Vec<(String, String)>, Gauge<f64, AtomicU64>>,
l_open_idle: Family<Vec<(String, String)>, Gauge<f64, AtomicU64>>,
l_preempt: Family<Vec<(String, String)>, Gauge<f64, AtomicU64>>,
l_affn_viol: Family<Vec<(String, String)>, Gauge<f64, AtomicU64>>,
l_cur_nr_cpus: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
l_min_nr_cpus: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
l_max_nr_cpus: Family<Vec<(String, String)>, Gauge<i64, AtomicI64>>,
}

impl OpenMetricsStats {
fn new() -> OpenMetricsStats {
let mut metrics = OpenMetricsStats {
registry: <Registry>::default(),
..Default::default()
};
// Helper macro to reduce on some of the boilerplate:
// $i: The identifier of the metric to register
// $help: The Help text associated with the metric
macro_rules! register {
($i:ident, $help:expr) => {
metrics
.registry
.register(stringify!($i), $help, metrics.$i.clone())
};
}
register!(total, "Total scheduling events in the period");
register!(local, "% that got scheduled directly into an idle CPU");
register!(
open_idle,
"% of open layer tasks scheduled into occupied idle CPUs"
);
register!(
affn_viol,
"% which violated configured policies due to CPU affinity restrictions"
);
register!(tctx_err, "Failures to free task contexts");
register!(
proc_ms,
"CPU time this binary has consumed during the period"
);
register!(busy, "CPU busy % (100% means all CPUs were fully occupied)");
register!(
util,
"CPU utilization % (100% means one CPU was fully occupied)"
);
register!(load, "Sum of weight * duty_cycle for all tasks");
register!(
l_util,
"CPU utilization of the layer (100% means one CPU was fully occupied)"
);
register!(
l_util_frac,
"Fraction of total CPU utilization consumed by the layer"
);
register!(l_load, "Sum of weight * duty_cycle for tasks in the layer");
register!(l_load_frac, "Fraction of total load consumed by the layer");
register!(l_tasks, "Number of tasks in the layer");
register!(l_total, "Number of scheduling events in the layer");
register!(l_local, "% of scheduling events directly into an idle CPU");
register!(
l_open_idle,
"% of scheduling events into idle CPUs occupied by other layers"
);
register!(
l_preempt,
"% of scheduling events that preempted other tasks"
);
register!(
l_affn_viol,
"% of scheduling events that violated configured policies due to CPU affinity restrictions"
);
register!(l_cur_nr_cpus, "Current # of CPUs assigned to the layer");
register!(l_min_nr_cpus, "Minimum # of CPUs assigned to the layer");
register!(l_max_nr_cpus, "Maximum # of CPUs assigned to the layer");
metrics
}
}

struct Scheduler<'a> {
skel: BpfSkel<'a>,
struct_ops: Option<libbpf_rs::Link>,
Expand All @@ -1118,6 +1226,9 @@ struct Scheduler<'a> {
nr_layer_cpus_min_max: Vec<(usize, usize)>,
processing_dur: Duration,
prev_processing_dur: Duration,

om_stats: OpenMetricsStats,
om_format: bool,
}

impl<'a> Scheduler<'a> {
Expand Down Expand Up @@ -1224,6 +1335,9 @@ impl<'a> Scheduler<'a> {

proc_reader,
skel,

om_stats: OpenMetricsStats::new(),
om_format: opts.open_metrics_format,
})
}

Expand Down Expand Up @@ -1333,24 +1447,44 @@ impl<'a> Scheduler<'a> {
}
};

info!(
"tot={:7} local={:5.2} open_idle={:5.2} affn_viol={:5.2} tctx_err={} proc={:?}ms",
total,
lsum_pct(bpf_intf::layer_stat_idx_LSTAT_LOCAL),
lsum_pct(bpf_intf::layer_stat_idx_LSTAT_OPEN_IDLE),
lsum_pct(bpf_intf::layer_stat_idx_LSTAT_AFFN_VIOL),
self.om_stats.total.set(total as i64);
self.om_stats
.local
.set(lsum_pct(bpf_intf::layer_stat_idx_LSTAT_LOCAL));
self.om_stats
.open_idle
.set(lsum_pct(bpf_intf::layer_stat_idx_LSTAT_OPEN_IDLE));
self.om_stats
.affn_viol
.set(lsum_pct(bpf_intf::layer_stat_idx_LSTAT_AFFN_VIOL));
self.om_stats.tctx_err.set(
stats.prev_bpf_stats.gstats
[bpf_intf::global_stat_idx_GSTAT_TASK_CTX_FREE_FAILED as usize],
processing_dur.as_millis(),
[bpf_intf::global_stat_idx_GSTAT_TASK_CTX_FREE_FAILED as usize] as i64,
);
self.om_stats.proc_ms.set(processing_dur.as_millis() as i64);
self.om_stats.busy.set(stats.cpu_busy * 100.0);
self.om_stats.util.set(stats.total_util * 100.0);
self.om_stats.load.set(stats.total_load);

info!(
"busy={:5.1} util={:7.1} load={:9.1} fallback_cpu={:3}",
stats.cpu_busy * 100.0,
stats.total_util * 100.0,
stats.total_load,
self.cpu_pool.fallback_cpu,
);
if !self.om_format {
info!(
"tot={:7} local={:5.2} open_idle={:5.2} affn_viol={:5.2} tctx_err={} proc={:?}ms",
self.om_stats.total.get(),
self.om_stats.local.get(),
self.om_stats.open_idle.get(),
self.om_stats.affn_viol.get(),
self.om_stats.tctx_err.get(),
self.om_stats.proc_ms.get(),
);

info!(
"busy={:5.1} util={:7.1} load={:9.1} fallback_cpu={:3}",
self.om_stats.busy.get(),
self.om_stats.util.get(),
self.om_stats.load.get(),
self.cpu_pool.fallback_cpu,
);
}

let header_width = self
.layer_specs
Expand All @@ -1376,38 +1510,83 @@ impl<'a> Scheduler<'a> {
}
};

info!(
" {:<width$}: util/frac={:7.1}/{:5.1} load/frac={:9.1}:{:5.1} tasks={:6}",
spec.name,
stats.layer_utils[lidx] * 100.0,
calc_frac(stats.layer_utils[lidx], stats.total_util),
stats.layer_loads[lidx],
calc_frac(stats.layer_loads[lidx], stats.total_load),
stats.nr_layer_tasks[lidx],
width = header_width,
// Helper macros to reduce some boilerplate.
// $i: The identifier of the metric to set
// $e: The expression to set the metric to
macro_rules! set {
($i: ident, $e:expr) => {{
let l = self
.om_stats
.$i
.get_or_create(&vec![("layer_name".to_owned(), spec.name.clone())]);
l.set($e);
l
}};
}
let l_util = set!(l_util, stats.layer_utils[lidx] * 100.0);
let l_util_frac = set!(
l_util_frac,
calc_frac(stats.layer_utils[lidx], stats.total_util)
);
info!(
" {:<width$} tot={:7} local={:5.2} open_idle={:5.2} preempt={:5.2} affn_viol={:5.2}",
"",
ltotal,
lstat_pct(bpf_intf::layer_stat_idx_LSTAT_LOCAL),
lstat_pct(bpf_intf::layer_stat_idx_LSTAT_OPEN_IDLE),
lstat_pct(bpf_intf::layer_stat_idx_LSTAT_PREEMPT),
lstat_pct(bpf_intf::layer_stat_idx_LSTAT_AFFN_VIOL),
width = header_width,
let l_load = set!(l_load, stats.layer_loads[lidx]);
let l_load_frac = set!(
l_load_frac,
calc_frac(stats.layer_loads[lidx], stats.total_load)
);
info!(
" {:<width$} cpus={:3} [{:3},{:3}] {}",
"",
layer.nr_cpus,
self.nr_layer_cpus_min_max[lidx].0,
self.nr_layer_cpus_min_max[lidx].1,
format_bitvec(&layer.cpus),
width = header_width
let l_tasks = set!(l_tasks, stats.nr_layer_tasks[lidx] as i64);
let l_total = set!(l_total, ltotal as i64);
let l_local = set!(l_local, lstat_pct(bpf_intf::layer_stat_idx_LSTAT_LOCAL));
let l_open_idle = set!(
l_open_idle,
lstat_pct(bpf_intf::layer_stat_idx_LSTAT_OPEN_IDLE)
);
let l_preempt = set!(l_preempt, lstat_pct(bpf_intf::layer_stat_idx_LSTAT_PREEMPT));
let l_affn_viol = set!(
l_affn_viol,
lstat_pct(bpf_intf::layer_stat_idx_LSTAT_AFFN_VIOL)
);
let l_cur_nr_cpus = set!(l_cur_nr_cpus, layer.nr_cpus as i64);
let l_min_nr_cpus = set!(l_min_nr_cpus, self.nr_layer_cpus_min_max[lidx].0 as i64);
let l_max_nr_cpus = set!(l_max_nr_cpus, self.nr_layer_cpus_min_max[lidx].1 as i64);
if !self.om_format {
info!(
" {:<width$}: util/frac={:7.1}/{:5.1} load/frac={:9.1}:{:5.1} tasks={:6}",
spec.name,
l_util.get(),
l_util_frac.get(),
l_load.get(),
l_load_frac.get(),
l_tasks.get(),
width = header_width,
);
info!(
" {:<width$} tot={:7} local={:5.2} open_idle={:5.2} preempt={:5.2} affn_viol={:5.2}",
"",
l_total.get(),
l_local.get(),
l_open_idle.get(),
l_preempt.get(),
l_affn_viol.get(),
width = header_width,
);
info!(
" {:<width$} cpus={:3} [{:3},{:3}] {}",
"",
l_cur_nr_cpus.get(),
l_min_nr_cpus.get(),
l_max_nr_cpus.get(),
format_bitvec(&layer.cpus),
width = header_width
);
}
self.nr_layer_cpus_min_max[lidx] = (layer.nr_cpus, layer.nr_cpus);
}

if self.om_format {
let mut buffer = String::new();
encode(&mut buffer, &self.om_stats.registry).unwrap();
print!("{}", buffer);
}
self.processing_dur += Instant::now().duration_since(started_at);
Ok(())
}
Expand Down

0 comments on commit eb997a6

Please sign in to comment.