Skip to content

Commit

Permalink
feat(err): we no longer read straight from capture (#25357)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverb123 authored Oct 3, 2024
1 parent d9d6296 commit 758c256
Show file tree
Hide file tree
Showing 8 changed files with 10 additions and 143 deletions.
44 changes: 1 addition & 43 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,5 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = { version = "2.5.0 " }
uuid = { version = "1.6.1", features = ["v7", "serde"] }
neon = "1"
moka = {version = "0.12.8", features = ["sync"] }
quick_cache = "0.6.9"
ahash = "0.8.11"
3 changes: 1 addition & 2 deletions rust/error-tracking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ axum = { workspace = true }
metrics = { workspace = true }
common-metrics = { path = "../common/metrics" }
common-alloc = { path = "../common/alloc" }
common-types = { path = "../common/types" }
common-kafka = { path = "../common/kafka" }
thiserror = { workspace = true }
sqlx = { workspace = true }
moka = { workspace = true }
serde_json = { workspace = true }

[lints]
workspace = true
6 changes: 1 addition & 5 deletions rust/error-tracking/src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ use health::{HealthHandle, HealthRegistry};
use sqlx::{postgres::PgPoolOptions, PgPool};
use tracing::info;

use crate::{config::Config, error::Error, team_cache::TeamCache};
use crate::{config::Config, error::Error};

pub struct AppContext {
pub health_registry: HealthRegistry,
pub worker_liveness: HealthHandle,
pub consumer: SingleTopicConsumer,
pub pool: PgPool,
pub team_cache: TeamCache,
}

impl AppContext {
Expand All @@ -32,14 +31,11 @@ impl AppContext {
config.consumer.kafka_consumer_topic
);

let team_cache = TeamCache::new(config.team_cache_capacity, config.team_cache_ttl_secs);

Ok(Self {
health_registry,
worker_liveness,
consumer,
pool,
team_cache,
})
}
}
8 changes: 1 addition & 7 deletions rust/error-tracking/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,11 @@ pub struct Config {
// Rust service connect directly to postgres, not via pgbouncer, so we keep this low
#[envconfig(default = "4")]
pub max_pg_connections: u32,

#[envconfig(default = "300")]
pub team_cache_ttl_secs: u64,

#[envconfig(default = "10000")]
pub team_cache_capacity: u64,
}

impl Config {
pub fn init_with_defaults() -> Result<Self, envconfig::Error> {
ConsumerConfig::set_defaults("error-tracking-rs", "exceptions_ingestions");
ConsumerConfig::set_defaults("error-tracking-rs", "exception_symbolification_events");
Self::init_from_env()
}
}
1 change: 0 additions & 1 deletion rust/error-tracking/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod app_context;
pub mod config;
pub mod error;
pub mod team_cache;
32 changes: 6 additions & 26 deletions rust/error-tracking/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use std::{future::ready, sync::Arc};
use axum::{routing::get, Router};
use common_kafka::kafka_consumer::RecvErr;
use common_metrics::{serve, setup_metrics_routes};
use common_types::{CapturedEvent, Team};
use envconfig::Envconfig;
use error_tracking::{app_context::AppContext, config::Config, error::Error};
use serde_json::Value;
use tokio::task::JoinHandle;
use tracing::{error, info, warn};
use tracing::{error, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};

common_alloc::used!();
Expand Down Expand Up @@ -55,43 +55,23 @@ async fn main() -> Result<(), Error> {

loop {
context.worker_liveness.report_healthy().await;
let (event, offset): (CapturedEvent, _) = match context.consumer.json_recv().await {
// Just grab the event as a serde_json::Value and immediately drop it,
// we can work out a real type for it later (once we're deployed etc)
let (_, offset): (Value, _) = match context.consumer.json_recv().await {
Ok(r) => r,
Err(RecvErr::Kafka(e)) => {
return Err(e.into()); // Just die if we recieve a Kafka error
}
Err(err) => {
// If we failed to parse the message, or it was empty, just log and continue, our
// consumer has already stored the offset for us.
metrics::counter!("error_tracking_errors").increment(1);
metrics::counter!("error_tracking_errors", "cause" => "recv_err").increment(1);
error!("Error receiving message: {:?}", err);
continue;
}
};
metrics::counter!("error_tracking_events_received").increment(1);

// Team verification
// If we encounter a sqlx error, just die. The by_token call explicitly
// returns an Option<Team>, so any error here is a PG failure, and us falling
// over sheds load.
let token = &event.token;
let pool = &context.pool;
let team = context
.team_cache
.get_or_insert_with(token, async move { Ok(Team::by_token(pool, token).await?) })
.await?;

match team {
Some(t) => t,
None => {
// If we failed to find the team, just store the offset and drop the event, we don't care about it
offset.store().unwrap();
warn!("Team not found for token: {}", event.token);
metrics::counter!("error_tracking_team_not_found").increment(1);
continue;
}
};

// This is where the rest of the processing would go
offset.store().unwrap();
}
Expand Down
58 changes: 0 additions & 58 deletions rust/error-tracking/src/team_cache.rs

This file was deleted.

0 comments on commit 758c256

Please sign in to comment.