Skip to content

Commit

Permalink
Implemented record_metadata_fields_to_span callers
Browse files Browse the repository at this point in the history
  • Loading branch information
Modularius committed Aug 29, 2024
1 parent 617ad87 commit 553ed98
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 8 deletions.
8 changes: 1 addition & 7 deletions common/src/tracer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,7 @@ macro_rules! init_tracer {
#[macro_export]
macro_rules! record_metadata_fields_to_span {
($metadata:expr, $span:expr) => {
$span.record(
"metadata_timestamp",
$metadata
.timestamp
.format(supermusr_common::TIMESTAMP_FORMAT)
.to_string(),
);
$span.record("metadata_timestamp", $metadata.timestamp.to_rfc3339());
$span.record("metadata_frame_number", $metadata.frame_number);
$span.record("metadata_period_number", $metadata.period_number);
$span.record("metadata_veto_flags", $metadata.veto_flags);
Expand Down
14 changes: 13 additions & 1 deletion digitiser-aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use supermusr_common::{
messages_received::{self, MessageKind},
metric_names::{FAILURES, FRAMES_SENT, MESSAGES_PROCESSED, MESSAGES_RECEIVED},
},
record_metadata_fields_to_span,
spanned::{FindSpanMut, Spanned},
tracer::{FutureRecordTracerExt, OptionalHeaderTracerExt, TracerEngine, TracerOptions},
CommonKafkaOpts, DigitizerId,
Expand Down Expand Up @@ -158,7 +159,15 @@ async fn main() -> anyhow::Result<()> {
}
}

#[tracing::instrument(skip_all, level = "trace")]
#[tracing::instrument(skip_all, fields(
digitiser_id = tracing::field::Empty,
metadata_timestamp = tracing::field::Empty,
metadata_frame_number = tracing::field::Empty,
metadata_period_number = tracing::field::Empty,
metadata_veto_flags = tracing::field::Empty,
metadata_protons_per_pulse = tracing::field::Empty,
metadata_running = tracing::field::Empty,
))]
async fn on_message(
use_otel: bool,
kafka_producer_thread_set: &mut JoinSet<()>,
Expand All @@ -180,6 +189,9 @@ async fn on_message(
let metadata_result: Result<FrameMetadata, _> = msg.metadata().try_into();
match metadata_result {
Ok(metadata) => {
tracing::Span::current().record("digitiser_id", msg.digitizer_id());
record_metadata_fields_to_span!(&metadata, tracing::Span::current());

debug!("Event packet: metadata: {:?}", msg.metadata());
cache.push(msg.digitizer_id(), metadata.clone(), msg.into());

Expand Down
19 changes: 19 additions & 0 deletions nexus-writer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use supermusr_common::{
messages_received::{self, MessageKind},
metric_names::{FAILURES, MESSAGES_PROCESSED, MESSAGES_RECEIVED},
},
record_metadata_fields_to_span,
spanned::{Spanned, SpannedMut},
tracer::{OptionalHeaderTracerExt, TracerEngine, TracerOptions},
CommonKafkaOpts,
Expand All @@ -33,6 +34,7 @@ use supermusr_streaming_types::{
ecs_se00_data_generated::{
root_as_se_00_sample_environment_data, se_00_sample_environment_data_buffer_has_identifier,
},
FrameMetadata,
};
use tokio::time;
use tracing::{debug, error, info_span, level_filters::LevelFilter, trace_span, warn};
Expand Down Expand Up @@ -244,6 +246,16 @@ fn process_payload(nexus_engine: &mut NexusEngine, message_topic: &str, payload:
}
}

#[tracing::instrument(skip_all,
fields(
metadata_timestamp = tracing::field::Empty,
metadata_frame_number = tracing::field::Empty,
metadata_period_number = tracing::field::Empty,
metadata_veto_flags = tracing::field::Empty,
metadata_protons_per_pulse = tracing::field::Empty,
metadata_running = tracing::field::Empty,
)
)]
fn process_frame_assembled_event_list_message(nexus_engine: &mut NexusEngine, payload: &[u8]) {
counter!(
MESSAGES_RECEIVED,
Expand All @@ -253,6 +265,13 @@ fn process_frame_assembled_event_list_message(nexus_engine: &mut NexusEngine, pa
match root_as_frame_assembled_event_list_message(payload) {
Ok(data) => match nexus_engine.process_event_list(&data) {
Ok(run) => {
let metadata_result: Result<FrameMetadata, _> = data.metadata().try_into();
match metadata_result {
Ok(metadata) => {
record_metadata_fields_to_span!(&metadata, tracing::Span::current());
}
Err(e) => error!("{e}"),
};
if let Some(run) = run {
link_current_span_to_run_span!(run, "Frame Event List");
}
Expand Down

0 comments on commit 553ed98

Please sign in to comment.