Skip to content

Commit

Permalink
feat(error tracking): verify tokens (#25258)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverb123 authored Sep 30, 2024
1 parent 0470d65 commit 9fca67a
Show file tree
Hide file tree
Showing 18 changed files with 301 additions and 37 deletions.
58 changes: 45 additions & 13 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,5 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = { version = "2.5.0 " }
uuid = { version = "1.6.1", features = ["v7", "serde"] }
neon = "1"
quick_cache = "0.6.5"
moka = {version = "0.12.8", features = ["sync"] }
ahash = "0.8.11"
3 changes: 2 additions & 1 deletion rust/common/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ workspace = true
[dependencies]
serde = { workspace = true }
uuid = { workspace = true }
time = {workspace = true }
time = { workspace = true }
sqlx = { workspace = true }
2 changes: 1 addition & 1 deletion rust/common/types/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Common types

For types used across our projects, like events, persons, etc. Each time you go to copy a type from somewhere, put it here instead.
For types used across our projects, like events, persons, etc. Each time you go to copy a type from somewhere, put it here instead.
4 changes: 4 additions & 0 deletions rust/common/types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
// Shared types used across our rust services; see README for conventions.
mod event;
mod team;

// Events
pub use event::CapturedEvent;

// Teams
pub use team::Team;
63 changes: 63 additions & 0 deletions rust/common/types/src/team.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use serde::{Deserialize, Serialize};

// Subset of a row in the `teams` table (only the columns selected by the
// queries in this file). `sqlx::FromRow` lets `query_as` decode rows directly.
#[derive(Clone, Debug, Deserialize, Serialize, sqlx::FromRow)]
pub struct Team {
// Database primary key.
pub id: i32,
// Human-readable team name.
pub name: String,
// Token clients send to identify this team; presumably unique per team — TODO confirm schema constraint.
pub api_token: String,
}

// We use query_as functions here rather than macros to avoid having to wrangle sqlx's
// compile-time checking. TODO: be better about that.
impl Team {
/// Fetch the team whose `api_token` equals `token`.
///
/// Returns `Ok(None)` when no row matches. An `Err` is always a
/// database-level failure (connection, query, or row-decode error),
/// never a "not found".
pub async fn by_token<'c, E>(executor: E, token: &str) -> Result<Option<Self>, sqlx::Error>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
sqlx::query_as::<_, Self>(
r#"
SELECT id, name, api_token
FROM teams
WHERE api_token = $1
"#,
)
.bind(token)
.fetch_optional(executor)
.await
}

/// Fetch a team by primary key. Same error semantics as [`Team::by_token`]:
/// `Ok(None)` means no such team, `Err` means a database failure.
pub async fn by_id<'c, E>(executor: E, id: i32) -> Result<Option<Self>, sqlx::Error>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
sqlx::query_as::<_, Self>(
r#"
SELECT id, name, api_token
FROM teams
WHERE id = $1
"#,
)
.bind(id)
.fetch_optional(executor)
.await
}

/// Fetch every team whose `api_token` appears in `tokens`, in a single
/// round trip (`= ANY($1)` binds the slice as a Postgres array).
///
/// Tokens with no matching team are simply absent from the result, and
/// the result order is unspecified (no ORDER BY) — callers must not rely
/// on positional correspondence with `tokens`.
pub async fn bulk_by_tokens<'c, E>(
executor: E,
tokens: &[&str],
) -> Result<Vec<Self>, sqlx::Error>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
sqlx::query_as::<_, Self>(
r#"
SELECT id, name, api_token
FROM teams
WHERE api_token = ANY($1)
"#,
)
.bind(tokens)
.fetch_all(executor)
.await
}
}
2 changes: 2 additions & 0 deletions rust/error-tracking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ common-alloc = { path = "../common/alloc" }
common-types = { path = "../common/types" }
common-kafka = { path = "../common/kafka" }
thiserror = { workspace = true }
sqlx = { workspace = true }
moka = { workspace = true }

[lints]
workspace = true
12 changes: 11 additions & 1 deletion rust/error-tracking/src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ use std::time::Duration;

use common_kafka::kafka_consumer::SingleTopicConsumer;
use health::{HealthHandle, HealthRegistry};
use sqlx::{postgres::PgPoolOptions, PgPool};
use tracing::info;

use crate::{config::Config, error::Error};
use crate::{config::Config, error::Error, team_cache::TeamCache};

// Long-lived shared state for the error-tracking consumer: health reporting,
// the Kafka consumer, the Postgres pool, and the team lookup cache.
pub struct AppContext {
// Registry the HTTP health endpoints read from.
pub health_registry: HealthRegistry,
// Handle the consumer loop uses to report liveness.
pub worker_liveness: HealthHandle,
// Kafka consumer subscribed to a single topic (see Config).
pub consumer: SingleTopicConsumer,
// Direct Postgres connection pool (not via pgbouncer).
pub pool: PgPool,
// Cache of token -> team lookups to avoid hitting PG per event.
pub team_cache: TeamCache,
}

impl AppContext {
Expand All @@ -21,15 +24,22 @@ impl AppContext {

let consumer = SingleTopicConsumer::new(config.kafka.clone(), config.consumer.clone())?;

let options = PgPoolOptions::new().max_connections(config.max_pg_connections);
let pool = options.connect(&config.database_url).await?;

info!(
"AppContext initialized, subscribed to topic {}",
config.consumer.kafka_consumer_topic
);

let team_cache = TeamCache::new(config.team_cache_capacity, config.team_cache_ttl_secs);

Ok(Self {
health_registry,
worker_liveness,
consumer,
pool,
team_cache,
})
}
}
13 changes: 13 additions & 0 deletions rust/error-tracking/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ pub struct Config {

#[envconfig(nested = true)]
pub consumer: ConsumerConfig,

#[envconfig(default = "postgres://posthog:posthog@localhost:5432/posthog")]
pub database_url: String,

// Rust services connect directly to postgres, not via pgbouncer, so we keep this low
#[envconfig(default = "4")]
pub max_pg_connections: u32,

#[envconfig(default = "300")]
pub team_cache_ttl_secs: u64,

#[envconfig(default = "10000")]
pub team_cache_capacity: u64,
}

impl Config {
Expand Down
2 changes: 2 additions & 0 deletions rust/error-tracking/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ pub enum Error {
ConfigError(#[from] envconfig::Error),
#[error("Kafka error: {0}")]
KafkaError(#[from] KafkaError),
#[error("Sqlx error: {0}")]
SqlxError(#[from] sqlx::Error),
}
1 change: 1 addition & 0 deletions rust/error-tracking/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod app_context;
pub mod config;
pub mod error;
pub mod team_cache;
38 changes: 34 additions & 4 deletions rust/error-tracking/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::{future::ready, sync::Arc};

use axum::{routing::get, Router};
use common_kafka::kafka_consumer::RecvErr;
use common_metrics::{serve, setup_metrics_routes};
use common_types::CapturedEvent;
use common_types::{CapturedEvent, Team};
use envconfig::Envconfig;
use error_tracking::{app_context::AppContext, config::Config, error::Error};
use tokio::task::JoinHandle;
use tracing::{error, info};
use tracing::{error, info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};

common_alloc::used!();
Expand Down Expand Up @@ -54,15 +55,44 @@ async fn main() -> Result<(), Error> {

loop {
context.worker_liveness.report_healthy().await;
let (_, offset): (CapturedEvent, _) = match context.consumer.json_recv().await {
let (event, offset): (CapturedEvent, _) = match context.consumer.json_recv().await {
Ok(r) => r,
Err(RecvErr::Kafka(e)) => {
return Err(e.into()); // Just die if we receive a Kafka error
}
Err(err) => {
// If we failed to parse the message, or it was empty, just log and continue, our
// consumer has already stored the offset for us.
metrics::counter!("error_tracking_errors").increment(1);
error!("Error receiving message: {:?}", err);
continue;
}
};
offset.store().unwrap();
metrics::counter!("error_tracking_events_received").increment(1);

// Team verification
// If we encounter a sqlx error, just die. The by_token call explicitly
// returns an Option<Team>, so any error here is a PG failure, and us falling
// over sheds load.
let token = &event.token;
let pool = &context.pool;
let team = context
.team_cache
.get_or_insert_with(token, async move { Ok(Team::by_token(pool, token).await?) })
.await?;

match team {
Some(t) => t,
None => {
// If we failed to find the team, just store the offset and drop the event, we don't care about it
offset.store().unwrap();
warn!("Team not found for token: {}", event.token);
metrics::counter!("error_tracking_team_not_found").increment(1);
continue;
}
};

// This is where the rest of the processing would go
offset.store().unwrap();
}
}
Loading

0 comments on commit 9fca67a

Please sign in to comment.