From c08f4d54bb30775e4d3249058984cb67c3928e59 Mon Sep 17 00:00:00 2001 From: Troy Benson Date: Sat, 13 Jul 2024 01:30:00 +0000 Subject: [PATCH] feat: add unit error and telementry --- foundations/src/batcher/dataloader.rs | 35 +++++++++++++++++++++++---- foundations/src/batcher/mod.rs | 14 +++++++++-- 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/foundations/src/batcher/dataloader.rs b/foundations/src/batcher/dataloader.rs index 845605cc..edc59c49 100644 --- a/foundations/src/batcher/dataloader.rs +++ b/foundations/src/batcher/dataloader.rs @@ -7,6 +7,23 @@ use super::{BatchOperation, Batcher, BatcherConfig, BatcherDataloader, BatcherEr #[allow(type_alias_bounds)] pub type LoaderOutput, S: BuildHasher = RandomState> = Result, L::Error>; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)] +pub struct UnitError; + +impl std::error::Error for UnitError {} + +impl std::fmt::Display for UnitError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "unknown") + } +} + +impl From<()> for UnitError { + fn from(_: ()) -> Self { + Self + } +} + pub trait Loader { type Key: Clone + Eq + std::hash::Hash + Send + Sync; type Value: Clone + Send + Sync; @@ -14,6 +31,7 @@ pub trait Loader { fn config(&self) -> BatcherConfig { BatcherConfig { + name: std::any::type_name::().to_string(), concurrency: 10, max_batch_size: 1000, sleep_duration: std::time::Duration::from_millis(5), @@ -23,13 +41,15 @@ pub trait Loader { fn load(&self, keys: Vec) -> impl std::future::Future> + Send; } -pub struct DataLoader + Send + Sync, S: BuildHasher + Default + Send + Sync = RandomState>( - Batcher>, -); +pub struct DataLoader + Send + Sync, S: BuildHasher + Default + Send + Sync = RandomState> { + batcher: Batcher>, +} impl + Send + Sync + 'static, S: BuildHasher + Default + Send + Sync + 'static> DataLoader { pub fn new(loader: L) -> Self { - Self(Batcher::new(Wrapper(loader, PhantomData))) + Self { + batcher: Batcher::new(Wrapper(loader, PhantomData)), + } } pub async fn load(&self, key: L::Key) -> Result, BatcherError> { @@ -42,7 +62,7 @@ impl + Send + Sync + 'static, S: BuildHasher + Default + Send + Syn &self, keys: impl IntoIterator, ) -> Result, BatcherError> { - self.0.execute_many(keys).await + self.batcher.execute_many(keys).await } } @@ -119,6 +139,7 @@ mod tests { results }), config: BatcherConfig { + name: "test".to_string(), concurrency: 10, max_batch_size: 1000, sleep_duration: std::time::Duration::from_millis(5), @@ -158,6 +179,7 @@ mod tests { results }), config: BatcherConfig { + name: "test".to_string(), concurrency: 10, max_batch_size: 1000, sleep_duration: std::time::Duration::from_millis(5), @@ -192,6 +214,7 @@ mod tests { results }), config: BatcherConfig { + name: "test".to_string(), concurrency: 10, max_batch_size: 3000, sleep_duration: std::time::Duration::from_millis(5), @@ -226,6 +249,7 @@ mod tests { results }), config: BatcherConfig { + name: "test".to_string(), concurrency: 10, max_batch_size: 1000, sleep_duration: std::time::Duration::from_millis(100), @@ -261,6 +285,7 @@ mod tests { } }), config: BatcherConfig { + name: "test".to_string(), concurrency: 10, max_batch_size: 1000, sleep_duration: std::time::Duration::from_millis(5), diff --git a/foundations/src/batcher/mod.rs b/foundations/src/batcher/mod.rs index 2d562c46..517eb266 100644 --- a/foundations/src/batcher/mod.rs +++ b/foundations/src/batcher/mod.rs @@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize}; use std::sync::Arc; use tokio::sync::OnceCell; +use tracing::Instrument; pub mod dataloader; @@ -228,6 +229,7 @@ struct BatcherInner { batch_id: AtomicU64, max_batch_size: AtomicUsize, operation: T, + name: String, active_batch: tokio::sync::RwLock>>, } @@ -283,10 +285,16 @@ impl From for BatcherError { } impl Batch { + #[tracing::instrument(skip_all, fields(name = %inner.name))] async fn run(self, inner: Arc>) { self.results .get_or_init(|| async move { - let _ticket = inner.semaphore.acquire().await.map_err(|_| BatcherError::AcquireSemaphore)?; + let _ticket = inner + .semaphore + .acquire() + .instrument(tracing::debug_span!("Semaphore")) + .await + .map_err(|_| BatcherError::AcquireSemaphore)?; Ok(inner.operation.process(self.ops).await.map_err(BatcherError::Batch)?) }) .await; @@ -295,6 +303,7 @@ impl Batch { #[derive(Clone)] pub struct BatcherConfig { + pub name: String, pub concurrency: usize, pub max_batch_size: usize, pub sleep_duration: std::time::Duration, @@ -371,6 +380,7 @@ impl Batcher { sleep_duration: AtomicU64::new(config.sleep_duration.as_nanos() as u64), max_batch_size: AtomicUsize::new(config.max_batch_size), operation, + name: config.name, }); Self { @@ -405,7 +415,7 @@ impl Batcher { T::Mode::output_item_to_result(iter.into_iter().next().ok_or(BatcherError::MissingResult)?) } - #[tracing::instrument(skip_all)] + #[tracing::instrument(skip_all, fields(name = %self.inner.name))] pub async fn execute_many( &self, documents: impl IntoIterator,