Fix a few Observable Counter and UpDownCounter bugs #1992

Closed
Changes from 3 commits
5 changes: 4 additions & 1 deletion opentelemetry-sdk/CHANGELOG.md
@@ -16,7 +16,10 @@
The custom exporters and processors can't directly access `LogData::LogRecord::attributes`, as
these are private to opentelemetry-sdk. Instead, they should now use the `LogRecord::attributes_iter()`
method to access them.

- Fixed various metric aggregation bugs related to
ObservableCounter and UpDownCounter, including
[#1517](https://github.com/open-telemetry/opentelemetry-rust/issues/1517).
[#1990](https://github.com/open-telemetry/opentelemetry-rust/pull/1990)
Review comment (Contributor):
Suggested change: replace
[#1990](https://github.com/open-telemetry/opentelemetry-rust/pull/1990)
with
[#1992](https://github.com/open-telemetry/opentelemetry-rust/pull/1992)
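To make the changelog entry above concrete, here is a hedged usage sketch of the affected pattern (meter and instrument names are made up; builder calls follow the 0.24 `opentelemetry` API used elsewhere in this PR's tests). An ObservableCounter callback reports a precomputed running total on every collection, so the aggregator must assign the observed value rather than add it to the previous observation:

use opentelemetry::{global, KeyValue};

fn register_observable_counter() {
    let meter = global::meter("example-meter");
    let _counter = meter
        .u64_observable_counter("requests_total")
        .with_callback(|observer| {
            // The callback observes the current total (e.g. 100, then 110, ...).
            // Each observation is the new value of the counter, not an increment.
            observer.observe(110, &[KeyValue::new("key1", "value1")]);
        })
        .init();
}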


## v0.24.1

14 changes: 14 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -15,6 +15,7 @@ pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
/// Marks a type that can have a value added and retrieved atomically. Required since
/// different types have different backing atomic mechanisms
pub(crate) trait AtomicTracker<T>: Sync + Send + 'static {
fn store(&self, value: T);
fn add(&self, value: T);
fn get_value(&self) -> T;
fn get_and_reset_value(&self) -> T;
@@ -90,6 +91,10 @@ impl Number<f64> for f64 {
}

impl AtomicTracker<u64> for AtomicU64 {
fn store(&self, value: u64) {
self.store(value, Ordering::Relaxed);
}

fn add(&self, value: u64) {
self.fetch_add(value, Ordering::Relaxed);
}
@@ -112,6 +117,10 @@ impl AtomicallyUpdate<u64> for u64 {
}

impl AtomicTracker<i64> for AtomicI64 {
fn store(&self, value: i64) {
self.store(value, Ordering::Relaxed);
}

fn add(&self, value: i64) {
self.fetch_add(value, Ordering::Relaxed);
}
@@ -146,6 +155,11 @@ impl F64AtomicTracker {
}

impl AtomicTracker<f64> for F64AtomicTracker {
fn store(&self, value: f64) {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard = value;
}

fn add(&self, value: f64) {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard += value;
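For context on the `store` operation added to `AtomicTracker` above, a standalone sketch (plain std atomics rather than the SDK trait) contrasting accumulate vs. overwrite semantics:

use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    let tracker = AtomicU64::new(0);

    // Counter/UpDownCounter measurements are increments, so they accumulate:
    tracker.fetch_add(100, Ordering::Relaxed);
    tracker.fetch_add(10, Ordering::Relaxed);
    assert_eq!(tracker.load(Ordering::Relaxed), 110);

    // Observable (precomputed) instruments report the current total on every
    // callback, so the tracker must overwrite the previous observation instead:
    tracker.store(100, Ordering::Relaxed);
    tracker.store(110, Ordering::Relaxed);
    assert_eq!(tracker.load(Ordering::Relaxed), 110);
}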
84 changes: 54 additions & 30 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
@@ -25,40 +25,54 @@ struct ValueMap<T: Number<T>> {
has_no_value_attribute_value: AtomicBool,
no_attribute_value: T::AtomicTracker,
count: AtomicUsize,
assign_only: bool, // if true, only assign the incoming value instead of adding it to the existing value
Review comment (Contributor):
Could we rename this to something more intuitive like is_sum_precomputed? I think that would better explain the reasoning behind the logic when reading the code:

if self.is_sum_precomputed {
    value_to_update.store(measurement);
} else {
    value_to_update.add(measurement);
}

}

impl<T: Number<T>> Default for ValueMap<T> {
fn default() -> Self {
ValueMap::new()
ValueMap::new(false)
}
}

impl<T: Number<T>> ValueMap<T> {
fn new() -> Self {
fn new(assign_only: bool) -> Self {
ValueMap {
values: RwLock::new(HashMap::new()),
has_no_value_attribute_value: AtomicBool::new(false),
no_attribute_value: T::new_atomic_tracker(),
count: AtomicUsize::new(0),
assign_only,
}
}
}

impl<T: Number<T>> ValueMap<T> {
fn measure(&self, measurement: T, attrs: &[KeyValue]) {
if attrs.is_empty() {
self.no_attribute_value.add(measurement);
if self.assign_only {
Review comment (Member Author):
@stormshield-fabs Could you help refactor this to use generics, so we don't have this duplication everywhere and get better performance?

Review comment (Contributor):
Let's extract this repeated snippet into a method for now? It would also make it easier to review the future PR that refactors it using generics. (A hedged sketch of such a helper follows this function's diff, below.)

self.no_attribute_value.store(measurement);
} else {
self.no_attribute_value.add(measurement);
}
self.has_no_value_attribute_value
.store(true, Ordering::Release);
} else if let Ok(values) = self.values.read() {
// Try incoming order first
if let Some(value_to_update) = values.get(attrs) {
value_to_update.add(measurement);
if self.assign_only {
value_to_update.store(measurement);
} else {
value_to_update.add(measurement);
}
} else {
// Then try sorted order.
let sorted_attrs = AttributeSet::from(attrs).into_vec();
if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) {
value_to_update.add(measurement);
if self.assign_only {
value_to_update.store(measurement);
} else {
value_to_update.add(measurement);
}
} else {
// Give up the lock, before acquiring write lock.
drop(values);
@@ -67,12 +81,24 @@ impl<T: Number<T>> ValueMap<T> {
// write lock, in case another thread has added the
// value.
if let Some(value_to_update) = values.get(attrs) {
value_to_update.add(measurement);
if self.assign_only {
value_to_update.store(measurement);
} else {
value_to_update.add(measurement);
}
} else if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) {
value_to_update.add(measurement);
if self.assign_only {
value_to_update.store(measurement);
} else {
value_to_update.add(measurement);
}
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
let new_value = T::new_atomic_tracker();
new_value.add(measurement);
if self.assign_only {
new_value.store(measurement);
} else {
new_value.add(measurement);
}
let new_value = Arc::new(new_value);

// Insert original order
@@ -85,10 +111,18 @@
} else if let Some(overflow_value) =
values.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice())
{
overflow_value.add(measurement);
if self.assign_only {
overflow_value.store(measurement);
} else {
overflow_value.add(measurement);
}
} else {
let new_value = T::new_atomic_tracker();
new_value.add(measurement);
if self.assign_only {
new_value.store(measurement);
} else {
new_value.add(measurement);
}
values.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_value));
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
}
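A hedged sketch of the helper extraction suggested in the review comments above (the helper name `update_tracker` is hypothetical; the sketch relies on the crate-internal `ValueMap`, `Number`, and `AtomicTracker` types shown in this diff and is not part of the PR):

impl<T: Number<T>> ValueMap<T> {
    // Centralizes the assign-vs-add decision so measure() can call
    // `self.update_tracker(value_to_update, measurement)` at each call site
    // instead of repeating the if/else block.
    fn update_tracker(&self, tracker: &T::AtomicTracker, measurement: T) {
        if self.assign_only {
            // Precomputed sums report the current total; overwrite it.
            tracker.store(measurement);
        } else {
            // Ordinary sums accumulate increments; add to the running total.
            tracker.add(measurement);
        }
    }
}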
@@ -114,7 +148,7 @@ impl<T: Number<T>> Sum<T> {
/// were made in.
pub(crate) fn new(monotonic: bool) -> Self {
Sum {
value_map: ValueMap::new(),
value_map: ValueMap::new(false),
monotonic,
start: Mutex::new(SystemTime::now()),
}
@@ -282,7 +316,7 @@ pub(crate) struct PrecomputedSum<T: Number<T>> {
impl<T: Number<T>> PrecomputedSum<T> {
pub(crate) fn new(monotonic: bool) -> Self {
PrecomputedSum {
value_map: ValueMap::new(),
value_map: ValueMap::new(true),
monotonic,
start: Mutex::new(SystemTime::now()),
reported: Mutex::new(Default::default()),
@@ -334,11 +368,16 @@ impl<T: Number<T>> PrecomputedSum<T> {
.has_no_value_attribute_value
.swap(false, Ordering::AcqRel)
{
let default = T::default();
let delta = self.value_map.no_attribute_value.get_value()
- *reported.get(&vec![]).unwrap_or(&default);
new_reported.insert(vec![], self.value_map.no_attribute_value.get_value());
Review comment (Contributor):
We should be storing the computed delta value here and not do another atomic read. Otherwise, we risk storing an incorrect value from another update thread.

Review comment (Member):
Good catch. The `self.value_map.no_attribute_value` fetched once on line 372 should be used as the `new_reported` value instead of fetching it again.
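A hedged sketch of the change both reviewers describe (fetch the value once and reuse it for the delta and for `new_reported`; illustrative only, not the committed code):

// Read the no-attribute value a single time so the emitted delta and the
// value recorded in new_reported cannot diverge under concurrent updates.
let default = T::default();
let current = self.value_map.no_attribute_value.get_value();
let delta = current - *reported.get(&vec![]).unwrap_or(&default);
new_reported.insert(vec![], current);

s_data.data_points.push(DataPoint {
    attributes: vec![],
    start_time: Some(prev_start),
    time: Some(t),
    value: delta,
    exemplars: vec![],
});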


s_data.data_points.push(DataPoint {
attributes: vec![],
start_time: Some(prev_start),
time: Some(t),
value: self.value_map.no_attribute_value.get_and_reset_value(),
value: delta,
exemplars: vec![],
});
}
@@ -351,9 +390,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
let default = T::default();
for (attrs, value) in values.drain() {
let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value.get_value());
}
new_reported.insert(attrs.clone(), value.get_value());
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
@@ -408,11 +445,6 @@ impl<T: Number<T>> PrecomputedSum<T> {
.data_points
.reserve_exact(n - s_data.data_points.capacity());
}
let mut new_reported = HashMap::with_capacity(n);
let mut reported = match self.reported.lock() {
Ok(r) => r,
Err(_) => return (0, None),
};

if self
.value_map
@@ -432,24 +464,16 @@
Ok(v) => v,
Err(_) => return (0, None),
};
let default = T::default();
for (attrs, value) in values.iter() {
let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value.get_value());
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
value: value.get_value(),
exemplars: vec![],
});
}

*reported = new_reported;
drop(reported); // drop before values guard is dropped

(
s_data.data_points.len(),
new_agg.map(|a| Box::new(a) as Box<_>),
59 changes: 49 additions & 10 deletions opentelemetry-sdk/src/metrics/mod.rs
@@ -271,39 +271,72 @@ mod tests {
async fn observable_counter_aggregation_cumulative_non_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_cumulative_non_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4);
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, false);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_cumulative_non_zero_increment_no_attrs() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_cumulative_non_zero_increment_no_attrs --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4, true);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_delta_non_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_delta_non_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4);
observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, false);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_delta_non_zero_increment_no_attrs() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_delta_non_zero_increment_no_attrs --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4, true);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_cumulative_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_cumulative_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4);
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, false);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_cumulative_zero_increment_no_attrs() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_cumulative_zero_increment_no_attrs --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4, true);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[ignore = "Aggregation bug! https://github.com/open-telemetry/opentelemetry-rust/issues/1517"]
async fn observable_counter_aggregation_delta_zero_increment() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_delta_zero_increment --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4);
observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, false);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_aggregation_delta_zero_increment_no_attrs() {
// Run this test with stdout enabled to see output.
// cargo test observable_counter_aggregation_delta_zero_increment_no_attrs --features=testing -- --nocapture
observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4, true);
}

fn observable_counter_aggregation_helper(
temporality: Temporality,
start: u64,
increment: u64,
length: u64,
is_empty_attributes: bool,
) {
// Arrange
let mut test_context = TestContext::new(temporality);
let attributes = if is_empty_attributes {
vec![]
} else {
vec![KeyValue::new("key1", "value1")]
};
// The Observable counter reports values[0], values[1],....values[n] on each flush.
let values: Vec<u64> = (0..length).map(|i| start + i * increment).collect();
println!("Testing with observable values: {:?}", values);
@@ -317,7 +350,7 @@
.with_callback(move |observer| {
let mut index = i.lock().unwrap();
if *index < values.len() {
observer.observe(values[*index], &[KeyValue::new("key1", "value1")]);
observer.observe(values[*index], &attributes);
*index += 1;
}
})
@@ -338,9 +371,14 @@
assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
}

// find and validate key1=value1 datapoint
let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected");
// find and validate datapoint
let data_point = if is_empty_attributes {
&sum.data_points[0]
} else {
find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected")
};

if let Temporality::Cumulative = temporality {
// Cumulative counter should have the value as is.
assert_eq!(data_point.value, *v);
@@ -629,8 +667,9 @@
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[ignore = "Spatial aggregation is not yet implemented."]
Review comment (Member Author):
This got accidentally fixed in some other PR, but this PR breaks it. Spatial aggregation is complex and needs a separate discussion altogether!

Review comment (Member):
If there is no issue to track this, it would probably be good to open one.

async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
// cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=testing
// metrics::tests::spatial_aggregation_when_view_drops_attributes_observable_counter

// Arrange
let exporter = InMemoryMetricsExporter::default();