Skip to content

Commit

Permalink
Changes structure of Frame Completed span in nexus writer (#281)
Browse files Browse the repository at this point in the history
## Summary of changes

- In `cache_poll`, the `Frame Completed` span now occurs for every frame
dispatched from the cache. Allowing us to see how many are unqueued at
the same time.
- Fixes a bug whereby complete frames in the queue are marked as expired
because they were behind incomplete frames.
- Removed `debug!("{args}");` log statement.

## Instruction for review/testing

General code review.

Tested on simulated data.
  • Loading branch information
Modularius authored Nov 27, 2024
1 parent 7cd85bc commit b86a241
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
3 changes: 2 additions & 1 deletion digitiser-aggregator/src/frame/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ where
Some(frame) => {
frame.push(digitiser_id, data);
frame.push_veto_flags(metadata.veto_flags);
frame.set_completion_status(&self.expected_digitisers);
frame
}
None => {
Expand Down Expand Up @@ -82,7 +83,7 @@ where
if self
.frames
.front()
.is_some_and(|frame| frame.is_complete(&self.expected_digitisers) | frame.is_expired())
.is_some_and(|frame| frame.is_complete() || frame.is_expired())
{
let frame = self
.frames
Expand Down
15 changes: 12 additions & 3 deletions digitiser-aggregator/src/frame/partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tracing::{info_span, Span};

pub(crate) struct PartialFrame<D> {
span: SpanOnce,
complete: bool,
expiry: Instant,

pub(super) metadata: FrameMetadata,
Expand All @@ -23,18 +24,26 @@ impl<D> PartialFrame<D> {

Self {
span: SpanOnce::default(),
complete: false,
expiry,
metadata,
digitiser_data: Default::default(),
}
}

pub(super) fn digitiser_ids(&self) -> Vec<DigitizerId> {
let mut cache_digitiser_ids: Vec<DigitizerId> =
self.digitiser_data.iter().map(|i| i.0).collect();
cache_digitiser_ids.sort();
cache_digitiser_ids
}

pub(super) fn set_completion_status(&mut self, expected_digitisers: &[DigitizerId]) {
if self.digitiser_ids() == expected_digitisers {
self.complete = true;
}
}

pub(super) fn push(&mut self, digitiser_id: DigitizerId, data: D) {
self.digitiser_data.push((digitiser_id, data));
}
Expand All @@ -43,8 +52,8 @@ impl<D> PartialFrame<D> {
self.metadata.veto_flags |= veto_flags;
}

pub(super) fn is_complete(&self, expected_digitisers: &[DigitizerId]) -> bool {
self.digitiser_ids() == expected_digitisers
pub(super) fn is_complete(&self) -> bool {
self.complete
}

pub(super) fn is_expired(&self) -> bool {
Expand Down Expand Up @@ -92,7 +101,7 @@ impl<D> SpannedAggregator for PartialFrame<D> {
#[cfg(not(test))] // In test mode, the frame.span() are not initialised
self.span()
.get()?
.record("frame_is_expired", self.is_expired());
.record("frame_is_expired", self.is_expired() && !self.is_complete());
Ok(())
}
}
8 changes: 7 additions & 1 deletion digitiser-aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,14 @@ async fn cache_poll(
channel_send: &AggregatedFrameToBufferSender,
cache: &mut FrameCache<EventData>,
) -> Result<(), TrySendAggregatedFrameError> {
let span = info_span!(target: "otel", "Frame Complete");
while let Some(frame) = cache.poll() {
let span = info_span!(target: "otel", "Frame Completed");
span.follows_from(
frame
.span()
.get()
.expect("Span should exist, this should never fail"),
);
let _guard = span.enter();

// For each frame that is ready to send,
Expand Down

0 comments on commit b86a241

Please sign in to comment.