diff --git a/common/src/tracer/mod.rs b/common/src/tracer/mod.rs index fabf459a..c68e3a4c 100644 --- a/common/src/tracer/mod.rs +++ b/common/src/tracer/mod.rs @@ -59,13 +59,7 @@ macro_rules! init_tracer { #[macro_export] macro_rules! record_metadata_fields_to_span { ($metadata:expr, $span:expr) => { - $span.record( - "metadata_timestamp", - $metadata - .timestamp - .format(supermusr_common::TIMESTAMP_FORMAT) - .to_string(), - ); + $span.record("metadata_timestamp", $metadata.timestamp.to_rfc3339()); $span.record("metadata_frame_number", $metadata.frame_number); $span.record("metadata_period_number", $metadata.period_number); $span.record("metadata_veto_flags", $metadata.veto_flags); diff --git a/digitiser-aggregator/src/main.rs b/digitiser-aggregator/src/main.rs index 82c5d462..0097bfe9 100644 --- a/digitiser-aggregator/src/main.rs +++ b/digitiser-aggregator/src/main.rs @@ -20,6 +20,7 @@ use supermusr_common::{ messages_received::{self, MessageKind}, metric_names::{FAILURES, FRAMES_SENT, MESSAGES_PROCESSED, MESSAGES_RECEIVED}, }, + record_metadata_fields_to_span, spanned::{FindSpanMut, Spanned}, tracer::{FutureRecordTracerExt, OptionalHeaderTracerExt, TracerEngine, TracerOptions}, CommonKafkaOpts, DigitizerId, @@ -158,7 +159,15 @@ async fn main() -> anyhow::Result<()> { } } -#[tracing::instrument(skip_all, level = "trace")] +#[tracing::instrument(skip_all, fields( + digitiser_id = tracing::field::Empty, + metadata_timestamp = tracing::field::Empty, + metadata_frame_number = tracing::field::Empty, + metadata_period_number = tracing::field::Empty, + metadata_veto_flags = tracing::field::Empty, + metadata_protons_per_pulse = tracing::field::Empty, + metadata_running = tracing::field::Empty, +))] async fn on_message( use_otel: bool, kafka_producer_thread_set: &mut JoinSet<()>, @@ -180,6 +189,9 @@ async fn on_message( let metadata_result: Result = msg.metadata().try_into(); match metadata_result { Ok(metadata) => { + tracing::Span::current().record("digitiser_id", msg.digitizer_id()); + record_metadata_fields_to_span!(&metadata, tracing::Span::current()); + debug!("Event packet: metadata: {:?}", msg.metadata()); cache.push(msg.digitizer_id(), metadata.clone(), msg.into()); diff --git a/nexus-writer/src/main.rs b/nexus-writer/src/main.rs index 734a0c19..370fce2c 100644 --- a/nexus-writer/src/main.rs +++ b/nexus-writer/src/main.rs @@ -17,6 +17,7 @@ use supermusr_common::{ messages_received::{self, MessageKind}, metric_names::{FAILURES, MESSAGES_PROCESSED, MESSAGES_RECEIVED}, }, + record_metadata_fields_to_span, spanned::{Spanned, SpannedMut}, tracer::{OptionalHeaderTracerExt, TracerEngine, TracerOptions}, CommonKafkaOpts, @@ -33,6 +34,7 @@ use supermusr_streaming_types::{ ecs_se00_data_generated::{ root_as_se_00_sample_environment_data, se_00_sample_environment_data_buffer_has_identifier, }, + FrameMetadata, }; use tokio::time; use tracing::{debug, error, info_span, level_filters::LevelFilter, trace_span, warn}; @@ -244,6 +246,16 @@ fn process_payload(nexus_engine: &mut NexusEngine, message_topic: &str, payload: } } +#[tracing::instrument(skip_all, + fields( + metadata_timestamp = tracing::field::Empty, + metadata_frame_number = tracing::field::Empty, + metadata_period_number = tracing::field::Empty, + metadata_veto_flags = tracing::field::Empty, + metadata_protons_per_pulse = tracing::field::Empty, + metadata_running = tracing::field::Empty, + ) +)] fn process_frame_assembled_event_list_message(nexus_engine: &mut NexusEngine, payload: &[u8]) { counter!( MESSAGES_RECEIVED, @@ -253,6 +265,13 @@ fn process_frame_assembled_event_list_message(nexus_engine: &mut NexusEngine, pa match root_as_frame_assembled_event_list_message(payload) { Ok(data) => match nexus_engine.process_event_list(&data) { Ok(run) => { + let metadata_result: Result = data.metadata().try_into(); + match metadata_result { + Ok(metadata) => { + record_metadata_fields_to_span!(&metadata, tracing::Span::current()); + } + Err(e) => error!("{e}"), + }; if let Some(run) = run { link_current_span_to_run_span!(run, "Frame Event List"); }