diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 15c2210f61fb5..e60f08eb9874a 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1522,6 +1522,7 @@ dependencies = [ "hook-common", "metrics", "rdkafka", + "serde", "serde_json", "sqlx", "thiserror", diff --git a/rust/hook-api/src/handlers/webhook.rs b/rust/hook-api/src/handlers/webhook.rs index a3e186ba710da..3f4cd8c8a5b8a 100644 --- a/rust/hook-api/src/handlers/webhook.rs +++ b/rust/hook-api/src/handlers/webhook.rs @@ -121,6 +121,19 @@ pub async fn post_hoghook( debug!("received payload: {:?}", payload); + // We use these fields for metrics in the janitor, but we don't actually need to do anything + // with them now. + payload + .get("teamId") + .ok_or_else(|| bad_request("missing required field 'teamId'".to_owned()))? + .as_number() + .ok_or_else(|| bad_request("'teamId' is not a number".to_owned()))?; + payload + .get("hogFunctionId") + .ok_or_else(|| bad_request("missing required field 'hogFunctionId'".to_owned()))? + .as_str() + .ok_or_else(|| bad_request("'hogFunctionId' is not a string".to_owned()))?; + // We deserialize a copy of the `asyncFunctionRequest` field here because we want to leave // the original payload unmodified so that it can be passed through exactly as it came to us. let async_function_request = payload @@ -413,28 +426,28 @@ mod tests { let valid_payloads = vec![ ( - r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]}}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]}, "teamId": 1, "hogFunctionId": "abc"}"#, r#"{"body": "", "headers": {}, "method": "POST", "url": "http://example.com"}"#, ), ( - r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "GET"}]}}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "GET"}]}, "teamId": 1, "hogFunctionId": "abc"}"#, r#"{"body": "", "headers": {}, "method": "GET", "url": "http://example.com"}"#, ), ( - r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"body": "hello, world"}]}}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"body": "hello, world"}]}, "teamId": 1, "hogFunctionId": "abc"}"#, r#"{"body": "hello, world", "headers": {}, "method": "POST", "url": "http://example.com"}"#, ), ( - r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"headers": {"k": "v"}}]}}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"headers": {"k": "v"}}]}, "teamId": 1, "hogFunctionId": "abc"}"#, r#"{"body": "", "headers": {"k": "v"}, "method": "POST", "url": "http://example.com"}"#, ), ( - r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "GET", "body": "hello, world", "headers": {"k": "v"}}]}, "otherField": true}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "GET", "body": "hello, world", "headers": {"k": "v"}}]}, "otherField": true, "teamId": 1, "hogFunctionId": "abc"}"#, r#"{"body": "hello, world", "headers": {"k": "v"}, "method": "GET", "url": "http://example.com"}"#, ), // Test that null unicode code points are replaced, since they aren't allowed in Postgres. 
( - r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com/\\u0000", {"method": "GET", "body": "\\u0000", "headers": {"k": "v"}}]}, "otherField": true}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com/\\u0000", {"method": "GET", "body": "\\u0000", "headers": {"k": "v"}}]}, "otherField": true, "teamId": 1, "hogFunctionId": "abc"}"#, r#"{"body": "\\uFFFD", "headers": {"k": "v"}, "method": "GET", "url": "http://example.com/\\uFFFD"}"#, ), ]; @@ -496,18 +509,22 @@ mod tests { let app = add_routes(Router::new(), pg_queue, hog_mode, MAX_BODY_SIZE); - let valid_payloads = vec![ + let invalid_payloads = vec![ r#"{}"#, - r#"{"asyncFunctionRequest":{}"#, - r#"{"asyncFunctionRequest":{"name":"not-fetch","args":[]}}"#, - r#"{"asyncFunctionRequest":{"name":"fetch"}}"#, - r#"{"asyncFunctionRequest":{"name":"fetch","args":{}}}"#, - r#"{"asyncFunctionRequest":{"name":"fetch","args":[]}}"#, - r#"{"asyncFunctionRequest":{"name":"fetch","args":["not-url"]}}"#, - r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "not-method"}]}}"#, + r#"{"asyncFunctionRequest":{"teamId": 1, "hogFunctionId": "abc"}"#, + r#"{"asyncFunctionRequest":{"name":"not-fetch","args":[]}, "teamId": 1, "hogFunctionId": "abc"}"#, + r#"{"asyncFunctionRequest":{"name":"fetch"}, "teamId": 1, "hogFunctionId": "abc"}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":{}}, "teamId": 1, "hogFunctionId": "abc"}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":[]}, "teamId": 1, "hogFunctionId": "abc"}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":["not-url"]}, "teamId": 1, "hogFunctionId": "abc"}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com", {"method": "not-method"}]}, "teamId": 1, "hogFunctionId": "abc"}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]}, "hogFunctionId": "abc"}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]}, "teamId": "string", "hogFunctionId": "abc"}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]}, "teamId": 1}"#, + r#"{"asyncFunctionRequest":{"name":"fetch","args":["http://example.com"]}, "teamId": 1, "hogFunctionId": 1}"#, ]; - for payload in valid_payloads { + for payload in invalid_payloads { let mut headers = collections::HashMap::new(); headers.insert("Content-Type".to_owned(), "application/json".to_owned()); let response = app diff --git a/rust/hook-common/src/kafka_messages/app_metrics.rs b/rust/hook-common/src/kafka_messages/app_metrics.rs index f941f58138b76..7cb75617f0d21 100644 --- a/rust/hook-common/src/kafka_messages/app_metrics.rs +++ b/rust/hook-common/src/kafka_messages/app_metrics.rs @@ -17,6 +17,7 @@ pub enum AppMetricCategory { // names need to remain stable, or new variants need to be deployed to the cleanup/janitor // process before they are used. 
#[derive(Deserialize, Serialize, Debug, PartialEq, Clone)] +#[serde(try_from = "String", into = "String")] pub enum ErrorType { TimeoutError, ConnectionError, @@ -64,12 +65,7 @@ pub struct AppMetric { pub failures: u32, #[serde(skip_serializing_if = "Option::is_none")] pub error_uuid: Option, - #[serde( - serialize_with = "serialize_error_type", - deserialize_with = "deserialize_error_type", - default, - skip_serializing_if = "Option::is_none" - )] + #[serde(default, skip_serializing_if = "Option::is_none")] pub error_type: Option, #[serde(skip_serializing_if = "Option::is_none")] pub error_details: Option, @@ -118,57 +114,35 @@ where Ok(category) } -fn serialize_error_type(error_type: &Option, serializer: S) -> Result -where - S: Serializer, -{ - let error_type = match error_type { - Some(error_type) => error_type, - None => return serializer.serialize_none(), - }; - - let error_type = match error_type { - ErrorType::ConnectionError => "Connection Error".to_owned(), - ErrorType::TimeoutError => "Timeout Error".to_owned(), - ErrorType::BadHttpStatus(s) => format!("Bad HTTP Status: {}", s), - ErrorType::ParseError => "Parse Error".to_owned(), - }; - serializer.serialize_str(&error_type) +impl TryFrom for ErrorType { + type Error = String; + + fn try_from(s: String) -> Result { + match s.as_str() { + "Connection Error" | "ConnectionError" => Ok(ErrorType::ConnectionError), + "Timeout Error" | "TimeoutError" => Ok(ErrorType::TimeoutError), + s if s.starts_with("Bad HTTP Status:") => { + let status = &s["Bad HTTP Status:".len()..].trim(); + let parsed_status = status + .parse::() + .map_err(|e| format!("Failed to parse HTTP status: {}", e))?; + Ok(ErrorType::BadHttpStatus(parsed_status)) + } + "Parse Error" | "ParseError" => Ok(ErrorType::ParseError), + _ => Err(format!("Unknown ErrorType: {}", s)), + } + } } -fn deserialize_error_type<'de, D>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - let opt = Option::::deserialize(deserializer)?; - let error_type = match opt { - Some(s) => { - let error_type = match &s[..] { - "Connection Error" => ErrorType::ConnectionError, - "Timeout Error" => ErrorType::TimeoutError, - _ if s.starts_with("Bad HTTP Status:") => { - let status = &s["Bad HTTP Status:".len()..]; - ErrorType::BadHttpStatus(status.parse().map_err(serde::de::Error::custom)?) 
- } - "Parse Error" => ErrorType::ParseError, - _ => { - return Err(serde::de::Error::unknown_variant( - &s, - &[ - "Connection Error", - "Timeout Error", - "Bad HTTP Status: ", - "Parse Error", - ], - )) - } - }; - Some(error_type) +impl From for String { + fn from(error: ErrorType) -> Self { + match error { + ErrorType::ConnectionError => "Connection Error".to_string(), + ErrorType::TimeoutError => "Timeout Error".to_string(), + ErrorType::BadHttpStatus(s) => format!("Bad HTTP Status: {}", s), + ErrorType::ParseError => "Parse Error".to_string(), } - None => None, - }; - - Ok(error_type) + } } #[cfg(test)] diff --git a/rust/hook-common/src/kafka_messages/app_metrics2.rs b/rust/hook-common/src/kafka_messages/app_metrics2.rs new file mode 100644 index 0000000000000..3c2510fb010c0 --- /dev/null +++ b/rust/hook-common/src/kafka_messages/app_metrics2.rs @@ -0,0 +1,61 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +use super::{deserialize_datetime, serialize_datetime}; + +#[derive(Deserialize, Serialize, Debug, PartialEq, Clone)] +#[serde(rename_all = "lowercase")] +pub enum Source { + Hoghooks, +} + +#[derive(Deserialize, Serialize, Debug, PartialEq, Clone)] +#[serde(rename_all = "lowercase")] +pub enum Kind { + Success, + Failure, +} + +#[derive(Deserialize, Serialize, Debug, PartialEq, Clone)] +pub struct AppMetric2 { + pub team_id: u32, + #[serde( + serialize_with = "serialize_datetime", + deserialize_with = "deserialize_datetime" + )] + pub timestamp: DateTime, + pub app_source: Source, + pub app_source_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub instance_id: Option, + pub metric_kind: Kind, + pub metric_name: String, + pub count: u32, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_app_metric2_serialization() { + use chrono::prelude::*; + + let app_metric = AppMetric2 { + team_id: 123, + timestamp: Utc.with_ymd_and_hms(2023, 12, 14, 12, 2, 0).unwrap(), + app_source: Source::Hoghooks, + app_source_id: "hog-function-1".to_owned(), + instance_id: Some("hash".to_owned()), + metric_kind: Kind::Success, + metric_name: "fetch".to_owned(), + count: 456, + }; + + let serialized_json = serde_json::to_string(&app_metric).unwrap(); + + let expected_json = r#"{"team_id":123,"timestamp":"2023-12-14 12:02:00","app_source":"hoghooks","app_source_id":"hog-function-1","instance_id":"hash","metric_kind":"success","metric_name":"fetch","count":456}"#; + + assert_eq!(serialized_json, expected_json); + } +} diff --git a/rust/hook-common/src/kafka_messages/mod.rs b/rust/hook-common/src/kafka_messages/mod.rs index f548563af5ba1..92b9c605956e0 100644 --- a/rust/hook-common/src/kafka_messages/mod.rs +++ b/rust/hook-common/src/kafka_messages/mod.rs @@ -1,4 +1,5 @@ pub mod app_metrics; +pub mod app_metrics2; pub mod plugin_logs; use chrono::{DateTime, NaiveDateTime, Utc}; diff --git a/rust/hook-janitor/Cargo.toml b/rust/hook-janitor/Cargo.toml index 741918e79385a..21894a38f8013 100644 --- a/rust/hook-janitor/Cargo.toml +++ b/rust/hook-janitor/Cargo.toml @@ -16,6 +16,7 @@ health = { path = "../common/health" } hook-common = { path = "../hook-common" } metrics = { workspace = true } rdkafka = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } sqlx = { workspace = true } thiserror = { workspace = true } diff --git a/rust/hook-janitor/src/config.rs b/rust/hook-janitor/src/config.rs index 166fb8067a056..28c34488d476b 100644 --- a/rust/hook-janitor/src/config.rs +++ b/rust/hook-janitor/src/config.rs @@ -28,6 +28,9 
@@ pub struct Config { #[envconfig(default = "clickhouse_app_metrics")] pub app_metrics_topic: String, + #[envconfig(default = "clickhouse_app_metrics2")] + pub app_metrics2_topic: String, + #[envconfig(nested = true)] pub kafka: KafkaConfig, } diff --git a/rust/hook-janitor/src/fixtures/hoghook_cleanup.sql b/rust/hook-janitor/src/fixtures/hoghook_cleanup.sql new file mode 100644 index 0000000000000..482b994e6310a --- /dev/null +++ b/rust/hook-janitor/src/fixtures/hoghook_cleanup.sql @@ -0,0 +1,166 @@ +INSERT INTO + job_queue ( + errors, + metadata, + attempted_at, + last_attempt_finished_at, + parameters, + queue, + status, + target + ) +VALUES + -- team:1, hogFunctionId:2, completed in hour 20 + ( + NULL, + '{"teamId": 1, "hogFunctionId": "2"}', + '2023-12-19 20:01:18.799371+00', + '2023-12-19 20:01:18.799371+00', + '{}', + 'hoghooks', + 'completed', + 'https://myhost/endpoint' + ), + -- team:1, hogFunctionId:2, completed in hour 20 (purposeful duplicate) + ( + NULL, + '{"teamId": 1, "hogFunctionId": "2"}', + '2023-12-19 20:01:18.799371+00', + '2023-12-19 20:01:18.799371+00', + '{}', + 'hoghooks', + 'completed', + 'https://myhost/endpoint' + ), + -- team:1, hogFunctionId:2, completed in hour 21 (different hour) + ( + NULL, + '{"teamId": 1, "hogFunctionId": "2"}', + '2023-12-19 21:01:18.799371+00', + '2023-12-19 21:01:18.799371+00', + '{}', + 'hoghooks', + 'completed', + 'https://myhost/endpoint' + ), + -- team:1, hogFunctionId:3, completed in hour 20 (different hogFunctionId) + ( + NULL, + '{"teamId": 1, "hogFunctionId": "3"}', + '2023-12-19 20:01:18.80335+00', + '2023-12-19 20:01:18.80335+00', + '{}', + 'hoghooks', + 'completed', + 'https://myhost/endpoint' + ), + -- team:1, hogFunctionId:2, completed but in a different queue + ( + NULL, + '{"teamId": 1, "hogFunctionId": "2"}', + '2023-12-19 20:01:18.799371+00', + '2023-12-19 20:01:18.799371+00', + '{}', + 'not-hoghooks', + 'completed', + 'https://myhost/endpoint' + ), + -- team:2, hogFunctionId:4, completed in hour 20 (different team) + ( + NULL, + '{"teamId": 2, "hogFunctionId": "4"}', + '2023-12-19 20:01:18.799371+00', + '2023-12-19 20:01:18.799371+00', + '{}', + 'hoghooks', + 'completed', + 'https://myhost/endpoint' + ), + -- team:1, hogFunctionId:2, failed in hour 20 + ( + ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb], + '{"teamId": 1, "hogFunctionId": "2"}', + '2023-12-19 20:01:18.799371+00', + '2023-12-19 20:01:18.799371+00', + '{}', + 'hoghooks', + 'failed', + 'https://myhost/endpoint' + ), + -- team:1, hogFunctionId:2, failed in hour 20 (purposeful duplicate) + ( + ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb], + '{"teamId": 1, "hogFunctionId": "2"}', + '2023-12-19 20:01:18.799371+00', + '2023-12-19 20:01:18.799371+00', + '{}', + 'hoghooks', + 'failed', + 'https://myhost/endpoint' + ), + -- team:1, hogFunctionId:2, failed in hour 20 (different error) + ( + ARRAY ['{"type":"ConnectionError","details":{"error":{"name":"Connection Error"}}}'::jsonb], + '{"teamId": 1, "hogFunctionId": "2"}', + '2023-12-19 20:01:18.799371+00', + '2023-12-19 20:01:18.799371+00', + '{}', + 'hoghooks', + 'failed', + 'https://myhost/endpoint' + ), + -- team:1, hogFunctionId:2, failed in hour 21 (different hour) + ( + ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb], + '{"teamId": 1, "hogFunctionId": "2"}', + '2023-12-19 21:01:18.799371+00', + '2023-12-19 21:01:18.799371+00', + '{}', + 'hoghooks', + 'failed', + 'https://myhost/endpoint' + ), + -- team:1, 
hogFunctionId:3, failed in hour 20 (different hogFunctionId) + ( + ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb], + '{"teamId": 1, "hogFunctionId": "3"}', + '2023-12-19 20:01:18.799371+00', + '2023-12-19 20:01:18.799371+00', + '{}', + 'hoghooks', + 'failed', + 'https://myhost/endpoint' + ), + -- team:1, hogFunctionId:2, failed but in a different queue + ( + ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb], + '{"teamId": 1, "hogFunctionId": "2"}', + '2023-12-19 20:01:18.799371+00', + '2023-12-19 20:01:18.799371+00', + '{}', + 'not-hoghooks', + 'failed', + 'https://myhost/endpoint' + ), + -- team:2, hogFunctionId:4, failed in hour 20 (purposeful duplicate) + ( + ARRAY ['{"type":"TimeoutError","details":{"error":{"name":"Timeout"}}}'::jsonb], + '{"teamId": 2, "hogFunctionId": "4"}', + '2023-12-19 20:01:18.799371+00', + '2023-12-19 20:01:18.799371+00', + '{}', + 'hoghooks', + 'failed', + 'https://myhost/endpoint' + ), + -- team:1, hogFunctionId:2, available + ( + NULL, + '{"teamId": 1, "hogFunctionId": "2"}', + '2023-12-19 20:01:18.799371+00', + '2023-12-19 20:01:18.799371+00', + '{"body": "hello world", "headers": {}, "method": "POST", "url": "https://myhost/endpoint"}', + 'hoghooks', + 'available', + 'https://myhost/endpoint' + ); \ No newline at end of file diff --git a/rust/hook-janitor/src/main.rs b/rust/hook-janitor/src/main.rs index 891532d35dae0..200e5a0030562 100644 --- a/rust/hook-janitor/src/main.rs +++ b/rust/hook-janitor/src/main.rs @@ -63,6 +63,7 @@ async fn main() { &config.database_url, kafka_producer, config.app_metrics_topic.to_owned(), + config.app_metrics2_topic.to_owned(), config.hog_mode, ) .expect("unable to create webhook cleaner"), diff --git a/rust/hook-janitor/src/webhooks.rs b/rust/hook-janitor/src/webhooks.rs index 67d6550fe8a19..c40c7441c5b48 100644 --- a/rust/hook-janitor/src/webhooks.rs +++ b/rust/hook-janitor/src/webhooks.rs @@ -7,6 +7,7 @@ use futures::future::join_all; use hook_common::webhook::WebhookJobError; use rdkafka::error::KafkaError; use rdkafka::producer::{FutureProducer, FutureRecord}; +use serde::Serialize; use serde_json::error::Error as SerdeError; use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions, Postgres}; use sqlx::types::{chrono, Uuid}; @@ -17,6 +18,7 @@ use tracing::{debug, error, info}; use crate::cleanup::Cleaner; use hook_common::kafka_messages::app_metrics::{AppMetric, AppMetricCategory}; +use hook_common::kafka_messages::app_metrics2::{self, AppMetric2}; use hook_common::kafka_producer::KafkaContext; use hook_common::metrics::get_current_timestamp_seconds; @@ -58,6 +60,7 @@ pub struct WebhookCleaner { pg_pool: PgPool, kafka_producer: FutureProducer, app_metrics_topic: String, + app_metrics2_topic: String, hog_mode: bool, } @@ -113,14 +116,6 @@ struct FailedRow { failures: u32, } -#[derive(sqlx::FromRow, Debug)] -struct QueueDepth { - oldest_scheduled_at_untried: DateTime, - count_untried: i64, - oldest_scheduled_at_retries: DateTime, - count_retries: i64, -} - impl From for AppMetric { fn from(row: FailedRow) -> Self { AppMetric { @@ -139,6 +134,128 @@ impl From for AppMetric { } } +#[derive(sqlx::FromRow, Debug)] +struct HoghookCompletedRow { + // App Metrics truncates/aggregates rows on the hour, so we take advantage of that to GROUP BY + // and aggregate to select fewer rows. 
+ hour: DateTime, + // A note about the `try_from`s: Postgres returns all of those types as `bigint` (i64), but + // we know their true sizes, and so we can convert them to the correct types here. If this + // ever fails then something has gone wrong. + #[sqlx(try_from = "i64")] + team_id: u32, + app_source_id: String, + #[sqlx(try_from = "i64")] + count: u32, +} + +impl From for AppMetric2 { + fn from(row: HoghookCompletedRow) -> Self { + AppMetric2 { + team_id: row.team_id, + timestamp: row.hour, + app_source: app_metrics2::Source::Hoghooks, + app_source_id: row.app_source_id, + instance_id: None, + metric_kind: app_metrics2::Kind::Success, + metric_name: "Fetch".to_owned(), + count: row.count, + } + } +} + +#[derive(sqlx::FromRow, Debug)] +struct HoghookFailedRow { + // App Metrics truncates/aggregates rows on the hour, so we take advantage of that to GROUP BY + // and aggregate to select fewer rows. + hour: DateTime, + // A note about the `try_from`s: Postgres returns all of those types as `bigint` (i64), but + // we know their true sizes, and so we can convert them to the correct types here. If this + // ever fails then something has gone wrong. + #[sqlx(try_from = "i64")] + team_id: u32, + app_source_id: String, + #[sqlx(json)] + last_error: WebhookJobError, + #[sqlx(try_from = "i64")] + count: u32, +} + +impl From for AppMetric2 { + fn from(row: HoghookFailedRow) -> Self { + AppMetric2 { + team_id: row.team_id, + timestamp: row.hour, + app_source: app_metrics2::Source::Hoghooks, + app_source_id: row.app_source_id, + instance_id: None, + metric_kind: app_metrics2::Kind::Failure, + metric_name: String::from(row.last_error.r#type), + count: row.count, + } + } +} + +#[derive(sqlx::FromRow, Debug)] +struct QueueDepth { + oldest_scheduled_at_untried: DateTime, + count_untried: i64, + oldest_scheduled_at_retries: DateTime, + count_retries: i64, +} + +// TODO: Extract this to a more generic function that produces any iterable that can be +// serialized, and returns more generic errors. +async fn send_metrics_to_kafka( + kafka_producer: &FutureProducer, + topic: &str, + metrics: impl IntoIterator, +) -> Result<()> +where + T: Serialize, +{ + let mut payloads = Vec::new(); + + for metric in metrics { + let payload = serde_json::to_string(&metric) + .map_err(|e| WebhookCleanerError::SerializeRowsError { error: e })?; + payloads.push(payload); + } + + if payloads.is_empty() { + return Ok(()); + } + + let mut delivery_futures = Vec::new(); + + for payload in payloads { + match kafka_producer.send_result(FutureRecord { + topic, + payload: Some(&payload), + partition: None, + key: None::<&str>, + timestamp: None, + headers: None, + }) { + Ok(future) => delivery_futures.push(future), + Err((error, _)) => return Err(WebhookCleanerError::KafkaProduceError { error }), + } + } + + for result in join_all(delivery_futures).await { + match result { + Ok(Ok(_)) => {} + Ok(Err((error, _))) => return Err(WebhookCleanerError::KafkaProduceError { error }), + Err(_) => { + // Cancelled due to timeout while retrying + return Err(WebhookCleanerError::KafkaProduceCanceled); + } + } + } + + Ok(()) +} + // A simple wrapper type that ensures we don't use any old Transaction object when we need one // that has set the isolation level to serializable. 
struct SerializableTxn<'a>(Transaction<'a, Postgres>); @@ -156,6 +273,7 @@ impl WebhookCleaner { database_url: &str, kafka_producer: FutureProducer, app_metrics_topic: String, + app_metrics2_topic: String, hog_mode: bool, ) -> Result { let options = PgConnectOptions::from_str(database_url) @@ -169,6 +287,7 @@ impl WebhookCleaner { pg_pool, kafka_producer, app_metrics_topic, + app_metrics2_topic, hog_mode, }) } @@ -178,12 +297,14 @@ impl WebhookCleaner { pg_pool: PgPool, kafka_producer: FutureProducer, app_metrics_topic: String, + app_metrics2_topic: String, hog_mode: bool, ) -> Result { Ok(Self { pg_pool, kafka_producer, app_metrics_topic, + app_metrics2_topic, hog_mode, }) } @@ -277,6 +398,29 @@ impl WebhookCleaner { Ok(rows) } + async fn get_completed_agg_rows_for_hoghooks( + &self, + tx: &mut SerializableTxn<'_>, + ) -> Result> { + let base_query = r#" + SELECT DATE_TRUNC('hour', last_attempt_finished_at) AS hour, + (metadata->>'teamId')::bigint AS team_id, + (metadata->>'hogFunctionId') AS app_source_id, + count(*) as count + FROM job_queue + WHERE status = 'completed' + GROUP BY hour, team_id, app_source_id + ORDER BY hour, team_id, app_source_id; + "#; + + let rows = sqlx::query_as::<_, HoghookCompletedRow>(base_query) + .fetch_all(&mut *tx.0) + .await + .map_err(|e| WebhookCleanerError::GetCompletedRowsError { error: e })?; + + Ok(rows) + } + async fn get_failed_agg_rows(&self, tx: &mut SerializableTxn<'_>) -> Result> { let base_query = r#" SELECT DATE_TRUNC('hour', last_attempt_finished_at) AS hour, @@ -298,47 +442,28 @@ impl WebhookCleaner { Ok(rows) } - async fn send_metrics_to_kafka(&self, metrics: Vec) -> Result<()> { - if metrics.is_empty() { - return Ok(()); - } - - let payloads: Vec = metrics - .into_iter() - .map(|metric| serde_json::to_string(&metric)) - .collect::, SerdeError>>() - .map_err(|e| WebhookCleanerError::SerializeRowsError { error: e })?; - - let mut delivery_futures = Vec::new(); - - for payload in payloads { - match self.kafka_producer.send_result(FutureRecord { - topic: self.app_metrics_topic.as_str(), - payload: Some(&payload), - partition: None, - key: None::<&str>, - timestamp: None, - headers: None, - }) { - Ok(future) => delivery_futures.push(future), - Err((error, _)) => return Err(WebhookCleanerError::KafkaProduceError { error }), - } - } + async fn get_failed_agg_rows_for_hoghooks( + &self, + tx: &mut SerializableTxn<'_>, + ) -> Result> { + let base_query = r#" + SELECT DATE_TRUNC('hour', last_attempt_finished_at) AS hour, + (metadata->>'teamId')::bigint AS team_id, + (metadata->>'hogFunctionId') AS app_source_id, + errors[array_upper(errors, 1)] AS last_error, + count(*) as count + FROM job_queue + WHERE status = 'failed' + GROUP BY hour, team_id, app_source_id, last_error + ORDER BY hour, team_id, app_source_id, last_error; + "#; - for result in join_all(delivery_futures).await { - match result { - Ok(Ok(_)) => {} - Ok(Err((error, _))) => { - return Err(WebhookCleanerError::KafkaProduceError { error }) - } - Err(_) => { - // Cancelled due to timeout while retrying - return Err(WebhookCleanerError::KafkaProduceCanceled); - } - } - } + let rows = sqlx::query_as::<_, HoghookFailedRow>(base_query) + .fetch_all(&mut *tx.0) + .await + .map_err(|e| WebhookCleanerError::GetFailedRowsError { error: e })?; - Ok(()) + Ok(rows) } async fn delete_observed_rows(&self, tx: &mut SerializableTxn<'_>) -> Result { @@ -397,36 +522,58 @@ impl WebhookCleaner { let mut tx = self.start_serializable_txn().await?; - let (completed_row_count, completed_agg_row_count) = { - 
let completed_row_count = self.get_row_count_for_status(&mut tx, "completed").await?; - let completed_agg_rows = if self.hog_mode { - // Hog mode doesn't need to send metrics to Kafka (and can't aggregate by - // plugin anyway), so we can skip this. - vec![] - } else { - self.get_completed_agg_rows(&mut tx).await? - }; - let agg_row_count = completed_agg_rows.len() as u64; + let completed_row_count = self.get_row_count_for_status(&mut tx, "completed").await?; + let completed_agg_row_count = if self.hog_mode { + let completed_agg_rows = self.get_completed_agg_rows_for_hoghooks(&mut tx).await?; + let completed_agg_row_count = completed_agg_rows.len() as u64; + let completed_app_metrics: Vec = + completed_agg_rows.into_iter().map(Into::into).collect(); + send_metrics_to_kafka( + &self.kafka_producer, + &self.app_metrics2_topic, + completed_app_metrics, + ) + .await?; + completed_agg_row_count + } else { + let completed_agg_rows = self.get_completed_agg_rows(&mut tx).await?; + let completed_agg_row_count = completed_agg_rows.len() as u64; let completed_app_metrics: Vec = completed_agg_rows.into_iter().map(Into::into).collect(); - self.send_metrics_to_kafka(completed_app_metrics).await?; - (completed_row_count, agg_row_count) + send_metrics_to_kafka( + &self.kafka_producer, + &self.app_metrics_topic, + completed_app_metrics, + ) + .await?; + completed_agg_row_count }; - let (failed_row_count, failed_agg_row_count) = { - let failed_row_count = self.get_row_count_for_status(&mut tx, "failed").await?; - let failed_agg_rows = if self.hog_mode { - // Hog mode doesn't need to send metrics to Kafka (and can't aggregate by - // plugin anyway), so we can skip this. - vec![] - } else { - self.get_failed_agg_rows(&mut tx).await? - }; - let agg_row_count = failed_agg_rows.len() as u64; + let failed_row_count = self.get_row_count_for_status(&mut tx, "failed").await?; + let failed_agg_row_count = if self.hog_mode { + let failed_agg_rows = self.get_failed_agg_rows_for_hoghooks(&mut tx).await?; + let failed_agg_row_count = failed_agg_rows.len() as u64; + let failed_app_metrics: Vec = + failed_agg_rows.into_iter().map(Into::into).collect(); + send_metrics_to_kafka( + &self.kafka_producer, + &self.app_metrics2_topic, + failed_app_metrics, + ) + .await?; + failed_agg_row_count + } else { + let failed_agg_rows = self.get_failed_agg_rows(&mut tx).await?; + let failed_agg_row_count = failed_agg_rows.len() as u64; let failed_app_metrics: Vec = failed_agg_rows.into_iter().map(Into::into).collect(); - self.send_metrics_to_kafka(failed_app_metrics).await?; - (failed_row_count, agg_row_count) + send_metrics_to_kafka( + &self.kafka_producer, + &self.app_metrics_topic, + failed_app_metrics, + ) + .await?; + failed_agg_row_count }; let mut rows_deleted = 0; @@ -526,6 +673,7 @@ mod tests { use std::str::FromStr; const APP_METRICS_TOPIC: &str = "app_metrics"; + const APP_METRICS2_TOPIC: &str = "app_metrics2"; fn check_app_metric_vector_equality(v1: &[AppMetric], v2: &[AppMetric]) { // Ignores `error_uuid`s. 
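
A note on the refactor above: the rendered diff strips angle-bracketed type arguments, so the new free function reads as `send_metrics_to_kafka(kafka_producer: &FutureProducer, topic: &str, metrics: impl IntoIterator)`; the missing pieces are presumably the function's own `<T>` parameter, `FutureProducer<KafkaContext>`, and `IntoIterator<Item = T>`. Because the helper only requires `T: Serialize`, `cleanup_impl` can push both the legacy `AppMetric` batches and the new hog-mode `AppMetric2` batches through one code path, differing only in topic. A standalone toy sketch of that genericity (not the PR's helper; `to_payloads` and `Tiny` are illustrative names, the real function additionally awaits rdkafka delivery futures, and this assumes a crate with `serde` (derive) and `serde_json` as dependencies):

use serde::Serialize;

// Toy stand-in: turn any iterable of serializable metrics into JSON payload strings,
// which is the shape the real helper hands to the Kafka producer.
fn to_payloads<T>(metrics: impl IntoIterator<Item = T>) -> Result<Vec<String>, serde_json::Error>
where
    T: Serialize,
{
    metrics.into_iter().map(|m| serde_json::to_string(&m)).collect()
}

#[derive(Serialize)]
struct Tiny {
    team_id: u32,
    count: u32,
}

fn main() {
    let payloads = to_payloads(vec![Tiny { team_id: 1, count: 3 }]).unwrap();
    assert_eq!(payloads, vec![r#"{"team_id":1,"count":3}"#.to_string()]);
}
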
@@ -559,6 +707,7 @@ mod tests { db, mock_producer, APP_METRICS_TOPIC.to_owned(), + APP_METRICS2_TOPIC.to_owned(), hog_mode, ) .expect("unable to create webhook cleaner"); @@ -732,6 +881,143 @@ mod tests { check_app_metric_vector_equality(&expected_app_metrics, &received_app_metrics); } + #[sqlx::test(migrations = "../migrations", fixtures("hoghook_cleanup"))] + async fn test_cleanup_impl_hoghooks(db: PgPool) { + let (mock_cluster, mock_producer) = create_mock_kafka().await; + mock_cluster + .create_topic(APP_METRICS2_TOPIC, 1, 1) + .expect("failed to create mock app_metrics2 topic"); + + let consumer: StreamConsumer = ClientConfig::new() + .set("bootstrap.servers", mock_cluster.bootstrap_servers()) + .set("group.id", "mock") + .set("auto.offset.reset", "earliest") + .create() + .expect("failed to create mock consumer"); + consumer.subscribe(&[APP_METRICS2_TOPIC]).unwrap(); + + let hog_mode = true; + let hoghook_cleaner = WebhookCleaner::new_from_pool( + db, + mock_producer, + APP_METRICS_TOPIC.to_owned(), + APP_METRICS2_TOPIC.to_owned(), + hog_mode, + ) + .expect("unable to create hoghook cleaner"); + + let cleanup_stats = hoghook_cleaner + .cleanup_impl() + .await + .expect("webbook cleanup_impl failed"); + + // Rows that are not 'completed' or 'failed' should not be processed. + assert_eq!(cleanup_stats.rows_processed, 13); + + let mut received_app_metrics = Vec::new(); + for _ in 0..(cleanup_stats.completed_agg_row_count + cleanup_stats.failed_agg_row_count) { + let kafka_msg = consumer.recv().await.unwrap(); + let payload_str = String::from_utf8(kafka_msg.payload().unwrap().to_vec()).unwrap(); + let app_metric: AppMetric2 = serde_json::from_str(&payload_str).unwrap(); + received_app_metrics.push(app_metric); + } + + let expected_app_metrics = vec![ + AppMetric2 { + timestamp: DateTime::::from_str("2023-12-19T20:00:00Z").unwrap(), + team_id: 1, + app_source: app_metrics2::Source::Hoghooks, + app_source_id: "2".to_owned(), + instance_id: None, + metric_kind: app_metrics2::Kind::Success, + metric_name: "Fetch".to_owned(), + count: 3, + }, + AppMetric2 { + timestamp: DateTime::::from_str("2023-12-19T20:00:00Z").unwrap(), + team_id: 1, + app_source: app_metrics2::Source::Hoghooks, + app_source_id: "3".to_owned(), + instance_id: None, + metric_kind: app_metrics2::Kind::Success, + metric_name: "Fetch".to_owned(), + count: 1, + }, + AppMetric2 { + timestamp: DateTime::::from_str("2023-12-19T20:00:00Z").unwrap(), + team_id: 2, + app_source: app_metrics2::Source::Hoghooks, + app_source_id: "4".to_owned(), + instance_id: None, + metric_kind: app_metrics2::Kind::Success, + metric_name: "Fetch".to_owned(), + count: 1, + }, + AppMetric2 { + timestamp: DateTime::::from_str("2023-12-19T21:00:00Z").unwrap(), + team_id: 1, + app_source: app_metrics2::Source::Hoghooks, + app_source_id: "2".to_owned(), + instance_id: None, + metric_kind: app_metrics2::Kind::Success, + metric_name: "Fetch".to_owned(), + count: 1, + }, + AppMetric2 { + timestamp: DateTime::::from_str("2023-12-19T20:00:00Z").unwrap(), + team_id: 1, + app_source: app_metrics2::Source::Hoghooks, + app_source_id: "2".to_owned(), + instance_id: None, + metric_kind: app_metrics2::Kind::Failure, + metric_name: "Connection Error".to_owned(), + count: 1, + }, + AppMetric2 { + timestamp: DateTime::::from_str("2023-12-19T20:00:00Z").unwrap(), + team_id: 1, + app_source: app_metrics2::Source::Hoghooks, + app_source_id: "2".to_owned(), + instance_id: None, + metric_kind: app_metrics2::Kind::Failure, + metric_name: "Timeout Error".to_owned(), + count: 3, 
+ }, + AppMetric2 { + timestamp: DateTime::::from_str("2023-12-19T20:00:00Z").unwrap(), + team_id: 1, + app_source: app_metrics2::Source::Hoghooks, + app_source_id: "3".to_owned(), + instance_id: None, + metric_kind: app_metrics2::Kind::Failure, + metric_name: "Timeout Error".to_owned(), + count: 1, + }, + AppMetric2 { + timestamp: DateTime::::from_str("2023-12-19T20:00:00Z").unwrap(), + team_id: 2, + app_source: app_metrics2::Source::Hoghooks, + app_source_id: "4".to_owned(), + instance_id: None, + metric_kind: app_metrics2::Kind::Failure, + metric_name: "Timeout Error".to_owned(), + count: 1, + }, + AppMetric2 { + timestamp: DateTime::::from_str("2023-12-19T21:00:00Z").unwrap(), + team_id: 1, + app_source: app_metrics2::Source::Hoghooks, + app_source_id: "2".to_owned(), + instance_id: None, + metric_kind: app_metrics2::Kind::Failure, + metric_name: "Timeout Error".to_owned(), + count: 1, + }, + ]; + + assert_eq!(&expected_app_metrics, &received_app_metrics); + } + #[sqlx::test(migrations = "../migrations")] async fn test_cleanup_impl_empty_queue(db: PgPool) { let (mock_cluster, mock_producer) = create_mock_kafka().await; @@ -757,6 +1043,7 @@ mod tests { db, mock_producer, APP_METRICS_TOPIC.to_owned(), + APP_METRICS2_TOPIC.to_owned(), hog_mode, ) .expect("unable to create webhook cleaner"); @@ -782,6 +1069,7 @@ mod tests { db.clone(), mock_producer, APP_METRICS_TOPIC.to_owned(), + APP_METRICS2_TOPIC.to_owned(), hog_mode, ) .expect("unable to create webhook cleaner");
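
One detail worth calling out in the hoghooks test above: the fixture rows store their last error as `{"type":"TimeoutError", ...}` / `{"type":"ConnectionError", ...}` (the `WebhookJobError` spelling), while the expected `AppMetric2` values carry `metric_name: "Timeout Error"` / `"Connection Error"`. The bridge is the reworked string conversion on `ErrorType`: `TryFrom<String>` accepts both the spaced and unspaced spellings, and `From<ErrorType> for String` always emits the spaced form, which the `HoghookFailedRow -> AppMetric2` conversion uses via `String::from(row.last_error.r#type)`. A minimal round-trip sketch (standalone, outside the PR; assumes a crate that depends on `hook-common` and a 2021-edition prelude for `TryFrom`):

use hook_common::kafka_messages::app_metrics::ErrorType;

fn main() {
    // Both spellings parse to the same variant...
    assert_eq!(
        ErrorType::try_from("TimeoutError".to_owned()),
        Ok(ErrorType::TimeoutError)
    );
    assert_eq!(
        ErrorType::try_from("Timeout Error".to_owned()),
        Ok(ErrorType::TimeoutError)
    );
    // ...the "Bad HTTP Status: <code>" form still parses its status code...
    assert!(ErrorType::try_from("Bad HTTP Status: 500".to_owned()).is_ok());
    // ...unknown strings are rejected rather than silently mapped...
    assert!(ErrorType::try_from("not an error".to_owned()).is_err());
    // ...and serialization always emits the spaced, human-readable form.
    assert_eq!(String::from(ErrorType::ConnectionError), "Connection Error");
}
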