Skip to content

Commit

Permalink
Run rdkafka logging with an explicit Sentry Hub (#5155)
Browse files Browse the repository at this point in the history
This explicitly uses `Hub::run` to trigger logging within a Sentry Hub.
The hunch here would be that these logs are running on a background thread that was initialized before Sentry, and so has not inherited a properly configured Hub.
  • Loading branch information
Swatinem authored Dec 6, 2023
1 parent 6a91648 commit 877323a
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 4 deletions.
1 change: 1 addition & 0 deletions rust_snuba/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust_snuba/rust_arroyo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition = "2021"
chrono = "0.4.26"
once_cell = "1.18.0"
rdkafka = { git = "https://github.com/getsentry/rust-rdkafka", branch = "base-consumer", features = ["cmake-build", "tracing"] }
sentry = { version = "0.32.0" }
serde = { version = "1.0.137", features = ["derive"] }
serde_json = "1.0.81"
thiserror = "1.0"
Expand Down
13 changes: 9 additions & 4 deletions rust_snuba/rust_arroyo/src/backends/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance};
use rdkafka::error::KafkaError;
use rdkafka::message::{BorrowedMessage, Message};
use rdkafka::topic_partition_list::{Offset, TopicPartitionList};
use sentry::Hub;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -86,13 +87,14 @@ impl<'a, C: AssignmentCallbacks> CommitOffsets for OffsetCommitter<'a, C> {
}

pub struct CustomContext<C: AssignmentCallbacks> {
hub: Arc<Hub>,
callbacks: C,
consumer_offsets: Arc<Mutex<HashMap<Partition, u64>>>,
}

impl<C: AssignmentCallbacks + Send + Sync> ClientContext for CustomContext<C> {
fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
match level {
Hub::run(self.hub.clone(), || match level {
RDKafkaLogLevel::Emerg
| RDKafkaLogLevel::Alert
| RDKafkaLogLevel::Critical
Expand All @@ -108,12 +110,14 @@ impl<C: AssignmentCallbacks + Send + Sync> ClientContext for CustomContext<C> {
RDKafkaLogLevel::Debug => {
tracing::debug!("librdkafka: {fac} {log_message}");
}
}
})
}

fn error(&self, error: KafkaError, reason: &str) {
let error: &dyn std::error::Error = &error;
tracing::error!(error, "librdkafka: {error}: {reason}");
Hub::run(self.hub.clone(), || {
let error: &dyn std::error::Error = &error;
tracing::error!(error, "librdkafka: {error}: {reason}");
})
}
}

Expand Down Expand Up @@ -170,6 +174,7 @@ impl<C: AssignmentCallbacks> KafkaConsumer<C> {
pub fn new(config: KafkaConfig, topics: &[Topic], callbacks: C) -> Result<Self, ConsumerError> {
let offsets = Arc::new(Mutex::new(HashMap::new()));
let context = CustomContext {
hub: Hub::current(),
callbacks,
consumer_offsets: offsets.clone(),
};
Expand Down

0 comments on commit 877323a

Please sign in to comment.