From a4c51ecfe6b1bf88e64cb8860779ac302b1e99b0 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 23:36:41 -0700 Subject: [PATCH 1/4] Fix few Observable Counter and UpDownCounter bugs --- opentelemetry-sdk/CHANGELOG.md | 5 +- opentelemetry-sdk/src/metrics/internal/mod.rs | 14 ++++ opentelemetry-sdk/src/metrics/internal/sum.rs | 84 ++++++++++++------- opentelemetry-sdk/src/metrics/mod.rs | 56 +++++++++++-- 4 files changed, 119 insertions(+), 40 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 942086e333..45d46ce670 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -16,7 +16,10 @@ The custom exporters and processors can't directly access the `LogData::LogRecord::attributes`, as these are private to opentelemetry-sdk. Instead, they would now use LogRecord::attributes_iter() method to access them. - +- Fixed various Metric aggregation bug related to + ObservableCounter,UpDownCounter including + [1517](https://github.com/open-telemetry/opentelemetry-rust/issues/1517). + [#1990](https://github.com/open-telemetry/opentelemetry-rust/pull/1990) ## v0.24.1 diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 92bc3d947f..cf0edeb47c 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -15,6 +15,7 @@ pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; /// Marks a type that can have a value added and retrieved atomically. Required since /// different types have different backing atomic mechanisms pub(crate) trait AtomicTracker: Sync + Send + 'static { + fn store(&self, value: T); fn add(&self, value: T); fn get_value(&self) -> T; fn get_and_reset_value(&self) -> T; @@ -90,6 +91,10 @@ impl Number for f64 { } impl AtomicTracker for AtomicU64 { + fn store(&self, value: u64) { + self.store(value, Ordering::Relaxed); + } + fn add(&self, value: u64) { self.fetch_add(value, Ordering::Relaxed); } @@ -112,6 +117,10 @@ impl AtomicallyUpdate for u64 { } impl AtomicTracker for AtomicI64 { + fn store(&self, value: i64) { + self.store(value, Ordering::Relaxed); + } + fn add(&self, value: i64) { self.fetch_add(value, Ordering::Relaxed); } @@ -146,6 +155,11 @@ impl F64AtomicTracker { } impl AtomicTracker for F64AtomicTracker { + fn store(&self, value: f64) { + let mut guard = self.inner.lock().expect("F64 mutex was poisoned"); + *guard = value; + } + fn add(&self, value: f64) { let mut guard = self.inner.lock().expect("F64 mutex was poisoned"); *guard += value; diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 912fbacd58..8a51f6deac 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -25,21 +25,23 @@ struct ValueMap> { has_no_value_attribute_value: AtomicBool, no_attribute_value: T::AtomicTracker, count: AtomicUsize, + assign_only: bool, // if true, only assign incoming value, instead of adding to existing value } impl> Default for ValueMap { fn default() -> Self { - ValueMap::new() + ValueMap::new(false) } } impl> ValueMap { - fn new() -> Self { + fn new(assign_only: bool) -> Self { ValueMap { values: RwLock::new(HashMap::new()), has_no_value_attribute_value: AtomicBool::new(false), no_attribute_value: T::new_atomic_tracker(), count: AtomicUsize::new(0), + assign_only: assign_only, } } } @@ -47,18 +49,30 @@ impl> ValueMap { impl> ValueMap { fn measure(&self, measurement: T, attrs: &[KeyValue]) { if attrs.is_empty() { - self.no_attribute_value.add(measurement); + if self.assign_only { + self.no_attribute_value.store(measurement); + } else { + self.no_attribute_value.add(measurement); + } self.has_no_value_attribute_value .store(true, Ordering::Release); } else if let Ok(values) = self.values.read() { // Try incoming order first if let Some(value_to_update) = values.get(attrs) { - value_to_update.add(measurement); + if self.assign_only { + value_to_update.store(measurement); + } else { + value_to_update.add(measurement); + } } else { // Then try sorted order. let sorted_attrs = AttributeSet::from(attrs).into_vec(); if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) { - value_to_update.add(measurement); + if self.assign_only { + value_to_update.store(measurement); + } else { + value_to_update.add(measurement); + } } else { // Give up the lock, before acquiring write lock. drop(values); @@ -67,12 +81,24 @@ impl> ValueMap { // write lock, in case another thread has added the // value. if let Some(value_to_update) = values.get(attrs) { - value_to_update.add(measurement); + if self.assign_only { + value_to_update.store(measurement); + } else { + value_to_update.add(measurement); + } } else if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) { - value_to_update.add(measurement); + if self.assign_only { + value_to_update.store(measurement); + } else { + value_to_update.add(measurement); + } } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { let new_value = T::new_atomic_tracker(); - new_value.add(measurement); + if self.assign_only { + new_value.store(measurement); + } else { + new_value.add(measurement); + } let new_value = Arc::new(new_value); // Insert original order @@ -85,10 +111,18 @@ impl> ValueMap { } else if let Some(overflow_value) = values.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { - overflow_value.add(measurement); + if self.assign_only { + overflow_value.store(measurement); + } else { + overflow_value.add(measurement); + } } else { let new_value = T::new_atomic_tracker(); - new_value.add(measurement); + if self.assign_only { + new_value.store(measurement); + } else { + new_value.add(measurement); + } values.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_value)); global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into())); } @@ -114,7 +148,7 @@ impl> Sum { /// were made in. pub(crate) fn new(monotonic: bool) -> Self { Sum { - value_map: ValueMap::new(), + value_map: ValueMap::new(false), monotonic, start: Mutex::new(SystemTime::now()), } @@ -282,7 +316,7 @@ pub(crate) struct PrecomputedSum> { impl> PrecomputedSum { pub(crate) fn new(monotonic: bool) -> Self { PrecomputedSum { - value_map: ValueMap::new(), + value_map: ValueMap::new(true), monotonic, start: Mutex::new(SystemTime::now()), reported: Mutex::new(Default::default()), @@ -334,11 +368,16 @@ impl> PrecomputedSum { .has_no_value_attribute_value .swap(false, Ordering::AcqRel) { + let default = T::default(); + let delta = self.value_map.no_attribute_value.get_value() + - *reported.get(&vec![]).unwrap_or(&default); + new_reported.insert(vec![], self.value_map.no_attribute_value.get_value()); + s_data.data_points.push(DataPoint { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_value.get_and_reset_value(), + value: delta, exemplars: vec![], }); } @@ -351,9 +390,7 @@ impl> PrecomputedSum { let default = T::default(); for (attrs, value) in values.drain() { let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default); - if delta != default { - new_reported.insert(attrs.clone(), value.get_value()); - } + new_reported.insert(attrs.clone(), value.get_value()); s_data.data_points.push(DataPoint { attributes: attrs.clone(), start_time: Some(prev_start), @@ -408,11 +445,6 @@ impl> PrecomputedSum { .data_points .reserve_exact(n - s_data.data_points.capacity()); } - let mut new_reported = HashMap::with_capacity(n); - let mut reported = match self.reported.lock() { - Ok(r) => r, - Err(_) => return (0, None), - }; if self .value_map @@ -432,24 +464,16 @@ impl> PrecomputedSum { Ok(v) => v, Err(_) => return (0, None), }; - let default = T::default(); for (attrs, value) in values.iter() { - let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default); - if delta != default { - new_reported.insert(attrs.clone(), value.get_value()); - } s_data.data_points.push(DataPoint { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: delta, + value: value.get_value(), exemplars: vec![], }); } - *reported = new_reported; - drop(reported); // drop before values guard is dropped - ( s_data.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>), diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 46904095e8..86c439150a 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -271,29 +271,56 @@ mod tests { async fn observable_counter_aggregation_cumulative_non_zero_increment() { // Run this test with stdout enabled to see output. // cargo test observable_counter_aggregation_cumulative_non_zero_increment --features=testing -- --nocapture - observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4); + observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, false); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn observable_counter_aggregation_cumulative_non_zero_increment_no_attrs() { + // Run this test with stdout enabled to see output. + // cargo test observable_counter_aggregation_cumulative_non_zero_increment_no_attrs --features=testing -- --nocapture + observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, true); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn observable_counter_aggregation_delta_non_zero_increment() { // Run this test with stdout enabled to see output. // cargo test observable_counter_aggregation_delta_non_zero_increment --features=testing -- --nocapture - observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4); + observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, false); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn observable_counter_aggregation_delta_non_zero_increment_no_attrs() { + // Run this test with stdout enabled to see output. + // cargo test observable_counter_aggregation_delta_non_zero_increment_no_attrs --features=testing -- --nocapture + observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, true); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn observable_counter_aggregation_cumulative_zero_increment() { // Run this test with stdout enabled to see output. // cargo test observable_counter_aggregation_cumulative_zero_increment --features=testing -- --nocapture - observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4); + observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, false); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn observable_counter_aggregation_cumulative_zero_increment_no_attrs() { + // Run this test with stdout enabled to see output. + // cargo test observable_counter_aggregation_cumulative_zero_increment_no_attrs --features=testing -- --nocapture + observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, true); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - #[ignore = "Aggregation bug! https://github.com/open-telemetry/opentelemetry-rust/issues/1517"] async fn observable_counter_aggregation_delta_zero_increment() { // Run this test with stdout enabled to see output. // cargo test observable_counter_aggregation_delta_zero_increment --features=testing -- --nocapture - observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4); + observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, false); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn observable_counter_aggregation_delta_zero_increment_no_attrs() { + // Run this test with stdout enabled to see output. + // cargo test observable_counter_aggregation_delta_zero_increment_no_attrs --features=testing -- --nocapture + observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, true); } fn observable_counter_aggregation_helper( @@ -301,9 +328,15 @@ mod tests { start: u64, increment: u64, length: u64, + is_empty_attributes: bool, ) { // Arrange let mut test_context = TestContext::new(temporality); + let attributes = if is_empty_attributes { + vec![] + } else { + vec![KeyValue::new("key1", "value1")] + }; // The Observable counter reports values[0], values[1],....values[n] on each flush. let values: Vec = (0..length).map(|i| start + i * increment).collect(); println!("Testing with observable values: {:?}", values); @@ -317,7 +350,7 @@ mod tests { .with_callback(move |observer| { let mut index = i.lock().unwrap(); if *index < values.len() { - observer.observe(values[*index], &[KeyValue::new("key1", "value1")]); + observer.observe(values[*index], &attributes); *index += 1; } }) @@ -338,9 +371,14 @@ mod tests { assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); } - // find and validate key1=value1 datapoint - let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); + // find and validate datapoint + let data_point = if is_empty_attributes { + &sum.data_points[0] + } else { + find_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected") + }; + if let Temporality::Cumulative = temporality { // Cumulative counter should have the value as is. assert_eq!(data_point.value, *v); From 223eea7681a54a15be3affee9e9a8a53fa0f5c13 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 23:43:15 -0700 Subject: [PATCH 2/4] clipp --- opentelemetry-sdk/src/metrics/internal/sum.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 8a51f6deac..30b708ae22 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -41,7 +41,7 @@ impl> ValueMap { has_no_value_attribute_value: AtomicBool::new(false), no_attribute_value: T::new_atomic_tracker(), count: AtomicUsize::new(0), - assign_only: assign_only, + assign_only, } } } From cc3d9280371e3692960582b8543f883471f962d0 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 23:53:19 -0700 Subject: [PATCH 3/4] ignore spatial test --- opentelemetry-sdk/src/metrics/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 86c439150a..6f3f7c951c 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -667,8 +667,9 @@ mod tests { } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + #[ignore = "Spatial aggregation is not yet implemented."] async fn spatial_aggregation_when_view_drops_attributes_observable_counter() { - // cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=testing + // metrics::tests::spatial_aggregation_when_view_drops_attributes_observable_counter // Arrange let exporter = InMemoryMetricsExporter::default(); From b1104fa81aa86f3ef27f669e330e1dbf1306b951 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 6 Aug 2024 23:54:09 -0700 Subject: [PATCH 4/4] comment right --- opentelemetry-sdk/src/metrics/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 6f3f7c951c..3225e660ac 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -669,7 +669,7 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[ignore = "Spatial aggregation is not yet implemented."] async fn spatial_aggregation_when_view_drops_attributes_observable_counter() { - // metrics::tests::spatial_aggregation_when_view_drops_attributes_observable_counter + // cargo test metrics::tests::spatial_aggregation_when_view_drops_attributes_observable_counter --features=testing // Arrange let exporter = InMemoryMetricsExporter::default();