diff --git a/skystreamer-prometheus-exporter/src/main.rs b/skystreamer-prometheus-exporter/src/main.rs index b060d97..abb3493 100644 --- a/skystreamer-prometheus-exporter/src/main.rs +++ b/skystreamer-prometheus-exporter/src/main.rs @@ -3,7 +3,10 @@ mod util; use color_eyre::Result; use futures::StreamExt; use posts::PostsRegistry; -use prometheus_exporter::{self, prometheus::register_int_counter}; +use prometheus_exporter::{ + self, + prometheus::{register_int_counter, register_int_counter_vec}, +}; use skystreamer::{stream::EventStream, types::commit::Record, RepoSubscription}; use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; @@ -52,6 +55,12 @@ async fn main() -> Result<()> { "Total number of events from the AT Firehose" )?; + let type_counter = register_int_counter_vec!( + "skystreamer_atproto_event_typed", + "Total number of events from the AT Firehose", + &["type"] + )?; + // const MAX_SAMPLE_SIZE: usize = 10000; let subscription = RepoSubscription::new("bsky.network") @@ -82,16 +91,41 @@ async fn main() -> Result<()> { primary_counter.inc(); let posts_registry = posts.clone(); + let type_counter = type_counter.clone(); tokio::spawn(async move { match record { Record::Post(post) => { - let post = post.clone(); + type_counter.with_label_values(&["post"]).inc(); tokio::spawn(async move { let mut posts_registry = posts_registry.lock().unwrap(); posts_registry.handle_post(&post)?; Ok::<(), color_eyre::eyre::Report>(()) }); } + Record::Block(_) => { + type_counter.with_label_values(&["block"]).inc(); + // todo + } + Record::Like(_) => { + type_counter.with_label_values(&["like"]).inc(); + // todo + } + Record::Follow(_) => { + type_counter.with_label_values(&["follow"]).inc(); + // todo + } + Record::Repost(_) => { + type_counter.with_label_values(&["repost"]).inc(); + // todo + } + Record::ListItem(_) => { + type_counter.with_label_values(&["list_item"]).inc(); + // todo + } + Record::Profile(_) => { + type_counter.with_label_values(&["profile"]).inc(); + // todo + } _ => { // todo }