This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit f212632: merge

neilkakkar committed May 29, 2024
2 parents: 56e85a0 + ff0780c
Showing 7 changed files with 126 additions and 37 deletions.
capture/src/config.rs: 3 additions & 0 deletions

@@ -13,6 +13,9 @@ pub struct Config {
    pub redis_url: String,
    pub otel_url: Option<String>,

+   #[envconfig(default = "false")]
+   pub overflow_enabled: bool,
+
    #[envconfig(default = "100")]
    pub overflow_per_second_limit: NonZeroU32,
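
As a side note on how this flag is consumed: with envconfig, each field is read from the upper-cased environment variable of the same name, so this would be toggled via OVERFLOW_ENABLED. A minimal sketch, assuming envconfig 0.10's init_from_env and a pared-down Config rather than the full struct from this file:

use envconfig::Envconfig;

#[derive(Envconfig)]
pub struct Config {
    // Overflow routing stays off unless OVERFLOW_ENABLED=true is set.
    #[envconfig(default = "false")]
    pub overflow_enabled: bool,
}

fn main() {
    // init_from_env reads each field from its environment variable,
    // falling back to the declared default when the variable is unset.
    let config = Config::init_from_env().expect("invalid environment");
    println!("overflow enabled: {}", config.overflow_enabled);
}
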
capture/src/server.rs: 24 additions & 18 deletions

@@ -48,24 +48,30 @@
.register("rdkafka".to_string(), Duration::seconds(30))
.await;

let partition = OverflowLimiter::new(
config.overflow_per_second_limit,
config.overflow_burst_limit,
config.overflow_forced_keys,
);
if config.export_prometheus {
let partition = partition.clone();
tokio::spawn(async move {
partition.report_metrics().await;
});
}
{
// Ensure that the rate limiter state does not grow unbounded
let partition = partition.clone();
tokio::spawn(async move {
partition.clean_state().await;
});
}
let partition = match config.overflow_enabled {
false => None,
true => {
let partition = OverflowLimiter::new(
config.overflow_per_second_limit,
config.overflow_burst_limit,
config.overflow_forced_keys,
);
if config.export_prometheus {
let partition = partition.clone();
tokio::spawn(async move {
partition.report_metrics().await;
});
}
{
// Ensure that the rate limiter state does not grow unbounded
let partition = partition.clone();
tokio::spawn(async move {
partition.clean_state().await;
});
}
Some(partition)
}
};
let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
.expect("failed to start Kafka sink");

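The match on a bool reads a little unusual; an equivalent, arguably more idiomatic gate is bool::then, which yields Some only when the flag is set. A standalone sketch under that assumption, with OverflowLimiter stubbed out rather than the real type from this crate:

// Hypothetical stub standing in for capture's OverflowLimiter.
struct OverflowLimiter;

fn build_limiter(enabled: bool) -> Option<OverflowLimiter> {
    // bool::then runs the closure and wraps its result in Some
    // only when `enabled` is true; otherwise it returns None.
    enabled.then(|| {
        // The real code would also spawn the report_metrics and
        // clean_state background tasks here, as in the diff above.
        OverflowLimiter
    })
}

fn main() {
    assert!(build_limiter(false).is_none());
    assert!(build_limiter(true).is_some());
}
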
capture/src/sinks/kafka.rs: 43 additions & 12 deletions

@@ -36,51 +36,78 @@ impl rdkafka::ClientContext for KafkaContext {
        for (topic, stats) in stats.topics {
            gauge!(
                "capture_kafka_produce_avg_batch_size_bytes",
                "topic" => topic.clone()
            )
            .set(stats.batchsize.avg as f64);
            gauge!(
                "capture_kafka_produce_avg_batch_size_events",
                "topic" => topic
            )
            .set(stats.batchcnt.avg as f64);
        }

        for (_, stats) in stats.brokers {
            let id_string = format!("{}", stats.nodeid);
+           if let Some(rtt) = stats.rtt {
+               gauge!(
+                   "capture_kafka_produce_rtt_latency_ms",
+                   "quantile" => "p50",
+                   "broker" => id_string.clone()
+               )
+               .set(rtt.p50 as f64);
+               gauge!(
+                   "capture_kafka_produce_rtt_latency_ms",
+                   "quantile" => "p90",
+                   "broker" => id_string.clone()
+               )
+               .set(rtt.p90 as f64);
+               gauge!(
+                   "capture_kafka_produce_rtt_latency_ms",
+                   "quantile" => "p95",
+                   "broker" => id_string.clone()
+               )
+               .set(rtt.p95 as f64);
+               gauge!(
+                   "capture_kafka_produce_rtt_latency_ms",
+                   "quantile" => "p99",
+                   "broker" => id_string.clone()
+               )
+               .set(rtt.p99 as f64);
+           }

            gauge!(
                "capture_kafka_broker_requests_pending",
                "broker" => id_string.clone()
            )
            .set(stats.outbuf_cnt as f64);
            gauge!(
                "capture_kafka_broker_responses_awaiting",
                "broker" => id_string.clone()
            )
            .set(stats.waitresp_cnt as f64);
            counter!(
                "capture_kafka_broker_tx_errors_total",
                "broker" => id_string.clone()
            )
            .absolute(stats.txerrs);
            counter!(
                "capture_kafka_broker_rx_errors_total",
-               "broker" => id_string
+               "broker" => id_string.clone()
            )
            .absolute(stats.rxerrs);
+           counter!(
+               "capture_kafka_broker_request_timeouts",
+               "broker" => id_string
+           )
+           .absolute(stats.req_timeouts);
        }
    }
}
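
These macros use the metrics crate's handle-returning style (0.22 and later), where gauge! / counter! resolve a named, labeled metric and the value is applied through .set() or .absolute(). A minimal sketch of the same pattern, assuming some recorder (for example a Prometheus exporter) has been installed at startup; the helper itself is hypothetical:

use metrics::{counter, gauge};

// Hypothetical helper mirroring the reporting pattern above.
fn record_broker_stats(node_id: i32, pending: usize, tx_errors: u64) {
    let id_string = format!("{}", node_id);
    // gauge! returns a Gauge handle for this name + label set;
    // set() records the current value of a sampled quantity.
    gauge!("capture_kafka_broker_requests_pending", "broker" => id_string.clone())
        .set(pending as f64);
    // counter! with absolute() publishes a monotonic total that rdkafka
    // already maintains, instead of incrementing by a delta.
    counter!("capture_kafka_broker_tx_errors_total", "broker" => id_string)
        .absolute(tx_errors);
}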

#[derive(Clone)]
pub struct KafkaSink {
    producer: FutureProducer<KafkaContext>,
-   partition: OverflowLimiter,
+   partition: Option<OverflowLimiter>,
    main_topic: String,
    historical_topic: String,
}
@@ -89,7 +89,7 @@ impl KafkaSink {
    pub fn new(
        config: KafkaConfig,
        liveness: HealthHandle,
-       partition: OverflowLimiter,
+       partition: Option<OverflowLimiter>,
    ) -> anyhow::Result<KafkaSink> {
        info!("connecting to Kafka brokers at {}...", config.kafka_hosts);

@@ -150,7 +177,11 @@ impl KafkaSink {
            DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
            DataType::AnalyticsMain => {
                // TODO: deprecate capture-led overflow or move logic in handler
-               if self.partition.is_limited(&event_key) {
+               let is_limited = match &self.partition {
+                   None => false,
+                   Some(partition) => partition.is_limited(&event_key),
+               };
+               if is_limited {
                    (&self.main_topic, None) // Analytics overflow goes to the main topic without locality
                } else {
                    (&self.main_topic, Some(event_key.as_str()))
@@ -280,11 +311,11 @@ mod tests {
        let handle = registry
            .register("one".to_string(), Duration::seconds(30))
            .await;
-       let limiter = OverflowLimiter::new(
+       let limiter = Some(OverflowLimiter::new(
            NonZeroU32::new(10).unwrap(),
            NonZeroU32::new(10).unwrap(),
            None,
-       );
+       ));
        let cluster = MockCluster::new(1).expect("failed to create mock brokers");
        let config = config::KafkaConfig {
            kafka_producer_linger_ms: 0,
capture/tests/common.rs: 1 addition & 0 deletions

@@ -29,6 +29,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
    print_sink: false,
    address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
    redis_url: "redis://localhost:6379/".to_string(),
+   overflow_enabled: false,
    overflow_burst_limit: NonZeroU32::new(5).unwrap(),
    overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
    overflow_forced_keys: None,
capture/tests/events.rs: 54 additions & 0 deletions

@@ -174,6 +174,7 @@ async fn it_overflows_events_on_burst() -> Result<()> {

    let mut config = DEFAULT_CONFIG.clone();
    config.kafka.kafka_topic = topic.topic_name().to_string();
+   config.overflow_enabled = true;
    config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
    config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

@@ -223,6 +224,7 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {

    let mut config = DEFAULT_CONFIG.clone();
    config.kafka.kafka_topic = topic.topic_name().to_string();
+   config.overflow_enabled = true;
    config.overflow_burst_limit = NonZeroU32::new(1).unwrap();
    config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();

@@ -254,6 +256,58 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
    Ok(())
}

+#[tokio::test]
+async fn it_skips_overflows_when_disabled() -> Result<()> {
+   setup_tracing();
+
+   let token = random_string("token", 16);
+   let distinct_id = random_string("id", 16);
+
+   let topic = EphemeralTopic::new().await;
+
+   let mut config = DEFAULT_CONFIG.clone();
+   config.kafka.kafka_topic = topic.topic_name().to_string();
+   config.overflow_enabled = false;
+   config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
+   config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
+
+   let server = ServerHandle::for_config(config).await;
+
+   let event = json!([{
+       "token": token,
+       "event": "event1",
+       "distinct_id": distinct_id
+   },{
+       "token": token,
+       "event": "event2",
+       "distinct_id": distinct_id
+   },{
+       "token": token,
+       "event": "event3",
+       "distinct_id": distinct_id
+   }]);
+
+   let res = server.capture_events(event.to_string()).await;
+   assert_eq!(StatusCode::OK, res.status());
+
+   assert_eq!(
+       topic.next_message_key()?.unwrap(),
+       format!("{}:{}", token, distinct_id)
+   );
+
+   assert_eq!(
+       topic.next_message_key()?.unwrap(),
+       format!("{}:{}", token, distinct_id)
+   );
+
+   // Should have triggered overflow, but has not
+   assert_eq!(
+       topic.next_message_key()?.unwrap(),
+       format!("{}:{}", token, distinct_id)
+   );
+   Ok(())
+}
+
#[tokio::test]
async fn it_trims_distinct_id() -> Result<()> {
    setup_tracing();
feature-flags/src/redis.rs: 0 additions & 5 deletions

@@ -59,11 +59,6 @@ impl Client for RedisClient {
        Ok(fut?)
    }

-   // TODO: Ask Xavier if there's a better way to handle this.
-   // The problem: I want to match on the error type from this function, and do appropriate things like 400 or 500 response.
-   // Buuut, if I use anyhow::Error, I can't reverse-coerce into a NotFound or serde_pickle::Error.
-   // Thus, I need to create a custom error enum of all possible errors + my own custom not found, so I can match on it.
-   // Is this the canonical way?
    async fn get(&self, k: String) -> Result<String, CustomRedisError> {
        let mut conn = self.client.get_async_connection().await?;

feature-flags/src/test_utils.rs: 1 addition & 2 deletions

@@ -3,8 +3,7 @@ use serde_json::json;
use std::sync::Arc;

use crate::{
-   flag_definitions, redis::{Client, RedisClient}, team::{self, Team}
-};
+   flag_definitions, redis::{Client, RedisClient}, team::{self, Team}};
use rand::{distributions::Alphanumeric, Rng};

pub fn random_string(prefix: &str, length: usize) -> String {
