Skip to content

Commit

Permalink
Simplified type specification in aggregator and writer
Browse files Browse the repository at this point in the history
  • Loading branch information
Modularius committed Aug 30, 2024
1 parent 553ed98 commit 9ce825a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 30 deletions.
28 changes: 10 additions & 18 deletions digitiser-aggregator/src/frame/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ where
}
}

pub(crate) fn push(&mut self, digitiser_id: DigitizerId, metadata: FrameMetadata, data: D) {
pub(crate) fn push(&mut self, digitiser_id: DigitizerId, metadata: &FrameMetadata, data: D) {
match self
.frames
.iter_mut()
.find(|frame| frame.metadata.equals_ignoring_veto_flags(&metadata))
.find(|frame| frame.metadata.equals_ignoring_veto_flags(metadata))
{
Some(frame) => {
frame.push(digitiser_id, data);
frame.push_veto_flags(metadata.veto_flags)
}
None => {
let mut frame = PartialFrame::<D>::new(self.ttl, metadata);
let mut frame = PartialFrame::<D>::new(self.ttl, metadata.clone());
frame.push(digitiser_id, data);
self.frames.push_back(frame);
}
Expand Down Expand Up @@ -98,23 +98,19 @@ mod test {

assert!(cache.poll().is_none());

cache.push(0, frame_1.clone(), EventData::dummy_data(0, 5, &[0, 1, 2]));
cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2]));

assert!(cache.poll().is_none());

cache.push(1, frame_1.clone(), EventData::dummy_data(0, 5, &[3, 4, 5]));
cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5]));

assert!(cache.poll().is_none());

cache.push(4, frame_1.clone(), EventData::dummy_data(0, 5, &[6, 7, 8]));
cache.push(4, &frame_1, EventData::dummy_data(0, 5, &[6, 7, 8]));

assert!(cache.poll().is_none());

cache.push(
8,
frame_1.clone(),
EventData::dummy_data(0, 5, &[9, 10, 11]),
);
cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11]));

{
let frame = cache.poll().unwrap();
Expand Down Expand Up @@ -161,19 +157,15 @@ mod test {

assert!(cache.poll().is_none());

cache.push(0, frame_1.clone(), EventData::dummy_data(0, 5, &[0, 1, 2]));
cache.push(0, &frame_1, EventData::dummy_data(0, 5, &[0, 1, 2]));

assert!(cache.poll().is_none());

cache.push(1, frame_1.clone(), EventData::dummy_data(0, 5, &[3, 4, 5]));
cache.push(1, &frame_1, EventData::dummy_data(0, 5, &[3, 4, 5]));

assert!(cache.poll().is_none());

cache.push(
8,
frame_1.clone(),
EventData::dummy_data(0, 5, &[9, 10, 11]),
);
cache.push(8, &frame_1, EventData::dummy_data(0, 5, &[9, 10, 11]));

assert!(cache.poll().is_none());

Expand Down
20 changes: 10 additions & 10 deletions digitiser-aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ use supermusr_common::{
tracer::{FutureRecordTracerExt, OptionalHeaderTracerExt, TracerEngine, TracerOptions},
CommonKafkaOpts, DigitizerId,
};
use supermusr_streaming_types::{
dev2_digitizer_event_v2_generated::{
digitizer_event_list_message_buffer_has_identifier, root_as_digitizer_event_list_message,
},
FrameMetadata,
use supermusr_streaming_types::dev2_digitizer_event_v2_generated::{
digitizer_event_list_message_buffer_has_identifier, root_as_digitizer_event_list_message,
};
use tokio::task::JoinSet;
use tracing::{debug, error, info_span, level_filters::LevelFilter, warn};
Expand Down Expand Up @@ -186,15 +183,16 @@ async fn on_message(
);
match root_as_digitizer_event_list_message(payload) {
Ok(msg) => {
let metadata_result: Result<FrameMetadata, _> = msg.metadata().try_into();
match metadata_result {
match msg.metadata().try_into() {
Ok(metadata) => {
debug!("Event packet: metadata: {:?}", msg.metadata());
cache.push(msg.digitizer_id(), &metadata, msg.into());

// Append Metadata to Span
tracing::Span::current().record("digitiser_id", msg.digitizer_id());
record_metadata_fields_to_span!(&metadata, tracing::Span::current());

debug!("Event packet: metadata: {:?}", msg.metadata());
cache.push(msg.digitizer_id(), metadata.clone(), msg.into());

// Aggregate current Span to Frame Span
if let Some(frame_span) = cache.find_span_mut(metadata) {
if frame_span.is_waiting() {
if let Err(e) = frame_span
Expand All @@ -211,6 +209,8 @@ async fn on_message(
})
};
}

// Poll cache for frame completion
cache_poll(
use_otel,
kafka_producer_thread_set,
Expand Down
3 changes: 1 addition & 2 deletions nexus-writer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,7 @@ fn process_frame_assembled_event_list_message(nexus_engine: &mut NexusEngine, pa
match root_as_frame_assembled_event_list_message(payload) {
Ok(data) => match nexus_engine.process_event_list(&data) {
Ok(run) => {
let metadata_result: Result<FrameMetadata, _> = data.metadata().try_into();
match metadata_result {
match data.metadata().try_into() as Result<FrameMetadata, _> {
Ok(metadata) => {
record_metadata_fields_to_span!(&metadata, tracing::Span::current());
}
Expand Down

0 comments on commit 9ce825a

Please sign in to comment.