Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit

Permalink
Add support for the /batch request shape, refacto token extraction
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Apr 17, 2024
1 parent 56d4615 commit 94b26ea
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 138 deletions.
11 changes: 0 additions & 11 deletions capture/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,8 @@ use crate::token::InvalidTokenReason;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use thiserror::Error;

#[derive(Debug, Deserialize, Serialize)]
pub struct CaptureRequest {
#[serde(alias = "$token", alias = "api_key")]
pub token: String,

pub event: String,
pub properties: HashMap<String, Value>,
}

#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum CaptureResponseCode {
Ok = 1,
Expand Down
124 changes: 18 additions & 106 deletions capture/src/capture.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::HashSet;
use std::ops::Deref;
use std::sync::Arc;

Expand All @@ -15,10 +14,9 @@ use metrics::counter;
use time::OffsetDateTime;
use tracing::instrument;

use crate::event::{Compression, ProcessingContext};
use crate::event::{Compression, ProcessingContext, RawRequest};
use crate::limiters::billing::QuotaResource;
use crate::prometheus::report_dropped_events;
use crate::token::validate_token;
use crate::{
api::{CaptureError, CaptureResponse, CaptureResponseCode},
event::{EventFormData, EventQuery, ProcessedEvent, RawEvent},
Expand All @@ -35,7 +33,8 @@ use crate::{
content_encoding,
content_type,
version,
compression
compression,
is_historical
)
)]
#[debug_handler]
Expand Down Expand Up @@ -69,7 +68,7 @@ pub async fn event(
tracing::Span::current().record("compression", comp.as_str());
tracing::Span::current().record("method", method.as_str());

let events = match headers
let request = match headers
.get("content-type")
.map_or("", |v| v.to_str().unwrap_or(""))
{
Expand All @@ -86,28 +85,33 @@ pub async fn event(
tracing::error!("failed to decode form data: {}", e);
CaptureError::RequestDecodingError(String::from("missing data field"))
})?;
RawEvent::from_bytes(payload.into())
RawRequest::from_bytes(payload.into())
}
ct => {
tracing::Span::current().record("content_type", ct);

RawEvent::from_bytes(body)
RawRequest::from_bytes(body)
}
}?;

let token = match request.extract_and_verify_token() {
Ok(token) => token,
Err(err) => {
report_dropped_events("token_shape_invalid", request.events().len() as u64);
return Err(err)
}
};
let is_historical = request.is_historical();
let events = request.events(); // Takes ownership of request

tracing::Span::current().record("token", &token);
tracing::Span::current().record("is_historical", is_historical);
tracing::Span::current().record("batch_size", events.len());

if events.is_empty() {
return Err(CaptureError::EmptyBatch);
}

let token = extract_and_verify_token(&events).map_err(|err| {
report_dropped_events("token_shape_invalid", events.len() as u64);
err
})?;

tracing::Span::current().record("token", &token);

counter!("capture_events_received_total").increment(events.len() as u64);

let sent_at = meta.sent_at.and_then(|value| {
Expand Down Expand Up @@ -192,28 +196,6 @@ pub fn process_single_event(
})
}

#[instrument(skip_all, fields(events = events.len()))]
pub fn extract_and_verify_token(events: &[RawEvent]) -> Result<String, CaptureError> {
let distinct_tokens: HashSet<Option<String>> = HashSet::from_iter(
events
.iter()
.map(RawEvent::extract_token)
.filter(Option::is_some),
);

return match distinct_tokens.len() {
0 => Err(CaptureError::NoTokenError),
1 => match distinct_tokens.iter().last() {
Some(Some(token)) => {
validate_token(token)?;
Ok(token.clone())
}
_ => Err(CaptureError::NoTokenError),
},
_ => Err(CaptureError::MultipleTokensError),
};
}

#[instrument(skip_all, fields(events = events.len()))]
pub async fn process_events<'a>(
sink: Arc<dyn sinks::Event + Send + Sync>,
Expand All @@ -233,73 +215,3 @@ pub async fn process_events<'a>(
sink.send_batch(events).await
}
}

#[cfg(test)]
mod tests {
use crate::capture::extract_and_verify_token;
use crate::event::RawEvent;
use serde_json::json;
use std::collections::HashMap;

#[tokio::test]
async fn all_events_have_same_token() {
let events = vec![
RawEvent {
token: Some(String::from("hello")),
distinct_id: Some(json!("testing")),
uuid: None,
event: String::new(),
properties: HashMap::new(),
timestamp: None,
offset: None,
set: Default::default(),
set_once: Default::default(),
},
RawEvent {
token: None,
distinct_id: Some(json!("testing")),
uuid: None,
event: String::new(),
properties: HashMap::from([(String::from("token"), json!("hello"))]),
timestamp: None,
offset: None,
set: Default::default(),
set_once: Default::default(),
},
];

let processed = extract_and_verify_token(&events);
assert_eq!(processed.is_ok(), true, "{:?}", processed);
}

#[tokio::test]
async fn all_events_have_different_token() {
let events = vec![
RawEvent {
token: Some(String::from("hello")),
distinct_id: Some(json!("testing")),
uuid: None,
event: String::new(),
properties: HashMap::new(),
timestamp: None,
offset: None,
set: Default::default(),
set_once: Default::default(),
},
RawEvent {
token: None,
distinct_id: Some(json!("testing")),
uuid: None,
event: String::new(),
properties: HashMap::from([(String::from("token"), json!("goodbye"))]),
timestamp: None,
offset: None,
set: Default::default(),
set_once: Default::default(),
},
];

let processed = extract_and_verify_token(&events);
assert_eq!(processed.is_err(), true);
}
}
Loading

0 comments on commit 94b26ea

Please sign in to comment.