Skip to content

Commit

Permalink
Prevent out of order frames (#298)
Browse files Browse the repository at this point in the history
## Summary of changes

In the aggregator: prevents new frames from being created whose
timestamp is earlier than or equal to any that have previously been
dispatched. That is, if a digitiser messages arrives late, then if it's
frame is still in the cache, it is aggregated into that frame, otherwise
it is discarded and a warning emitted.

Closes
#288.

Note that we still assume that each frame's initial digitiser message
appears in chronological order.

## Instruction for review/testing

General code review.
Has been tested on simulated data.
  • Loading branch information
Modularius authored Jan 15, 2025
1 parent 0237ac4 commit 7501aaf
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
1 change: 1 addition & 0 deletions digitiser-aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition.workspace = true

[dependencies]
anyhow.workspace = true
chrono.workspace = true
clap.workspace = true
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
Expand Down
12 changes: 12 additions & 0 deletions digitiser-aggregator/src/frame/cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{partial::PartialFrame, AggregatedFrame};
use crate::data::{Accumulate, DigitiserData};
use chrono::{DateTime, Utc};
use std::{collections::VecDeque, fmt::Debug, time::Duration};
use supermusr_common::{record_metadata_fields_to_span, spanned::SpannedAggregator, DigitizerId};
use supermusr_streaming_types::FrameMetadata;
Expand All @@ -9,6 +10,7 @@ pub(crate) struct FrameCache<D: Debug> {
ttl: Duration,
expected_digitisers: Vec<DigitizerId>,

latest_timestamp_dispatched: Option<DateTime<Utc>>,
frames: VecDeque<PartialFrame<D>>,
}

Expand All @@ -20,6 +22,7 @@ where
Self {
ttl,
expected_digitisers,
latest_timestamp_dispatched: None,
frames: Default::default(),
}
}
Expand All @@ -31,6 +34,12 @@ where
metadata: &FrameMetadata,
data: D,
) {
if let Some(latest_timestamp_dispatched) = self.latest_timestamp_dispatched {
if metadata.timestamp <= latest_timestamp_dispatched {
warn!("Frame's timestamp earlier than or equal to the latest frame dispatched: {0} <= {1}", metadata.timestamp, latest_timestamp_dispatched);
return;
}
}
let frame = {
match self
.frames
Expand Down Expand Up @@ -92,6 +101,9 @@ where
if let Err(e) = frame.end_span() {
warn!("Frame span drop failed {e}")
}

// This frame is the next to be set to latest timestamp dispatched
self.latest_timestamp_dispatched = Some(frame.metadata.timestamp);
Some(frame.into())
} else {
None
Expand Down

0 comments on commit 7501aaf

Please sign in to comment.