diff --git a/trace-to-events/src/main.rs b/trace-to-events/src/main.rs index 67abb66c..35f88e56 100644 --- a/trace-to-events/src/main.rs +++ b/trace-to-events/src/main.rs @@ -138,18 +138,25 @@ async fn main() -> anyhow::Result<()> { let mut kafka_producer_thread_set = JoinSet::new(); loop { - match consumer.recv().await { - Ok(m) => { - process_kafka_message( - &tracer, - &args, - &mut kafka_producer_thread_set, - &producer, - &m, - ); - consumer.commit_message(&m, CommitMode::Async).unwrap(); + tokio::select! { + msg = consumer.recv() => match msg { + Ok(m) => { + process_kafka_message( + &tracer, + &args, + &mut kafka_producer_thread_set, + &producer, + &m, + ); + consumer.commit_message(&m, CommitMode::Async).unwrap(); + } + Err(e) => warn!("Kafka error: {}", e) + }, + join_next = kafka_producer_thread_set.join_next() => { + if let Some(Err(e)) = join_next { + error!("Error Joining Kafka Producer Task: {e}"); + } } - Err(e) => warn!("Kafka error: {}", e), } } }