Skip to content

Commit

Permalink
Add support for tracing::Span::recorded fields in `metrics-tracing-…
Browse files Browse the repository at this point in the history
…context` (metrics-rs#408)
  • Loading branch information
zohnannor authored Nov 29, 2023
1 parent 9aab03c commit c37a407
Show file tree
Hide file tree
Showing 6 changed files with 571 additions and 150 deletions.
4 changes: 4 additions & 0 deletions metrics-tracing-context/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased] - ReleaseDate

### Added

- Support for dynamism using `tracing::Span::record` to add label values. ([#408](https://github.com/metrics-rs/metrics/pull/408))

## [0.14.0] - 2023-04-16

### Changed
Expand Down
2 changes: 2 additions & 0 deletions metrics-tracing-context/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ itoa = { version = "1", default-features = false }
metrics = { version = "^0.21", path = "../metrics" }
metrics-util = { version = "^0.15", path = "../metrics-util" }
lockfree-object-pool = { version = "0.1.3", default-features = false }
indexmap = { version = "2.1", default-features = false, features = ["std"] }
once_cell = { version = "1", default-features = false, features = ["std"] }
tracing = { version = "0.1.29", default-features = false }
tracing-core = { version = "0.1.21", default-features = false }
Expand All @@ -42,3 +43,4 @@ criterion = { version = "=0.3.3", default-features = false }
parking_lot = { version = "0.12.1", default-features = false }
tracing = { version = "0.1.29", default-features = false, features = ["std"] }
tracing-subscriber = { version = "0.3.1", default-features = false, features = ["registry"] }
itertools = { version = "0.12.0", default-features = false, features = ["use_std"] }
11 changes: 7 additions & 4 deletions metrics-tracing-context/benches/visit.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use indexmap::IndexMap;
use lockfree_object_pool::LinearObjectPool;
use metrics::Label;
use metrics::SharedString;
use metrics_tracing_context::Labels;
use once_cell::sync::OnceCell;
use tracing::Metadata;
Expand All @@ -13,9 +14,11 @@ use tracing_core::{
Callsite, Interest,
};

fn get_pool() -> &'static Arc<LinearObjectPool<Vec<Label>>> {
static POOL: OnceCell<Arc<LinearObjectPool<Vec<Label>>>> = OnceCell::new();
POOL.get_or_init(|| Arc::new(LinearObjectPool::new(|| Vec::new(), |vec| vec.clear())))
type Map = IndexMap<SharedString, SharedString>;

fn get_pool() -> &'static Arc<LinearObjectPool<Map>> {
static POOL: OnceCell<Arc<LinearObjectPool<Map>>> = OnceCell::new();
POOL.get_or_init(|| Arc::new(LinearObjectPool::new(Map::new, Map::clear)))
}

const BATCH_SIZE: usize = 1000;
Expand Down
89 changes: 47 additions & 42 deletions metrics-tracing-context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,25 @@
//!
//! # Implementation
//!
//! The integration layer works by capturing all fields present when a span is created and storing
//! them as an extension to the span. If a metric is emitted while a span is entered, we check that
//! span to see if it has any fields in the extension data, and if it does, we add those fields as
//! labels to the metric key.
//!
//! There are two important behaviors to be aware of:
//! - we only capture the fields present when the span is created
//! - we store all fields that a span has, including the fields of its parent span(s)
//!
//! ## Lack of dynamism
//!
//! This means that if you use [`Span::record`][tracing::Span::record] to add fields to a span after
//! it has been created, those fields will not be captured and added to your metric key.
//! The integration layer works by capturing all fields that are present when a span is created,
//! as well as fields recorded after the fact, and storing them as an extension to the span. If
//! a metric is emitted while a span is entered, any fields captured for that span will be added
//! to the metric as additional labels.
//!
//! Be aware that we recursively capture the fields of a span, including fields from
//! parent spans, and use them when generating metric labels. This means that if a metric is being
//! emitted in span B, which is a child of span A, and span A has field X, and span B has field Y,
//! then the metric labels will include both field X and Y. This applies regardless of how many
//! nested spans are currently entered.
//!
//! ## Duplicate span fields
//!
//! When span fields are captured, they are deduplicated such that only the most recent value is kept.
//! For merging parent span fields into the current span fields, the fields from the current span have
//! the highest priority. Additionally, when using [`Span::record`][tracing::Span::record] to add fields
//! to a span after it has been created, the same behavior applies. This means that recording a field
//! multiple times only keeps the most recently recorded value, including if a field was already present
//! from a parent span and is then recorded dynamically in the current span.
//!
//! ## Span fields and ancestry
//!
Expand Down Expand Up @@ -104,7 +110,7 @@ pub mod label_filter;
mod tracing_integration;

pub use label_filter::LabelFilter;
use tracing_integration::WithContext;
use tracing_integration::Map;
pub use tracing_integration::{Labels, MetricsLayer};

/// [`TracingContextLayer`] provides an implementation of a [`Layer`] for [`TracingContext`].
Expand Down Expand Up @@ -169,34 +175,33 @@ where
// doing the iteration would likely exceed the cost of simply constructing the new key.
tracing::dispatcher::get_default(|dispatch| {
let current = dispatch.current_span();
if let Some(id) = current.id() {
// We're currently within a live tracing span, so see if we have an available
// metrics context to grab any fields/labels out of.
if let Some(ctx) = dispatch.downcast_ref::<WithContext>() {
let mut f = |new_labels: &[Label]| {
if !new_labels.is_empty() {
let (name, mut labels) = key.clone().into_parts();

let filtered_labels = new_labels
.iter()
.filter(|label| {
self.label_filter.should_include_label(&name, label)
})
.cloned();
labels.extend(filtered_labels);

Some(Key::from_parts(name, labels))
} else {
None
}
};

// Pull in the span's fields/labels if they exist.
return ctx.with_labels(dispatch, id, &mut f);
}
}

None
let id = current.id()?;
let ctx = dispatch.downcast_ref::<MetricsLayer>()?;

let mut f = |mut span_labels: Map| {
(!span_labels.is_empty()).then(|| {
let (name, labels) = key.clone().into_parts();

// Filter only span labels
span_labels.retain(|key: &SharedString, value: &mut SharedString| {
let label = Label::new(key.clone(), value.clone());
self.label_filter.should_include_label(&name, &label)
});

// Overwrites labels from spans
span_labels.extend(labels.into_iter().map(Label::into_parts));

let labels = span_labels
.into_iter()
.map(|(key, value)| Label::new(key, value))
.collect::<Vec<_>>();

Key::from_parts(name, labels)
})
};

// Pull in the span's fields/labels if they exist.
ctx.with_labels(dispatch, id, &mut f)
})
}
}
Expand Down
146 changes: 70 additions & 76 deletions metrics-tracing-context/src/tracing_integration.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,49 @@
//! The code that integrates with the `tracing` crate.

use indexmap::IndexMap;
use lockfree_object_pool::{LinearObjectPool, LinearOwnedReusable};
use metrics::{Key, Label};
use metrics::{Key, SharedString};
use once_cell::sync::OnceCell;
use std::cmp;
use std::sync::Arc;
use std::{any::TypeId, marker::PhantomData};
use tracing_core::span::{Attributes, Id, Record};
use tracing_core::{field::Visit, Dispatch, Field, Subscriber};
use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};

fn get_pool() -> &'static Arc<LinearObjectPool<Vec<Label>>> {
static POOL: OnceCell<Arc<LinearObjectPool<Vec<Label>>>> = OnceCell::new();
POOL.get_or_init(|| Arc::new(LinearObjectPool::new(Vec::new, Vec::clear)))
pub(crate) type Map = IndexMap<SharedString, SharedString>;

fn get_pool() -> &'static Arc<LinearObjectPool<Map>> {
static POOL: OnceCell<Arc<LinearObjectPool<Map>>> = OnceCell::new();
POOL.get_or_init(|| Arc::new(LinearObjectPool::new(Map::new, Map::clear)))
}

/// Span fields mapped as metrics labels.
///
/// Hidden from documentation as there is no need for end users to ever touch this type, but it must
/// be public in order to be pulled in by external benchmark code.
#[doc(hidden)]
pub struct Labels(pub LinearOwnedReusable<Vec<Label>>);
pub struct Labels(pub LinearOwnedReusable<Map>);

impl Labels {
pub(crate) fn extend_from_labels(&mut self, other: &Labels) {
self.0.extend_from_slice(other.as_ref());
fn extend(&mut self, other: &Labels, f: impl Fn(&mut Map, &SharedString, &SharedString)) {
let new_len = cmp::max(self.as_ref().len(), other.as_ref().len());
let additional = new_len - self.as_ref().len();
self.0.reserve(additional);
for (k, v) in other.as_ref() {
f(&mut self.0, k, v);
}
}

fn extend_from_labels(&mut self, other: &Labels) {
self.extend(other, |map, k, v| {
map.entry(k.clone()).or_insert_with(|| v.clone());
});
}

fn extend_from_labels_overwrite(&mut self, other: &Labels) {
self.extend(other, |map, k, v| {
map.insert(k.clone(), v.clone());
});
}
}

Expand All @@ -34,108 +55,86 @@ impl Default for Labels {

impl Visit for Labels {
fn record_str(&mut self, field: &Field, value: &str) {
let label = Label::new(field.name(), value.to_string());
self.0.push(label);
self.0.insert(field.name().into(), value.to_owned().into());
}

fn record_bool(&mut self, field: &Field, value: bool) {
let label = Label::from_static_parts(field.name(), if value { "true" } else { "false" });
self.0.push(label);
self.0.insert(field.name().into(), if value { "true" } else { "false" }.into());
}

fn record_i64(&mut self, field: &Field, value: i64) {
let mut buf = itoa::Buffer::new();
let s = buf.format(value);
let label = Label::new(field.name(), s.to_string());
self.0.push(label);
self.0.insert(field.name().into(), s.to_owned().into());
}

fn record_u64(&mut self, field: &Field, value: u64) {
let mut buf = itoa::Buffer::new();
let s = buf.format(value);
let label = Label::new(field.name(), s.to_string());
self.0.push(label);
self.0.insert(field.name().into(), s.to_owned().into());
}

fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
let value_string = format!("{:?}", value);
let label = Label::new(field.name(), value_string);
self.0.push(label);
self.0.insert(field.name().into(), format!("{value:?}").into());
}
}

impl Labels {
fn from_attributes(attrs: &Attributes<'_>) -> Labels {
fn from_record(record: &Record) -> Labels {
let mut labels = Labels::default();
let record = Record::new(attrs.values());
record.record(&mut labels);
labels
}
}

impl AsRef<[Label]> for Labels {
fn as_ref(&self) -> &[Label] {
impl AsRef<Map> for Labels {
fn as_ref(&self) -> &Map {
&self.0
}
}

pub struct WithContext {
with_labels: fn(&Dispatch, &Id, f: &mut dyn FnMut(&Labels) -> Option<Key>) -> Option<Key>,
}

impl WithContext {
pub fn with_labels(
&self,
dispatch: &Dispatch,
id: &Id,
f: &mut dyn FnMut(&[Label]) -> Option<Key>,
) -> Option<Key> {
let mut ff = |labels: &Labels| f(labels.as_ref());
(self.with_labels)(dispatch, id, &mut ff)
}
}

/// [`MetricsLayer`] is a [`tracing_subscriber::Layer`] that captures the span
/// fields and allows them to be later on used as metrics labels.
pub struct MetricsLayer<S> {
ctx: WithContext,
_subscriber: PhantomData<fn(S)>,
#[derive(Default)]
pub struct MetricsLayer {
with_labels:
Option<fn(&Dispatch, &Id, f: &mut dyn FnMut(&Labels) -> Option<Key>) -> Option<Key>>,
}

impl<S> MetricsLayer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
/// Create a new `MetricsLayer`.
impl MetricsLayer {
/// Create a new [`MetricsLayer`].
pub fn new() -> Self {
let ctx = WithContext { with_labels: Self::with_labels };

Self { ctx, _subscriber: PhantomData }
Self::default()
}

fn with_labels(
pub(crate) fn with_labels(
&self,
dispatch: &Dispatch,
id: &Id,
f: &mut dyn FnMut(&Labels) -> Option<Key>,
f: &mut dyn FnMut(Map) -> Option<Key>,
) -> Option<Key> {
let subscriber = dispatch
.downcast_ref::<S>()
.expect("subscriber should downcast to expected type; this is a bug!");
let span = subscriber.span(id).expect("registry should have a span for the current ID");

let result =
if let Some(labels) = span.extensions().get::<Labels>() { f(labels) } else { None };
result
let mut ff = |labels: &Labels| f(labels.0.clone());
(self.with_labels?)(dispatch, id, &mut ff)
}
}

impl<S> Layer<S> for MetricsLayer<S>
impl<S> Layer<S> for MetricsLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_layer(&mut self, _: &mut S) {
self.with_labels = Some(|dispatch, id, f| {
let subscriber = dispatch.downcast_ref::<S>()?;
let span = subscriber.span(id)?;

let ext = span.extensions();
f(ext.get::<Labels>()?)
});
}

fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, cx: Context<'_, S>) {
let span = cx.span(id).expect("span must already exist!");
let mut labels = Labels::from_attributes(attrs);
let mut labels = Labels::from_record(&Record::new(attrs.values()));

if let Some(parent) = span.parent() {
if let Some(parent_labels) = parent.extensions().get::<Labels>() {
Expand All @@ -146,20 +145,15 @@ where
span.extensions_mut().insert(labels);
}

unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> {
match id {
id if id == TypeId::of::<Self>() => Some(self as *const _ as *const ()),
id if id == TypeId::of::<WithContext>() => Some(&self.ctx as *const _ as *const ()),
_ => None,
}
}
}
fn on_record(&self, id: &Id, values: &Record<'_>, cx: Context<'_, S>) {
let span = cx.span(id).expect("span must already exist!");
let labels = Labels::from_record(values);

impl<S> Default for MetricsLayer<S>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
fn default() -> Self {
MetricsLayer::new()
let ext = &mut span.extensions_mut();
if let Some(existing) = ext.get_mut::<Labels>() {
existing.extend_from_labels_overwrite(&labels);
} else {
ext.insert(labels);
}
}
}
Loading

0 comments on commit c37a407

Please sign in to comment.