Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added record_metadata_fields_to_span macro to common crate #244

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions common/src/tracer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,44 @@ macro_rules! init_tracer {
tracer
}};
}

/// Should be called to populate the metadata fields of a given span
/// # Arguments
/// - metadata: supermusr_streaming_types::frame_metadata::FrameMetadata
/// - span: Span
///
/// # Prerequisites
/// The span should have been created with appropriate empty fields, either by
/// ```ignore
/// fields(
/// //...
/// metadata_timestamp = tracing::field::Empty,
/// metadata_frame_number = tracing::field::Empty,
/// metadata_period_number = tracing::field::Empty,
/// metadata_veto_flags = tracing::field::Empty,
/// metadata_protons_per_pulse = tracing::field::Empty,
/// metadata_running = tracing::field::Empty,
/// //...
/// )
/// ```
/// if using the #[instrument] macro over a function, or with
/// ```ignore
/// "metadata_timestamp" = tracing::field::Empty,
/// "metadata_frame_number" = tracing::field::Empty,
/// "metadata_period_number" = tracing::field::Empty,
/// "metadata_veto_flags" = tracing::field::Empty,
/// "metadata_protons_per_pulse" = tracing::field::Empty,
/// "metadata_running" = tracing::field::Empty,
/// ```
/// if creating the span directly using `info_span!()` or similar.
#[macro_export]
macro_rules! record_metadata_fields_to_span {
($metadata:expr, $span:expr) => {
$span.record("metadata_timestamp", $metadata.timestamp.to_rfc3339());
$span.record("metadata_frame_number", $metadata.frame_number);
$span.record("metadata_period_number", $metadata.period_number);
$span.record("metadata_veto_flags", $metadata.veto_flags);
$span.record("metadata_protons_per_pulse", $metadata.protons_per_pulse);
$span.record("metadata_running", $metadata.running);
};
}
14 changes: 13 additions & 1 deletion digitiser-aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use supermusr_common::{
messages_received::{self, MessageKind},
metric_names::{FAILURES, FRAMES_SENT, MESSAGES_PROCESSED, MESSAGES_RECEIVED},
},
record_metadata_fields_to_span,
spanned::{FindSpanMut, Spanned},
tracer::{FutureRecordTracerExt, OptionalHeaderTracerExt, TracerEngine, TracerOptions},
CommonKafkaOpts, DigitizerId,
Expand Down Expand Up @@ -158,7 +159,15 @@ async fn main() -> anyhow::Result<()> {
}
}

#[tracing::instrument(skip_all, level = "trace")]
#[tracing::instrument(skip_all, fields(
digitiser_id = tracing::field::Empty,
metadata_timestamp = tracing::field::Empty,
metadata_frame_number = tracing::field::Empty,
metadata_period_number = tracing::field::Empty,
metadata_veto_flags = tracing::field::Empty,
metadata_protons_per_pulse = tracing::field::Empty,
metadata_running = tracing::field::Empty,
))]
async fn on_message(
use_otel: bool,
kafka_producer_thread_set: &mut JoinSet<()>,
Expand All @@ -180,6 +189,9 @@ async fn on_message(
let metadata_result: Result<FrameMetadata, _> = msg.metadata().try_into();
Modularius marked this conversation as resolved.
Show resolved Hide resolved
match metadata_result {
Ok(metadata) => {
tracing::Span::current().record("digitiser_id", msg.digitizer_id());
record_metadata_fields_to_span!(&metadata, tracing::Span::current());

debug!("Event packet: metadata: {:?}", msg.metadata());
cache.push(msg.digitizer_id(), metadata.clone(), msg.into());

Expand Down
19 changes: 19 additions & 0 deletions nexus-writer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use supermusr_common::{
messages_received::{self, MessageKind},
metric_names::{FAILURES, MESSAGES_PROCESSED, MESSAGES_RECEIVED},
},
record_metadata_fields_to_span,
spanned::{Spanned, SpannedMut},
tracer::{OptionalHeaderTracerExt, TracerEngine, TracerOptions},
CommonKafkaOpts,
Expand All @@ -33,6 +34,7 @@ use supermusr_streaming_types::{
ecs_se00_data_generated::{
root_as_se_00_sample_environment_data, se_00_sample_environment_data_buffer_has_identifier,
},
FrameMetadata,
};
use tokio::time;
use tracing::{debug, error, info_span, level_filters::LevelFilter, trace_span, warn};
Expand Down Expand Up @@ -244,6 +246,16 @@ fn process_payload(nexus_engine: &mut NexusEngine, message_topic: &str, payload:
}
}

#[tracing::instrument(skip_all,
fields(
metadata_timestamp = tracing::field::Empty,
metadata_frame_number = tracing::field::Empty,
metadata_period_number = tracing::field::Empty,
metadata_veto_flags = tracing::field::Empty,
metadata_protons_per_pulse = tracing::field::Empty,
metadata_running = tracing::field::Empty,
)
)]
fn process_frame_assembled_event_list_message(nexus_engine: &mut NexusEngine, payload: &[u8]) {
counter!(
MESSAGES_RECEIVED,
Expand All @@ -253,6 +265,13 @@ fn process_frame_assembled_event_list_message(nexus_engine: &mut NexusEngine, pa
match root_as_frame_assembled_event_list_message(payload) {
Ok(data) => match nexus_engine.process_event_list(&data) {
Ok(run) => {
let metadata_result: Result<FrameMetadata, _> = data.metadata().try_into();
Modularius marked this conversation as resolved.
Show resolved Hide resolved
match metadata_result {
Ok(metadata) => {
record_metadata_fields_to_span!(&metadata, tracing::Span::current());
}
Err(e) => error!("{e}"),
};
if let Some(run) = run {
link_current_span_to_run_span!(run, "Frame Event List");
}
Expand Down