Skip to content

Commit

Permalink
Need to Join Completed Producer Threads in Trace to Events (#282)
Browse files Browse the repository at this point in the history
## Summary of changes

Producer threads created in the `trace-to-events` create were not being
joined with the main thread, possibly causing excessive memory usage.

This PR introduces a `tokio::select` statement which joins completed
threads without waiting for them.

## Instruction for review/testing

General code review.

Tested on simulated and hifi input.
  • Loading branch information
Modularius authored Nov 29, 2024
1 parent b86a241 commit 74b9fc7
Showing 1 changed file with 18 additions and 11 deletions.
29 changes: 18 additions & 11 deletions trace-to-events/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,25 @@ async fn main() -> anyhow::Result<()> {
let mut kafka_producer_thread_set = JoinSet::new();

loop {
match consumer.recv().await {
Ok(m) => {
process_kafka_message(
&tracer,
&args,
&mut kafka_producer_thread_set,
&producer,
&m,
);
consumer.commit_message(&m, CommitMode::Async).unwrap();
tokio::select! {
msg = consumer.recv() => match msg {
Ok(m) => {
process_kafka_message(
&tracer,
&args,
&mut kafka_producer_thread_set,
&producer,
&m,
);
consumer.commit_message(&m, CommitMode::Async).unwrap();
}
Err(e) => warn!("Kafka error: {}", e)
},
join_next = kafka_producer_thread_set.join_next() => {
if let Some(Err(e)) = join_next {
error!("Error Joining Kafka Producer Task: {e}");
}
}
Err(e) => warn!("Kafka error: {}", e),
}
}
}
Expand Down

0 comments on commit 74b9fc7

Please sign in to comment.