Skip to content

Commit

Permalink
feat(kafka-producer): ping kafka brokers (#24836)
Browse files Browse the repository at this point in the history
Co-authored-by: Brett Hoerner <[email protected]>
  • Loading branch information
Elvis339 and bretthoerner authored Sep 6, 2024
1 parent 1304a95 commit 044d633
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions rust/common/kafka/src/kafka_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use crate::config::KafkaConfig;
use futures::future::join_all;
use health::HealthHandle;
use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use rdkafka::ClientConfig;
use serde::Serialize;
use serde_json::error::Error as SerdeError;
use thiserror::Error;
use tracing::debug;
use tracing::{debug, error, info};

pub struct KafkaContext {
liveness: HealthHandle,
Expand Down Expand Up @@ -55,7 +55,22 @@ pub async fn create_kafka_producer(
let api: FutureProducer<KafkaContext> =
client_config.create_with_context(KafkaContext { liveness })?;

// TODO: ping the kafka brokers to confirm configuration is OK (copy capture)
// "Ping" the Kafka brokers by requesting metadata
match api
.client()
.fetch_metadata(None, std::time::Duration::from_secs(15))
{
Ok(metadata) => {
info!(
"Successfully connected to Kafka brokers. Found {} topics.",
metadata.topics().len()
);
}
Err(error) => {
error!("Failed to fetch metadata from Kafka brokers: {:?}", error);
return Err(error);
}
}

Ok(api)
}
Expand Down

0 comments on commit 044d633

Please sign in to comment.