From 74b9fc7db20c100d3fc8a9096346d16d03189cad Mon Sep 17 00:00:00 2001 From: Modularius Date: Fri, 29 Nov 2024 11:57:23 +0000 Subject: [PATCH] Need to Join Completed Producer Threads in Trace to Events (#282) ## Summary of changes Producer threads created in the `trace-to-events` create were not being joined with the main thread, possibly causing excessive memory usage. This PR introduces a `tokio::select` statement which joins completed threads without waiting for them. ## Instruction for review/testing General code review. Tested on simulated and hifi input. --- trace-to-events/src/main.rs | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/trace-to-events/src/main.rs b/trace-to-events/src/main.rs index 67abb66c..35f88e56 100644 --- a/trace-to-events/src/main.rs +++ b/trace-to-events/src/main.rs @@ -138,18 +138,25 @@ async fn main() -> anyhow::Result<()> { let mut kafka_producer_thread_set = JoinSet::new(); loop { - match consumer.recv().await { - Ok(m) => { - process_kafka_message( - &tracer, - &args, - &mut kafka_producer_thread_set, - &producer, - &m, - ); - consumer.commit_message(&m, CommitMode::Async).unwrap(); + tokio::select! { + msg = consumer.recv() => match msg { + Ok(m) => { + process_kafka_message( + &tracer, + &args, + &mut kafka_producer_thread_set, + &producer, + &m, + ); + consumer.commit_message(&m, CommitMode::Async).unwrap(); + } + Err(e) => warn!("Kafka error: {}", e) + }, + join_next = kafka_producer_thread_set.join_next() => { + if let Some(Err(e)) = join_next { + error!("Error Joining Kafka Producer Task: {e}"); + } } - Err(e) => warn!("Kafka error: {}", e), } } }