Skip to content

Commit

Permalink
Fixed Aggregator Bug (#294)
Browse files Browse the repository at this point in the history
## Summary of changes

Changed behaviour of aggregator to block when the message send queue is
full rather than terminate. The previous behaviour prevented the
aggregator from restarting after a brief time offline, due to the
backlog building up from the trace-to-events component.

Closes
#293.

## Instruction for review/testing

General code review.
Tested with HiFi data.
  • Loading branch information
Modularius authored Jan 10, 2025
1 parent 0e0d2bd commit dcf5ca3
Showing 1 changed file with 12 additions and 19 deletions.
31 changes: 12 additions & 19 deletions digitiser-aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ use supermusr_streaming_types::{
use tokio::{
select,
signal::unix::{signal, Signal, SignalKind},
sync::mpsc::{error::TrySendError, Receiver, Sender},
sync::mpsc::{error::SendError, Receiver, Sender},
task::JoinHandle,
};
use tracing::{debug, error, info, info_span, instrument, level_filters::LevelFilter, warn};

const PRODUCER_TIMEOUT: Timeout = Timeout::After(Duration::from_millis(100));

type AggregatedFrameToBufferSender = Sender<AggregatedFrame<EventData>>;
type TrySendAggregatedFrameError = TrySendError<AggregatedFrame<EventData>>;
type SendAggregatedFrameError = SendError<AggregatedFrame<EventData>>;

#[derive(Debug, Parser)]
#[clap(author, version, about)]
Expand Down Expand Up @@ -212,7 +212,7 @@ async fn process_kafka_message(
channel_send: &AggregatedFrameToBufferSender,
cache: &mut FrameCache<EventData>,
msg: &BorrowedMessage<'_>,
) -> Result<(), TrySendAggregatedFrameError> {
) -> Result<(), SendAggregatedFrameError> {
msg.headers().conditional_extract_to_current_span(use_otel);

if let Some(payload) = msg.payload() {
Expand Down Expand Up @@ -263,7 +263,7 @@ async fn process_digitiser_event_list_message(
channel_send: &AggregatedFrameToBufferSender,
cache: &mut FrameCache<EventData>,
msg: DigitizerEventListMessage<'_>,
) -> Result<(), TrySendAggregatedFrameError> {
) -> Result<(), SendAggregatedFrameError> {
match msg.metadata().try_into() {
Ok(metadata) => {
debug!("Event packet: metadata: {:?}", msg.metadata());
Expand Down Expand Up @@ -291,7 +291,7 @@ async fn process_digitiser_event_list_message(
async fn cache_poll(
channel_send: &AggregatedFrameToBufferSender,
cache: &mut FrameCache<EventData>,
) -> Result<(), TrySendAggregatedFrameError> {
) -> Result<(), SendAggregatedFrameError> {
while let Some(frame) = cache.poll() {
let span = info_span!(target: "otel", "Frame Completed");
span.follows_from(
Expand All @@ -302,21 +302,14 @@ async fn cache_poll(
);
let _guard = span.enter();

// For each frame that is ready to send,
// `try_send` appends it to the channel queue
// without blocking
if let Err(e) = channel_send.try_send(frame) {
// If the queue is full (or another error occurs),
// then we emit a fatal error and close the program.
match &e {
TrySendError::Closed(_) => {
error!("Send-Frame Channel Closed");
}
TrySendError::Full(_) => {
error!("Send-Frame Buffer Full");
}
// Reserves space in the message queue if it is available
// Or waits for space if none is available.
match channel_send.reserve().await {
Ok(permit) => permit.send(frame),
Err(_) => {
error!("Send-Frame Error");
return Err(SendError(frame));
}
return Err(e);
}
}
Ok(())
Expand Down

0 comments on commit dcf5ca3

Please sign in to comment.