diff --git a/console-subscriber/src/attribute_new.rs b/console-subscriber/src/attribute_new.rs new file mode 100644 index 000000000..189992ca5 --- /dev/null +++ b/console-subscriber/src/attribute_new.rs @@ -0,0 +1,196 @@ +use crate::ToProto; +use console_api as proto; +use proto::field::Value as UpdateValue; +use proto::{field::Name, MetaId}; +use std::collections::HashMap; +use std::ptr; +use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicU8, Ordering::*}; +use tracing::field::FieldSet; + +#[derive(Debug)] +pub(crate) struct Attributes { + attributes: HashMap, +} + +#[derive(Debug)] +pub(crate) struct Attribute { + name: proto::field::Name, + meta_id: MetaId, + value: Value, + unit: Option, +} + +#[derive(Debug)] +pub(crate) struct Value { + str_val: AtomicPtr, + other_val: AtomicU64, + val_type: AtomicU8, +} + +const EMPTY: u8 = 0; +const BOOL: u8 = 1; +const U64: u8 = 2; +const I64: u8 = 3; +const STR: u8 = 4; +const DEBUG: u8 = 5; + +#[derive(Debug, Clone)] +pub(crate) struct Update { + name: proto::field::Name, + is_delta: bool, + value: proto::field::Value, +} + +// // === impl Attributes === + +impl Attributes { + pub(crate) const STATE_PREFIX: &'static str = "state."; + + pub(crate) fn new(meta_id: MetaId, fields: &FieldSet) -> Self { + let attributes = fields + .iter() + .filter_map(|field| { + if field.name().starts_with(Attributes::STATE_PREFIX) { + let mut parts = field.name().split('.'); + parts.next(); + if let Some(name) = parts.next() { + return Some((name.into(), parts.next())); + } + } + None + }) + .map(|(name, unit): (Name, Option<&str>)| { + let value = Value { + str_val: AtomicPtr::new(ptr::null_mut()), + other_val: AtomicU64::new(0), + val_type: AtomicU8::new(0), + }; + let unit = unit.map(Into::into); + + let attr = Attribute { + name: name.clone(), + meta_id: meta_id.clone(), + unit, + value, + }; + (name, attr) + }) + .collect(); + + Self { attributes } + } + + pub(crate) fn update(&self, update: &Update) { + if let Some(attr) = self.attributes.get(&update.name) { + let is_delta = update.is_delta; + let perv_type = attr.value.val_type.swap(update.update_type(), AcqRel); + match (perv_type, &update.value) { + (BOOL | EMPTY, UpdateValue::BoolVal(upd)) => { + attr.value.other_val.store(*upd as u64, Release); + } + + (STR, UpdateValue::StrVal(upd)) => { + attr.value + .str_val + .store(Box::into_raw(Box::new(upd.clone())), Release); + } + + (DEBUG, UpdateValue::DebugVal(upd)) => { + attr.value + .str_val + .store(Box::into_raw(Box::new(upd.clone())), Release); + } + + (U64 | EMPTY, UpdateValue::U64Val(upd)) => { + if is_delta && perv_type != EMPTY { + attr.value.other_val.fetch_add(*upd, Release); + } else { + attr.value.other_val.store(*upd, Release); + } + } + (I64 | EMPTY, UpdateValue::I64Val(upd)) => { + if is_delta && perv_type != EMPTY { + attr.value + .other_val + .fetch_update(AcqRel, Acquire, |v| { + Some(((v as i64) + (*upd as i64)) as u64) + }) + .unwrap(); + } else { + attr.value.other_val.store(*upd as u64, Release); + } + } + (val, update) => { + tracing::warn!( + "attribute {:?} cannot be updated by update {:?}", + val, + update + ); + } + } + } + } + + pub(crate) fn values(&self) -> impl Iterator { + self.attributes.values() + } +} + +// // === impl Update === + +impl Update { + pub(crate) fn new( + name: proto::field::Name, + value: proto::field::Value, + is_delta: bool, + ) -> Self { + Self { + name, + is_delta, + value, + } + } + fn update_type(&self) -> u8 { + match self.value { + UpdateValue::BoolVal(_) => BOOL, + UpdateValue::StrVal(_) => STR, + UpdateValue::DebugVal(_) => DEBUG, + UpdateValue::U64Val(_) => U64, + UpdateValue::I64Val(_) => I64, + } + } +} + +impl ToProto for Attribute { + type Output = Option; + + fn to_proto(&self) -> Self::Output { + if let Some(value) = self.value.to_proto() { + return Some(proto::Attribute { + field: Some(proto::Field { + metadata_id: Some(self.meta_id.clone()), + name: Some(self.name.clone()), + value: Some(value), + }), + unit: self.unit.clone(), + }); + } + None + } +} + +impl ToProto for Value { + type Output = Option; + + fn to_proto(&self) -> Self::Output { + use proto::field::Value as ProtoVal; + match self.val_type.load(Acquire) { + BOOL => Some(ProtoVal::BoolVal(self.other_val.load(Acquire) != 0)), + U64 => Some(ProtoVal::U64Val(self.other_val.load(Acquire) as u64)), + I64 => Some(ProtoVal::I64Val(self.other_val.load(Acquire) as i64)), + DEBUG => Some(ProtoVal::StrVal("HAHA".to_string())), + STR => Some(ProtoVal::StrVal("HAHA".to_string())), + _ => None, + } + } +} diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index 1145352b1..35c8be6db 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -27,6 +27,7 @@ use tracing_subscriber::{ mod aggregator; mod attribute; +mod attribute_new; mod builder; mod callsites; mod record; @@ -40,7 +41,10 @@ pub use builder::Builder; use callsites::Callsites; use record::Recorder; use stack::SpanStack; -use visitors::{AsyncOpVisitor, ResourceVisitor, ResourceVisitorResult, TaskVisitor, WakerVisitor}; +use visitors::{ + AsyncOpVisitor, NewStateUpdateVisitor, ResourceVisitor, ResourceVisitorResult, TaskVisitor, + WakerVisitor, +}; pub use builder::{init, spawn}; @@ -479,6 +483,55 @@ impl ConsoleLayer { } } } + + fn record_updates( + &self, + span: SpanRef<'_, S>, + updates: Vec, + ctx: &Context<'_, S>, + ) where + S: Subscriber + for<'a> LookupSpan<'a>, + { + if self.is_resource(span.metadata()) { + self.state_update_new(span, updates, ctx, |exts| { + exts.get::>() + .map( as std::ops::Deref>::deref) + }) + } else if self.is_async_op(span.metadata()) { + self.state_update_new(span, updates, ctx, |exts| { + let async_op = exts.get::>()?; + Some(&async_op.stats) + }) + } + } + + fn state_update_new( + &self, + span: SpanRef<'_, S>, + updates: Vec, + ctx: &Context<'_, S>, + get_stats: impl for<'a> Fn(&'a Extensions) -> Option<&'a stats::ResourceStats>, + ) where + S: Subscriber + for<'a> LookupSpan<'a>, + { + let exts = span.extensions(); + let stats = match get_stats(&exts) { + Some(stats) => stats, + None => return, + }; + + for upd in updates.iter() { + stats.update_attribute_new(upd); + if let Some(parent) = stats.parent_id.as_ref().and_then(|parent| ctx.span(parent)) { + let exts = parent.extensions(); + if let Some(stats) = get_stats(&exts) { + if stats.inherit_child_attributes { + stats.update_attribute_new(upd); + } + } + } + } + } } impl Layer for ConsoleLayer @@ -570,10 +623,12 @@ where self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx)) }); if let Some(stats) = self.send_stats(&self.shared.dropped_resources, move || { + let meta_id = metadata.into(); let stats = Arc::new(stats::ResourceStats::new( at, inherit_child_attrs, parent_id.clone(), + attribute_new::Attributes::new(meta_id, attrs.fields()), )); let event = Event::Resource { id: id.clone(), @@ -587,7 +642,13 @@ where }; (event, stats) }) { - ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats); + let span = ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!"); + span.extensions_mut().insert(stats); + + // now record initial attrs + let mut attr_visitor = NewStateUpdateVisitor::default(); + attrs.record(&mut attr_visitor); + self.record_updates(span, attr_visitor.updates, &ctx) } } return; @@ -609,10 +670,12 @@ where if let Some(resource_id) = resource_id { if let Some(stats) = self.send_stats(&self.shared.dropped_async_ops, move || { + let meta_id = metadata.into(); let stats = Arc::new(stats::AsyncOpStats::new( at, inherit_child_attrs, parent_id.clone(), + attribute_new::Attributes::new(meta_id, attrs.fields()), )); let event = Event::AsyncResourceOp { id: id.clone(), @@ -625,7 +688,13 @@ where (event, stats) }) { - ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats); + let span = ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!"); + span.extensions_mut().insert(stats); + + // now record initial attrs + let mut attr_visitor = NewStateUpdateVisitor::default(); + attrs.record(&mut attr_visitor); + self.record_updates(span, attr_visitor.updates, &ctx) } } } @@ -842,6 +911,14 @@ where }); } } + + fn on_record(&self, id: &span::Id, values: &span::Record<'_>, cx: Context<'_, S>) { + if let Some(span) = cx.span(id) { + let mut attr_visitor = NewStateUpdateVisitor::default(); + values.record(&mut attr_visitor); + self.record_updates(span, attr_visitor.updates, &cx) + } + } } impl fmt::Debug for ConsoleLayer { diff --git a/console-subscriber/src/stats.rs b/console-subscriber/src/stats.rs index ad2b67f67..e4a07000d 100644 --- a/console-subscriber/src/stats.rs +++ b/console-subscriber/src/stats.rs @@ -1,4 +1,4 @@ -use crate::{attribute, sync::Mutex, ToProto}; +use crate::{attribute, attribute_new, sync::Mutex, ToProto}; use hdrhistogram::{ serialization::{Serializer, V2Serializer}, Histogram, @@ -87,6 +87,7 @@ pub(crate) struct ResourceStats { created_at: SystemTime, dropped_at: Mutex>, attributes: Mutex, + attributes_new: attribute_new::Attributes, pub(crate) inherit_child_attributes: bool, pub(crate) parent_id: Option, } @@ -263,10 +264,16 @@ impl AsyncOpStats { created_at: SystemTime, inherit_child_attributes: bool, parent_id: Option, + attributes_new: attribute_new::Attributes, ) -> Self { Self { task_id: AtomicU64::new(0), - stats: ResourceStats::new(created_at, inherit_child_attributes, parent_id), + stats: ResourceStats::new( + created_at, + inherit_child_attributes, + parent_id, + attributes_new, + ), poll_stats: PollStats::default(), } } @@ -327,13 +334,19 @@ impl ToProto for AsyncOpStats { type Output = proto::async_ops::Stats; fn to_proto(&self) -> Self::Output { - let attributes = self.stats.attributes.lock().values().cloned().collect(); + let attributes = self.stats.attributes.lock(); + let attributes_new = self + .stats + .attributes_new + .values() + .filter_map(ToProto::to_proto); + let attributes = attributes.values().cloned().chain(attributes_new); proto::async_ops::Stats { poll_stats: Some(self.poll_stats.to_proto()), created_at: Some(self.stats.created_at.into()), dropped_at: self.stats.dropped_at.lock().map(Into::into), task_id: self.task_id().map(Into::into), - attributes, + attributes: attributes.collect(), } } } @@ -345,6 +358,7 @@ impl ResourceStats { created_at: SystemTime, inherit_child_attributes: bool, parent_id: Option, + attributes_new: attribute_new::Attributes, ) -> Self { Self { is_dirty: AtomicBool::new(true), @@ -352,6 +366,7 @@ impl ResourceStats { created_at, dropped_at: Mutex::new(None), attributes: Default::default(), + attributes_new, inherit_child_attributes, parent_id, } @@ -362,6 +377,11 @@ impl ResourceStats { self.make_dirty(); } + pub(crate) fn update_attribute_new(&self, update: &attribute_new::Update) { + self.attributes_new.update(update); + self.make_dirty(); + } + #[inline] pub(crate) fn drop_resource(&self, dropped_at: SystemTime) { if self.is_dropped.swap(true, AcqRel) { @@ -412,11 +432,13 @@ impl ToProto for ResourceStats { type Output = proto::resources::Stats; fn to_proto(&self) -> Self::Output { - let attributes = self.attributes.lock().values().cloned().collect(); + let attributes = self.attributes.lock(); + let attributes_new = self.attributes_new.values().filter_map(ToProto::to_proto); + let attributes = attributes.values().cloned().chain(attributes_new); proto::resources::Stats { created_at: Some(self.created_at.into()), dropped_at: self.dropped_at.lock().map(Into::into), - attributes, + attributes: attributes.collect(), } } } diff --git a/console-subscriber/src/visitors.rs b/console-subscriber/src/visitors.rs index 20458e6b0..b84bfee95 100644 --- a/console-subscriber/src/visitors.rs +++ b/console-subscriber/src/visitors.rs @@ -2,7 +2,7 @@ //! fields from tracing metadata and producing the parts //! needed to construct `Event` instances. -use super::{attribute, WakeOp}; +use super::{attribute, attribute_new, WakeOp}; use console_api as proto; use proto::resources::resource; use tracing_core::{ @@ -533,3 +533,70 @@ impl Visit for StateUpdateVisitor { } } } + +/// Used to extract the fields needed to construct +/// state updates on resources and async ops. The fields +/// need to specified on the corresponding span and can have +/// one of two forms: +/// +/// state.field - expressed an overrifing update to the field +/// state.field.delta - expresses a change in the field's value (for numeric types) +/// +/// Optionally the field can have a unit suffix that will be displayed by the console +#[derive(Default)] +pub(crate) struct NewStateUpdateVisitor { + pub(crate) updates: Vec, +} + +impl NewStateUpdateVisitor { + const STATE_PREFIX: &'static str = "state."; + const DELTA: &'static str = "delta"; + + fn extract(&self, field: &field::Field) -> Option<(proto::field::Name, bool)> { + if field.name().starts_with(Self::STATE_PREFIX) { + let mut parts = field.name().split('.'); + parts.next(); + if let Some(name) = parts.next() { + return Some((name.into(), field.name().contains(Self::DELTA))); + } + } + None + } +} + +impl Visit for NewStateUpdateVisitor { + fn record_debug(&mut self, field: &field::Field, value: &dyn std::fmt::Debug) { + if let Some((name, is_delta)) = self.extract(field) { + self.updates + .push(attribute_new::Update::new(name, value.into(), is_delta)); + } + } + + fn record_i64(&mut self, field: &field::Field, value: i64) { + if let Some((name, is_delta)) = self.extract(field) { + self.updates + .push(attribute_new::Update::new(name, value.into(), is_delta)); + } + } + + fn record_u64(&mut self, field: &field::Field, value: u64) { + if let Some((name, is_delta)) = self.extract(field) { + self.updates + .push(attribute_new::Update::new(name, value.into(), is_delta)); + } + } + + fn record_bool(&mut self, field: &field::Field, value: bool) { + if let Some((name, is_delta)) = self.extract(field) { + self.updates + .push(attribute_new::Update::new(name, value.into(), is_delta)); + } + } + + fn record_str(&mut self, field: &field::Field, value: &str) { + if let Some((name, is_delta)) = self.extract(field) { + self.updates + .push(attribute_new::Update::new(name, value.into(), is_delta)); + } + } +}