Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit

Permalink
capture: add support for the /batch request shape (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello authored Apr 22, 2024
1 parent 591d765 commit 4db3670
Show file tree
Hide file tree
Showing 11 changed files with 276 additions and 205 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ members = [
anyhow = "1.0"
assert-json-diff = "2.0.2"
async-trait = "0.1.74"
axum = { version = "0.7.5", features = ["http2", "macros"] }
axum = { version = "0.7.5", features = ["http2", "macros", "matched-path"] }
axum-client-ip = "0.6.0"
base64 = "0.22.0"
bytes = "1"
Expand Down
32 changes: 21 additions & 11 deletions capture/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
use crate::token::InvalidTokenReason;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use thiserror::Error;
use time::OffsetDateTime;
use uuid::Uuid;

#[derive(Debug, Deserialize, Serialize)]
pub struct CaptureRequest {
#[serde(alias = "$token", alias = "api_key")]
pub token: String,

pub event: String,
pub properties: HashMap<String, Value>,
}
use crate::token::InvalidTokenReason;

#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum CaptureResponseCode {
Expand Down Expand Up @@ -84,3 +76,21 @@ impl IntoResponse for CaptureError {
.into_response()
}
}

#[derive(Clone, Default, Debug, Serialize, Eq, PartialEq)]
pub struct ProcessedEvent {
pub uuid: Uuid,
pub distinct_id: String,
pub ip: String,
pub data: String,
pub now: String,
#[serde(with = "time::serde::rfc3339::option")]
pub sent_at: Option<OffsetDateTime>,
pub token: String,
}

impl ProcessedEvent {
pub fn key(&self) -> String {
format!("{}:{}", self.token, self.distinct_id)
}
}
4 changes: 2 additions & 2 deletions capture/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
pub mod api;
pub mod capture;
pub mod config;
pub mod event;
pub mod health;
pub mod limiters;
pub mod prometheus;
Expand All @@ -12,3 +10,5 @@ pub mod sinks;
pub mod time;
pub mod token;
pub mod utils;
pub mod v0_endpoint;
pub mod v0_request;
40 changes: 33 additions & 7 deletions capture/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
use tower_http::trace::TraceLayer;

use crate::health::HealthRegistry;
use crate::{capture, limiters::billing::BillingLimiter, redis::Client, sinks, time::TimeSource};
use crate::{
limiters::billing::BillingLimiter, redis::Client, sinks, time::TimeSource, v0_endpoint,
};

use crate::prometheus::{setup_metrics_recorder, track_metrics};

Expand Down Expand Up @@ -58,17 +60,41 @@ pub fn router<
.route("/", get(index))
.route("/_readiness", get(index))
.route("/_liveness", get(move || ready(liveness.get_status())))
.route(
"/e",
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.route(
"/e/",
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.route(
"/batch",
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.route(
"/batch/",
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.route(
"/i/v0/e",
post(capture::event)
.get(capture::event)
.options(capture::options),
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.route(
"/i/v0/e/",
post(capture::event)
.get(capture::event)
.options(capture::options),
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.layer(TraceLayer::new_for_http())
.layer(cors)
Expand Down
6 changes: 2 additions & 4 deletions capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ use tokio::task::JoinSet;
use tracing::log::{debug, error, info};
use tracing::{info_span, instrument, Instrument};

use crate::api::CaptureError;
use crate::api::{CaptureError, ProcessedEvent};
use crate::config::KafkaConfig;
use crate::event::ProcessedEvent;
use crate::health::HealthHandle;
use crate::limiters::overflow::OverflowLimiter;
use crate::prometheus::report_dropped_events;
Expand Down Expand Up @@ -259,9 +258,8 @@ impl Event for KafkaSink {

#[cfg(test)]
mod tests {
use crate::api::CaptureError;
use crate::api::{CaptureError, ProcessedEvent};
use crate::config;
use crate::event::ProcessedEvent;
use crate::health::HealthRegistry;
use crate::limiters::overflow::OverflowLimiter;
use crate::sinks::kafka::KafkaSink;
Expand Down
3 changes: 1 addition & 2 deletions capture/src/sinks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use async_trait::async_trait;

use crate::api::CaptureError;
use crate::event::ProcessedEvent;
use crate::api::{CaptureError, ProcessedEvent};

pub mod kafka;
pub mod print;
Expand Down
3 changes: 1 addition & 2 deletions capture/src/sinks/print.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use async_trait::async_trait;
use metrics::{counter, histogram};
use tracing::log::info;

use crate::api::CaptureError;
use crate::event::ProcessedEvent;
use crate::api::{CaptureError, ProcessedEvent};
use crate::sinks::Event;

pub struct PrintSink {}
Expand Down
Loading

0 comments on commit 4db3670

Please sign in to comment.