feat: add app_metrics2 reporting to hoghooks (#23931)
bretthoerner authored Aug 1, 2024
1 parent c6db8e6 commit 5199105
Showing 10 changed files with 653 additions and 140 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default.

47 changes: 32 additions & 15 deletions rust/hook-api/src/handlers/webhook.rs
@@ -121,6 +121,19 @@ pub async fn post_hoghook(

debug!("received payload: {:?}", payload);

// We use these fields for metrics in the janitor, but we don't actually need to do anything
// with them now.
payload
.get("teamId")
.ok_or_else(|| bad_request("missing required field 'teamId'".to_owned()))?
.as_number()
.ok_or_else(|| bad_request("'teamId' is not a number".to_owned()))?;
payload
.get("hogFunctionId")
.ok_or_else(|| bad_request("missing required field 'hogFunctionId'".to_owned()))?
.as_str()
.ok_or_else(|| bad_request("'hogFunctionId' is not a string".to_owned()))?;

// We deserialize a copy of the `asyncFunctionRequest` field here because we want to leave
// the original payload unmodified so that it can be passed through exactly as it came to us.
let async_function_request = payload
@@ -413,28 +426,28 @@ mod tests {

let valid_payloads = vec![
(
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]}}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]}, "teamId": 1, "hogFunctionId": "abc"}"#,
r#"{"body": "", "headers": {}, "method": "POST", "url": "http://example.com"}"#,
),
(
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "GET"}]}}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "GET"}]}, "teamId": 1, "hogFunctionId": "abc"}"#,
r#"{"body": "", "headers": {}, "method": "GET", "url": "http://example.com"}"#,
),
(
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"body": "hello, world"}]}}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"body": "hello, world"}]}, "teamId": 1, "hogFunctionId": "abc"}"#,
r#"{"body": "hello, world", "headers": {}, "method": "POST", "url": "http://example.com"}"#,
),
(
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"headers": {"k": "v"}}]}}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"headers": {"k": "v"}}]}, "teamId": 1, "hogFunctionId": "abc"}"#,
r#"{"body": "", "headers": {"k": "v"}, "method": "POST", "url": "http://example.com"}"#,
),
(
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "GET", "body": "hello, world", "headers": {"k": "v"}}]}, "otherField": true}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "GET", "body": "hello, world", "headers": {"k": "v"}}]}, "otherField": true, "teamId": 1, "hogFunctionId": "abc"}"#,
r#"{"body": "hello, world", "headers": {"k": "v"}, "method": "GET", "url": "http://example.com"}"#,
),
// Test that null unicode code points are replaced, since they aren't allowed in Postgres.
(
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com/\\u0000", {"method": "GET", "body": "\\u0000", "headers": {"k": "v"}}]}, "otherField": true}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com/\\u0000", {"method": "GET", "body": "\\u0000", "headers": {"k": "v"}}]}, "otherField": true, "teamId": 1, "hogFunctionId": "abc"}"#,
r#"{"body": "\\uFFFD", "headers": {"k": "v"}, "method": "GET", "url": "http://example.com/\\uFFFD"}"#,
),
];
@@ -496,18 +509,22 @@ mod tests {

let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE);

let valid_payloads = vec![
let invalid_payloads = vec![
r#"{}"#,
r#"{"asyncFunctionRequest":{}"#,
r#"{"asyncFunctionRequest":{"name":"not-fetch","args":[]}}"#,
r#"{"asyncFunctionRequest":{"name":"fetch"}}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":{}}}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":[]}}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["not-url"]}}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "not-method"}]}}"#,
r#"{"asyncFunctionRequest":{"teamId": 1, "hogFunctionId": "abc"}"#,
r#"{"asyncFunctionRequest":{"name":"not-fetch","args":[]}, "teamId": 1, "hogFunctionId": "abc"}"#,
r#"{"asyncFunctionRequest":{"name":"fetch"}, "teamId": 1, "hogFunctionId": "abc"}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":{}}, "teamId": 1, "hogFunctionId": "abc"}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":[]}, "teamId": 1, "hogFunctionId": "abc"}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["not-url"]}, "teamId": 1, "hogFunctionId": "abc"}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "not-method"}]}, "teamId": 1, "hogFunctionId": "abc"}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]}, "hogFunctionId": "abc"}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]}, "teamId": "string", "hogFunctionId": "abc"}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]}, "teamId": 1}"#,
r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]}, "teamId": 1, "hogFunctionId": 1}"#,
];

for payload in valid_payloads {
for payload in invalid_payloads {
let mut headers = collections::HashMap::new();
headers.insert("Content-Type".to_owned(), "application/json".to_owned());
let response = app
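For reference, a minimal sketch (not part of this commit) of the payload shape the handler now requires and of the same teamId/hogFunctionId checks applied with serde_json. validate_hoghook_payload is a hypothetical helper name; the real handler wraps these failures in HTTP 400 responses rather than plain strings.

```rust
use serde_json::Value;

// Illustrative only: mirrors the checks added to post_hoghook above.
fn validate_hoghook_payload(payload: &Value) -> Result<(), String> {
    payload
        .get("teamId")
        .ok_or_else(|| "missing required field 'teamId'".to_owned())?
        .as_number()
        .ok_or_else(|| "'teamId' is not a number".to_owned())?;
    payload
        .get("hogFunctionId")
        .ok_or_else(|| "missing required field 'hogFunctionId'".to_owned())?
        .as_str()
        .ok_or_else(|| "'hogFunctionId' is not a string".to_owned())?;
    Ok(())
}

fn main() {
    // Same shape as the valid test payloads above.
    let valid: Value = serde_json::from_str(
        r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]},
            "teamId": 1, "hogFunctionId": "abc"}"#,
    )
    .unwrap();
    assert!(validate_hoghook_payload(&valid).is_ok());

    // Missing hogFunctionId, so the handler would now reject this request.
    let invalid: Value = serde_json::from_str(r#"{"teamId": 1}"#).unwrap();
    assert!(validate_hoghook_payload(&invalid).is_err());
}
```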
82 changes: 28 additions & 54 deletions rust/hook-common/src/kafka_messages/app_metrics.rs
@@ -17,6 +17,7 @@ pub enum AppMetricCategory {
// names need to remain stable, or new variants need to be deployed to the cleanup/janitor
// process before they are used.
#[derive(Deserialize, Serialize, Debug, PartialEq, Clone)]
#[serde(try_from = "String", into = "String")]
pub enum ErrorType {
TimeoutError,
ConnectionError,
@@ -64,12 +65,7 @@ pub struct AppMetric {
pub failures: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_uuid: Option<Uuid>,
#[serde(
serialize_with = "serialize_error_type",
deserialize_with = "deserialize_error_type",
default,
skip_serializing_if = "Option::is_none"
)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error_type: Option<ErrorType>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_details: Option<ErrorDetails>,
@@ -118,57 +114,35 @@ where
Ok(category)
}

fn serialize_error_type<S>(error_type: &Option<ErrorType>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let error_type = match error_type {
Some(error_type) => error_type,
None => return serializer.serialize_none(),
};

let error_type = match error_type {
ErrorType::ConnectionError => "Connection Error".to_owned(),
ErrorType::TimeoutError => "Timeout Error".to_owned(),
ErrorType::BadHttpStatus(s) => format!("Bad HTTP Status: {}", s),
ErrorType::ParseError => "Parse Error".to_owned(),
};
serializer.serialize_str(&error_type)
impl TryFrom<String> for ErrorType {
type Error = String;

fn try_from(s: String) -> Result<Self, Self::Error> {
match s.as_str() {
"Connection Error" | "ConnectionError" => Ok(ErrorType::ConnectionError),
"Timeout Error" | "TimeoutError" => Ok(ErrorType::TimeoutError),
s if s.starts_with("Bad HTTP Status:") => {
let status = &s["Bad HTTP Status:".len()..].trim();
let parsed_status = status
.parse::<u16>()
.map_err(|e| format!("Failed to parse HTTP status: {}", e))?;
Ok(ErrorType::BadHttpStatus(parsed_status))
}
"Parse Error" | "ParseError" => Ok(ErrorType::ParseError),
_ => Err(format!("Unknown ErrorType: {}", s)),
}
}
}

fn deserialize_error_type<'de, D>(deserializer: D) -> Result<Option<ErrorType>, D::Error>
where
D: Deserializer<'de>,
{
let opt = Option::<String>::deserialize(deserializer)?;
let error_type = match opt {
Some(s) => {
let error_type = match &s[..] {
"Connection Error" => ErrorType::ConnectionError,
"Timeout Error" => ErrorType::TimeoutError,
_ if s.starts_with("Bad HTTP Status:") => {
let status = &s["Bad HTTP Status:".len()..];
ErrorType::BadHttpStatus(status.parse().map_err(serde::de::Error::custom)?)
}
"Parse Error" => ErrorType::ParseError,
_ => {
return Err(serde::de::Error::unknown_variant(
&s,
&[
"Connection Error",
"Timeout Error",
"Bad HTTP Status: <status>",
"Parse Error",
],
))
}
};
Some(error_type)
impl From<ErrorType> for String {
fn from(error: ErrorType) -> Self {
match error {
ErrorType::ConnectionError => "Connection Error".to_string(),
ErrorType::TimeoutError => "Timeout Error".to_string(),
ErrorType::BadHttpStatus(s) => format!("Bad HTTP Status: {}", s),
ErrorType::ParseError => "Parse Error".to_string(),
}
None => None,
};

Ok(error_type)
}
}

#[cfg(test)]
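As a quick illustration (assuming hook-common is available as a path dependency, as in the janitor's Cargo.toml below), the new conversions can be exercised directly; with #[serde(try_from = "String", into = "String")] on ErrorType, serde now routes (de)serialization through these same impls.

```rust
use std::convert::TryFrom;

use hook_common::kafka_messages::app_metrics::ErrorType;

fn main() {
    // Into<String> goes through the From<ErrorType> impl above.
    let s: String = ErrorType::BadHttpStatus(502).into();
    assert_eq!(s, "Bad HTTP Status: 502");

    // TryFrom<String> accepts both the human-readable and the variant-name spellings.
    assert_eq!(
        ErrorType::try_from("Bad HTTP Status: 502".to_owned()).unwrap(),
        ErrorType::BadHttpStatus(502)
    );
    assert_eq!(
        ErrorType::try_from("TimeoutError".to_owned()).unwrap(),
        ErrorType::TimeoutError
    );

    // Unknown strings are rejected instead of being mapped silently.
    assert!(ErrorType::try_from("Something Else".to_owned()).is_err());
}
```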
61 changes: 61 additions & 0 deletions rust/hook-common/src/kafka_messages/app_metrics2.rs
@@ -0,0 +1,61 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

use super::{deserialize_datetime, serialize_datetime};

#[derive(Deserialize, Serialize, Debug, PartialEq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum Source {
Hoghooks,
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum Kind {
Success,
Failure,
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Clone)]
pub struct AppMetric2 {
pub team_id: u32,
#[serde(
serialize_with = "serialize_datetime",
deserialize_with = "deserialize_datetime"
)]
pub timestamp: DateTime<Utc>,
pub app_source: Source,
pub app_source_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub instance_id: Option<String>,
pub metric_kind: Kind,
pub metric_name: String,
pub count: u32,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_app_metric2_serialization() {
use chrono::prelude::*;

let app_metric = AppMetric2 {
team_id: 123,
timestamp: Utc.with_ymd_and_hms(2023, 12, 14, 12, 2, 0).unwrap(),
app_source: Source::Hoghooks,
app_source_id: "hog-function-1".to_owned(),
instance_id: Some("hash".to_owned()),
metric_kind: Kind::Success,
metric_name: "fetch".to_owned(),
count: 456,
};

let serialized_json = serde_json::to_string(&app_metric).unwrap();

let expected_json = r#"{"team_id":123,"timestamp":"2023-12-14 12:02:00","app_source":"hoghooks","app_source_id":"hog-function-1","instance_id":"hash","metric_kind":"success","metric_name":"fetch","count":456}"#;

assert_eq!(serialized_json, expected_json);
}
}
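A consumer-side sketch (again assuming hook-common as a dependency, and that deserialize_datetime accepts the same "YYYY-MM-DD HH:MM:SS" format serialize_datetime emits) of reading one of these rows back, e.g. off the clickhouse_app_metrics2 topic:

```rust
use chrono::{TimeZone, Utc};
use hook_common::kafka_messages::app_metrics2::{AppMetric2, Kind, Source};

fn main() {
    // The same JSON shape the test above expects to produce.
    let json = r#"{"team_id":123,"timestamp":"2023-12-14 12:02:00","app_source":"hoghooks","app_source_id":"hog-function-1","instance_id":"hash","metric_kind":"success","metric_name":"fetch","count":456}"#;

    let metric: AppMetric2 = serde_json::from_str(json).expect("valid AppMetric2 JSON");
    assert_eq!(metric.app_source, Source::Hoghooks);
    assert_eq!(metric.metric_kind, Kind::Success);
    assert_eq!(
        metric.timestamp,
        Utc.with_ymd_and_hms(2023, 12, 14, 12, 2, 0).unwrap()
    );

    // Re-serializing reproduces the expected string from the test above.
    assert_eq!(serde_json::to_string(&metric).unwrap(), json);
}
```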
1 change: 1 addition & 0 deletions rust/hook-common/src/kafka_messages/mod.rs
@@ -1,4 +1,5 @@
pub mod app_metrics;
pub mod app_metrics2;
pub mod plugin_logs;

use chrono::{DateTime, NaiveDateTime, Utc};
1 change: 1 addition & 0 deletions rust/hook-janitor/Cargo.toml
@@ -16,6 +16,7 @@ health = { path = "../common/health" }
hook-common = { path = "../hook-common" }
metrics = { workspace = true }
rdkafka = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true }
thiserror = { workspace = true }
3 changes: 3 additions & 0 deletions rust/hook-janitor/src/config.rs
@@ -28,6 +28,9 @@ pub struct Config {
#[envconfig(default = "clickhouse_app_metrics")]
pub app_metrics_topic: String,

#[envconfig(default = "clickhouse_app_metrics2")]
pub app_metrics2_topic: String,

#[envconfig(nested = true)]
pub kafka: KafkaConfig,
}
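A standalone sketch of how envconfig resolves the new setting. This struct only mirrors the two topic fields, not the janitor's full Config, and it assumes envconfig's default of deriving the variable name from the field name (i.e. APP_METRICS2_TOPIC):

```rust
use envconfig::Envconfig;

// Illustrative stand-in for the topic fields on hook-janitor's Config.
#[derive(Envconfig)]
pub struct JanitorTopics {
    #[envconfig(default = "clickhouse_app_metrics")]
    pub app_metrics_topic: String,

    #[envconfig(default = "clickhouse_app_metrics2")]
    pub app_metrics2_topic: String,
}

fn main() {
    // With no APP_METRICS2_TOPIC set in the environment, the default topic name applies.
    let topics = JanitorTopics::init_from_env().unwrap();
    assert_eq!(topics.app_metrics2_topic, "clickhouse_app_metrics2");

    // Exporting APP_METRICS2_TOPIC=some_other_topic before the janitor starts would
    // override it, assuming the uppercased-field-name convention holds.
}
```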
(Diffs for the remaining changed files are not shown here.)
