Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracing Standards and Refactoring to Comply #235

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
4ac7194
Added frames/expected pulses
Modularius Jun 25, 2024
2c39c8a
Changed FrameCache to use references for metadata
Modularius Jun 27, 2024
b4a1217
Added Frame details to span
Modularius Jun 27, 2024
327ffbf
Added fields to simulator + rustfmt
Modularius Jun 27, 2024
a82c5bb
Added fields to trace-to-events
Modularius Jun 27, 2024
77dbdea
Reduced overhead of spans
Modularius Jun 28, 2024
41ce260
Fixed Spans
Modularius Jun 28, 2024
2b97e50
Added template creation spans
Modularius Jun 28, 2024
d833952
Fixed Spans
Modularius Jun 28, 2024
c1a3a6f
Added fields and treefmt
Modularius Jun 30, 2024
5da7737
Added doc
Modularius Jun 30, 2024
006e88f
Added fields to spans
Modularius Jul 3, 2024
abf6450
Refactoring
Modularius Jul 3, 2024
c8869cf
Merge remote-tracking branch 'supermusr-data-pipeline/main' into Add-…
Modularius Jul 19, 2024
29afc0e
Merge remote-tracking branch 'supermusr-data-pipeline/main' into Add-…
Modularius Jul 24, 2024
51eeae6
Removed unused deps
Modularius Jul 24, 2024
ebf6684
Modified tracing doc
Modularius Jul 24, 2024
de03c82
Merge remote-tracking branch 'supermusr-data-pipeline/main' into Add-…
Modularius Jul 26, 2024
289a3c1
Modifies Spans
Modularius Aug 2, 2024
6967526
Merge remote-tracking branch 'supermusr-data-pipeline/main' into Add-…
Modularius Aug 2, 2024
60332b2
Refactoring
Modularius Aug 5, 2024
a3e9661
Merge remote-tracking branch 'supermusr-data-pipeline/main' into Add-…
Modularius Aug 5, 2024
d4a9e23
Added more simulator functionality
Modularius Aug 11, 2024
6fb7100
Added Max-Frame option for testing
Modularius Aug 12, 2024
73e49be
Merge remote-tracking branch 'supermusr-data-pipeline/main' into Add-…
Modularius Aug 12, 2024
782bd19
Added Metadata Fields throughout
Modularius Aug 14, 2024
0802904
Moved TIMESTAMP_FORMAT constant to common crate
Modularius Aug 14, 2024
15ede33
Added SpannedInit Trait and record_metadata_fields_to_span macro
Modularius Aug 14, 2024
ec93aee
Worked on tracing.md
Modularius Aug 14, 2024
71ba703
Update to tracing.md
Modularius Aug 15, 2024
0a03b2a
Deleted sequence diagrams
Modularius Aug 15, 2024
4a563af
Changed SpannedInit to SpannedAggregator and added methods
Modularius Aug 15, 2024
d7d92c0
Modified Readme and FrameMetadata
Modularius Aug 15, 2024
8281681
Improved tracing.md
Modularius Aug 15, 2024
f781beb
Fixed tests
Modularius Aug 16, 2024
0121d66
Merge remote-tracking branch 'supermusr-data-pipeline/main' into Add-…
Modularius Aug 16, 2024
fd9a608
Formatting
Modularius Aug 16, 2024
8688d34
Remove commented code and improve error messages
Modularius Aug 16, 2024
2f788ce
Removed FindSpan and FindSpanMut traits
Modularius Aug 16, 2024
7eaae59
Treefmt
Modularius Aug 16, 2024
02cc0de
Simplified metadata macro
Modularius Aug 16, 2024
7b3a706
Changed to IntConstants and new tracing.md paragraph
Modularius Aug 16, 2024
ece1d4d
Merge remote-tracking branch 'supermusr-data-pipeline/main' into Add-…
Modularius Sep 2, 2024
0a60304
Merge remote-tracking branch 'supermusr-data-pipeline/main' into Add-…
Modularius Sep 3, 2024
9e5b513
Event handling for span linking
Modularius Sep 3, 2024
abb5f6e
Tidying
Modularius Sep 3, 2024
6412cd3
Formatting
Modularius Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 104 additions & 45 deletions digitiser-aggregator/src/frame/cache.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use crate::data::{Accumulate, DigitiserData};
use std::{collections::VecDeque, fmt::Debug, time::Duration};
use supermusr_common::{
spanned::{FindSpan, FindSpanMut, SpannedAggregator},
DigitizerId,
use crate::{
data::{Accumulate, DigitiserData},
TIMESTAMP_FORMAT,
};
use std::{collections::HashMap, fmt::Debug, time::Duration};
use supermusr_common::{spanned::SpannedAggregator, DigitizerId};
use supermusr_streaming_types::FrameMetadata;
use tracing::warn;

use super::{partial::PartialFrame, AggregatedFrame};

pub(crate) struct FrameCache<D: Debug> {
ttl: Duration,
expected_digitisers: Vec<DigitizerId>,

frames: VecDeque<PartialFrame<D>>,
frames: HashMap<FrameMetadata, PartialFrame<D>>,
}

impl<D: Debug> FrameCache<D>
Expand All @@ -27,52 +28,65 @@ where
}
}

pub(crate) fn push(&mut self, digitiser_id: DigitizerId, metadata: &FrameMetadata, data: D) {
match self
#[tracing::instrument(skip_all, fields(
digitiser_id = digitiser_id,
metadata_timestamp = metadata.timestamp.format(TIMESTAMP_FORMAT).to_string(),
metadata_frame_number = metadata.frame_number,
metadata_period_number = metadata.period_number,
metadata_veto_flags = metadata.veto_flags,
metadata_protons_per_pulse = metadata.protons_per_pulse,
metadata_running = metadata.running
))]
pub(crate) fn push<'a>(
&'a mut self,
digitiser_id: DigitizerId,
metadata: &FrameMetadata,
data: D,
) -> &'a impl SpannedAggregator {
let frame = self
.frames
.iter_mut()
.find(|frame| frame.metadata.equals_ignoring_veto_flags(metadata))
{
Some(frame) => {
frame.push(digitiser_id, data);
frame.push_veto_flags(metadata.veto_flags)
}
None => {
.entry(metadata.clone()) // Find the frame with the given metadata
.or_insert_with(|| {
// or create a new PartialFrame
let mut frame = PartialFrame::<D>::new(self.ttl, metadata.clone());
frame.push(digitiser_id, data);
self.frames.push_back(frame);
}
}
if let Err(e) = frame.span_init() {
// Initialise the span field
warn!("Frame span initiation failed {e}")
}
frame
});

frame.push(digitiser_id, data);
frame.push_veto_flags(metadata.veto_flags);
frame
}

pub(crate) fn poll(&mut self) -> Option<AggregatedFrame<D>> {
match self.frames.front() {
Some(frame) => {
if frame.is_complete(&self.expected_digitisers) || frame.is_expired() {
Some(
self.frames
.pop_front()
.expect("cache should have a frame, this should never fail")
.into(),
)
} else {
None
// Find a frame which is completed
let metadata = self
.frames
.keys()
.find(|metadata| {
let frame = self
.frames
.get(metadata)
.expect("Frame with metadata should exist");
frame.is_complete(&self.expected_digitisers) | frame.is_expired()
})
.cloned();

// If such a frame is found, then remove it from the hashmap and return as aggregated frame
metadata
.and_then(|metadata| self.frames.remove(&metadata))
.map(|frame| {
if let Err(e) = frame.end_span() {
warn!("Frame span drop failed {e}")
}
}
None => None,
}
frame.into()
})
}
}

impl<D: Debug + 'static> FindSpan<PartialFrame<D>> for FrameCache<D> {
type Key = FrameMetadata;
}

impl<D: Debug + 'static> FindSpanMut<PartialFrame<D>> for FrameCache<D> {
fn find_span_mut(&mut self, metadata: FrameMetadata) -> Option<&mut impl SpannedAggregator> {
self.frames
.iter_mut()
.find(|frame| frame.metadata.equals_ignoring_veto_flags(&metadata))
pub(crate) fn get_num_partial_frames(&self) -> usize {
self.frames.len()
}
}

Expand All @@ -97,22 +111,28 @@ mod test {

assert!(cache.poll().is_none());

assert_eq!(cache.get_num_partial_frames(), 0);
cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2]));
assert_eq!(cache.get_num_partial_frames(), 1);

assert!(cache.poll().is_none());

cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5]));
cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5]));

assert!(cache.poll().is_none());

cache.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8]));
cache.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8]));

assert!(cache.poll().is_none());

cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11]));
cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11]));

{
let frame = cache.poll().unwrap();
assert_eq!(cache.get_num_partial_frames(), 0);

assert_eq!(frame.metadata, frame_1);

Expand Down Expand Up @@ -156,14 +176,17 @@ mod test {

assert!(cache.poll().is_none());

cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2]));
cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2]));

assert!(cache.poll().is_none());

cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5]));
cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5]));

assert!(cache.poll().is_none());

cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11]));
cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11]));

assert!(cache.poll().is_none());
Expand Down Expand Up @@ -197,4 +220,40 @@ mod test {

assert!(cache.poll().is_none());
}

#[test]
fn test_metadata_equality() {
let mut cache = FrameCache::<EventData>::new(Duration::from_millis(100), vec![1, 2]);

let timestamp = Utc::now();
let frame_1 = FrameMetadata {
timestamp,
period_number: 1,
protons_per_pulse: 8,
running: true,
frame_number: 1728,
veto_flags: 4,
};

let frame_2 = FrameMetadata {
timestamp,
period_number: 1,
protons_per_pulse: 8,
running: true,
frame_number: 1728,
veto_flags: 5,
};

assert_eq!(cache.frames.len(), 0);
assert!(cache.poll().is_none());

cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2]));
assert_eq!(cache.frames.len(), 1);
assert!(cache.poll().is_none());

cache.push(2, &frame_2, EventData::dummy_data(0, 5, &[0, 1, 2]));
assert_eq!(cache.frames.len(), 1);
assert!(cache.poll().is_some());
//let frame = cache.poll().is_some();
}
}
18 changes: 15 additions & 3 deletions digitiser-aggregator/src/frame/partial.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::data::DigitiserData;
use std::time::Duration;
use supermusr_common::spanned::{SpanOnce, SpanOnceError, Spanned, SpannedAggregator, SpannedMut};
use supermusr_common::DigitizerId;
use supermusr_common::{
record_metadata_fields_to_span,
spanned::{SpanOnce, SpanOnceError, Spanned, SpannedAggregator, SpannedMut},
DigitizerId,
};
use supermusr_streaming_types::FrameMetadata;
use tokio::time::Instant;
use tracing::{info_span, Span};
Expand Down Expand Up @@ -64,7 +67,15 @@ impl<D> SpannedMut for PartialFrame<D> {
impl<D> SpannedAggregator for PartialFrame<D> {
fn span_init(&mut self) -> Result<(), SpanOnceError> {
self.span
.init(info_span!(target: "otel", parent: None, "Frame"))
.init(info_span!(target: "otel", parent: None, "Frame",
"metadata_timestamp" = self.metadata.timestamp.to_rfc3339(),
"metadata_frame_number" = self.metadata.frame_number,
"metadata_period_number" = self.metadata.period_number,
"metadata_veto_flags" = self.metadata.veto_flags,
"metadata_protons_per_pulse" = self.metadata.protons_per_pulse,
"metadata_running" = self.metadata.running,
"frame_is_expired" = tracing::field::Empty,
))
}

fn link_current_span<F: Fn() -> Span>(
Expand All @@ -73,6 +84,7 @@ impl<D> SpannedAggregator for PartialFrame<D> {
) -> Result<(), SpanOnceError> {
let span = self.span.get()?.in_scope(aggregated_span_fn);
span.follows_from(tracing::Span::current());
record_metadata_fields_to_span!(&self.metadata, span);
Ok(())
}

Expand Down
Loading
Loading