Use generics to dispatch updates in ValueMap #2004

5 changes: 4 additions & 1 deletion opentelemetry-sdk/CHANGELOG.md
@@ -16,7 +16,10 @@
The custom exporters and processors can't directly access the `LogData::LogRecord::attributes`, as
these are private to opentelemetry-sdk. Instead, they should now use the `LogRecord::attributes_iter()`
method to access them.

- Fixed various metric aggregation bugs related to
`ObservableCounter` and `UpDownCounter`, including
[#1517](https://github.com/open-telemetry/opentelemetry-rust/issues/1517).
[#1992](https://github.com/open-telemetry/opentelemetry-rust/pull/1992)

## v0.24.1

14 changes: 14 additions & 0 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -15,6 +15,7 @@
/// Marks a type that can have a value added and retrieved atomically. Required since
/// different types have different backing atomic mechanisms
pub(crate) trait AtomicTracker<T>: Sync + Send + 'static {
fn store(&self, value: T);
fn add(&self, value: T);
fn get_value(&self) -> T;
fn get_and_reset_value(&self) -> T;
@@ -90,6 +91,10 @@
}

impl AtomicTracker<u64> for AtomicU64 {
fn store(&self, value: u64) {
self.store(value, Ordering::Relaxed);
}

fn add(&self, value: u64) {
self.fetch_add(value, Ordering::Relaxed);
}
@@ -112,6 +117,10 @@
}

impl AtomicTracker<i64> for AtomicI64 {
fn store(&self, value: i64) {
self.store(value, Ordering::Relaxed);
}

fn add(&self, value: i64) {
self.fetch_add(value, Ordering::Relaxed);
}
@@ -146,6 +155,11 @@
}

impl AtomicTracker<f64> for F64AtomicTracker {
fn store(&self, value: f64) {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard = value;
}

fn add(&self, value: f64) {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard += value;
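Note on the `store` addition above: `AtomicTracker` now exposes both cumulative (`add`) and overwrite (`store`) updates behind one trait, so callers can stay generic over the backing atomic. A minimal, self-contained sketch of the pattern — the trait shape mirrors this diff, but the `overwrite_then_read` helper and the standalone `main` are purely illustrative and not part of the SDK:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative restatement of the trait shape added in this diff; the real
// definition lives in opentelemetry-sdk's internal metrics module.
trait AtomicTracker<T>: Sync + Send + 'static {
    fn store(&self, value: T);
    fn add(&self, value: T);
    fn get_value(&self) -> T;
    fn get_and_reset_value(&self) -> T;
}

impl AtomicTracker<u64> for AtomicU64 {
    fn store(&self, value: u64) {
        // Resolves to the inherent two-argument `AtomicU64::store`, as in the diff.
        self.store(value, Ordering::Relaxed);
    }
    fn add(&self, value: u64) {
        self.fetch_add(value, Ordering::Relaxed);
    }
    fn get_value(&self) -> u64 {
        self.load(Ordering::Relaxed)
    }
    fn get_and_reset_value(&self) -> u64 {
        self.swap(0, Ordering::Relaxed)
    }
}

// Hypothetical generic helper: works for any tracker/number pairing.
fn overwrite_then_read<T, A: AtomicTracker<T>>(tracker: &A, value: T) -> T {
    tracker.store(value); // overwrite, as a precomputed sum would do
    tracker.get_value()
}

fn main() {
    let tracker = AtomicU64::new(0);
    AtomicTracker::add(&tracker, 5); // cumulative update, as a counter would do
    assert_eq!(overwrite_then_read(&tracker, 42u64), 42);
    assert_eq!(AtomicTracker::get_and_reset_value(&tracker), 42);
    assert_eq!(AtomicTracker::get_value(&tracker), 0);
}
```

Routing the call through a generic bound (as `overwrite_then_read` does) also sidesteps any ambiguity with `AtomicU64`'s inherent two-argument `store`.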
200 changes: 111 additions & 89 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
@@ -1,4 +1,5 @@
use std::collections::HashSet;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::vec;
@@ -19,89 +20,122 @@
pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);

/// Abstracts the update operation for a measurement.
trait Operation {
fn update_tracker<T: 'static>(tracker: &dyn AtomicTracker<T>, value: T);
}

struct Increment;

impl Operation for Increment {
fn update_tracker<T: 'static>(tracker: &dyn AtomicTracker<T>, value: T) {
tracker.add(value);
}
}

struct Assign;

impl Operation for Assign {
fn update_tracker<T: 'static>(tracker: &dyn AtomicTracker<T>, value: T) {
tracker.store(value);
}
}

/// The storage for sums.
struct ValueMap<T: Number<T>> {
values: RwLock<HashMap<Vec<KeyValue>, Arc<T::AtomicTracker>>>,
has_no_value_attribute_value: AtomicBool,
no_attribute_value: T::AtomicTracker,
///
/// This structure is parametrized by an `Operation` that indicates how
/// updates to the underlying value trackers should be performed.
struct ValueMap<T: Number<T>, O> {
/// Trackers store the values associated with different attribute sets.
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<T::AtomicTracker>>>,
/// Number of distinct attribute sets stored in the `trackers` map.
count: AtomicUsize,
/// Indicates whether a value with no attributes has been stored.
has_no_attribute_value: AtomicBool,
/// Tracker for values with no attributes attached.
no_attribute_value: T::AtomicTracker,
phantom: PhantomData<O>,
}

impl<T: Number<T>> Default for ValueMap<T> {
impl<T: Number<T>, O> Default for ValueMap<T, O> {
fn default() -> Self {
ValueMap::new()
}
}

impl<T: Number<T>> ValueMap<T> {
impl<T: Number<T>, O> ValueMap<T, O> {
fn new() -> Self {
ValueMap {
values: RwLock::new(HashMap::new()),
has_no_value_attribute_value: AtomicBool::new(false),
trackers: RwLock::new(HashMap::new()),
has_no_attribute_value: AtomicBool::new(false),
no_attribute_value: T::new_atomic_tracker(),
count: AtomicUsize::new(0),
phantom: PhantomData,
}
}
}

impl<T: Number<T>> ValueMap<T> {
fn measure(&self, measurement: T, attrs: &[KeyValue]) {
if attrs.is_empty() {
self.no_attribute_value.add(measurement);
self.has_no_value_attribute_value
.store(true, Ordering::Release);
} else if let Ok(values) = self.values.read() {
// Try incoming order first
if let Some(value_to_update) = values.get(attrs) {
value_to_update.add(measurement);
} else {
// Then try sorted order.
let sorted_attrs = AttributeSet::from(attrs).into_vec();
if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) {
value_to_update.add(measurement);
} else {
// Give up the lock, before acquiring write lock.
drop(values);
if let Ok(mut values) = self.values.write() {
// Recheck both incoming and sorted after acquiring
// write lock, in case another thread has added the
// value.
if let Some(value_to_update) = values.get(attrs) {
value_to_update.add(measurement);
} else if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) {
value_to_update.add(measurement);
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
let new_value = T::new_atomic_tracker();
new_value.add(measurement);
let new_value = Arc::new(new_value);

// Insert original order
values.insert(attrs.to_vec(), new_value.clone());

// Insert sorted order
values.insert(sorted_attrs, new_value);

self.count.fetch_add(1, Ordering::SeqCst);
} else if let Some(overflow_value) =
values.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice())
{
overflow_value.add(measurement);
} else {
let new_value = T::new_atomic_tracker();
new_value.add(measurement);
values.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_value));
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
}
}
}
}
impl<T: Number<T>, O: Operation> ValueMap<T, O> {
fn measure(&self, measurement: T, attributes: &[KeyValue]) {
if attributes.is_empty() {
O::update_tracker(&self.no_attribute_value, measurement);
self.has_no_attribute_value.store(true, Ordering::Release);
return;
}

let Ok(trackers) = self.trackers.read() else {
return;
};

// Try to retrieve and update the tracker with the attributes in the provided order first
if let Some(tracker) = trackers.get(attributes) {
O::update_tracker(&**tracker, measurement);
return;
}

// Try to retrieve and update the tracker with the attributes sorted.
let sorted_attrs = AttributeSet::from(attributes).into_vec();
if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
O::update_tracker(&**tracker, measurement);
return;
}

// Give up the read lock before acquiring the write lock.
drop(trackers);

let Ok(mut trackers) = self.trackers.write() else {
return;
};

// Recheck both the provided and sorted orders after acquiring the write lock
// in case another thread has pushed an update in the meantime.
if let Some(tracker) = trackers.get(attributes) {
O::update_tracker(&**tracker, measurement);
} else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
O::update_tracker(&**tracker, measurement);
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
let new_tracker = Arc::new(T::new_atomic_tracker());
O::update_tracker(&*new_tracker, measurement);

// Insert tracker with the attributes in the provided and sorted orders
trackers.insert(attributes.to_vec(), new_tracker.clone());
trackers.insert(sorted_attrs, new_tracker);

self.count.fetch_add(1, Ordering::SeqCst);
} else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
O::update_tracker(&**overflow_value, measurement);
} else {
let new_tracker = T::new_atomic_tracker();
O::update_tracker(&new_tracker, measurement);
trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
}
}
}

/// Summarizes a set of measurements made as their arithmetic sum.
pub(crate) struct Sum<T: Number<T>> {
value_map: ValueMap<T>,
value_map: ValueMap<T, Increment>,
monotonic: bool,
start: Mutex<SystemTime>,
}
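For readers skimming the diff: the `Operation` trait plus the zero-sized `Increment`/`Assign` markers let `Sum` and `PrecomputedSum` share one `ValueMap` while choosing add-vs-overwrite at compile time, with no vtable lookup in the hot path. A stripped-down sketch of the same pattern, with hypothetical names (`Tracker`, `cell`) standing in for the SDK's types:

```rust
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};

// Mirrors the role of the `Operation` trait in this PR: how an update is applied.
trait Operation {
    fn update(cell: &AtomicU64, value: u64);
}

// Zero-sized markers: one accumulates, one overwrites.
struct Increment;
struct Assign;

impl Operation for Increment {
    fn update(cell: &AtomicU64, value: u64) {
        cell.fetch_add(value, Ordering::Relaxed);
    }
}

impl Operation for Assign {
    fn update(cell: &AtomicU64, value: u64) {
        cell.store(value, Ordering::Relaxed);
    }
}

// Hypothetical container parameterized by the operation, like `ValueMap<T, O>`.
struct Tracker<O: Operation> {
    cell: AtomicU64,
    _op: PhantomData<O>,
}

impl<O: Operation> Tracker<O> {
    fn new() -> Self {
        Tracker { cell: AtomicU64::new(0), _op: PhantomData }
    }
    fn measure(&self, value: u64) {
        // Statically dispatched: monomorphized per marker type.
        O::update(&self.cell, value);
    }
    fn value(&self) -> u64 {
        self.cell.load(Ordering::Relaxed)
    }
}

fn main() {
    let sum: Tracker<Increment> = Tracker::new();
    sum.measure(3);
    sum.measure(4);
    assert_eq!(sum.value(), 7); // increments accumulate, like `Sum`

    let gauge: Tracker<Assign> = Tracker::new();
    gauge.measure(3);
    gauge.measure(4);
    assert_eq!(gauge.value(), 4); // assignments overwrite, like `PrecomputedSum`
}
```

Because each marker produces its own monomorphized `measure`, the choice of operation costs nothing at runtime beyond the instruction it selects.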
@@ -157,7 +191,7 @@
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
if self
.value_map
.has_no_value_attribute_value
.has_no_attribute_value
.swap(false, Ordering::AcqRel)
{
s_data.data_points.push(DataPoint {
@@ -169,7 +203,7 @@
});
}

let mut values = match self.value_map.values.write() {
let mut values = match self.value_map.trackers.write() {
Ok(v) => v,
Err(_) => return (0, None),
};
@@ -233,7 +267,7 @@

if self
.value_map
.has_no_value_attribute_value
.has_no_attribute_value
.load(Ordering::Acquire)
{
s_data.data_points.push(DataPoint {
@@ -245,7 +279,7 @@
});
}

let values = match self.value_map.values.write() {
let values = match self.value_map.trackers.write() {
Ok(v) => v,
Err(_) => return (0, None),
};
@@ -273,7 +307,7 @@

/// Summarizes a set of pre-computed sums as their arithmetic sum.
pub(crate) struct PrecomputedSum<T: Number<T>> {
value_map: ValueMap<T>,
value_map: ValueMap<T, Assign>,
monotonic: bool,
start: Mutex<SystemTime>,
reported: Mutex<HashMap<Vec<KeyValue>, T>>,
@@ -331,29 +365,30 @@

if self
.value_map
.has_no_value_attribute_value
.has_no_attribute_value
.swap(false, Ordering::AcqRel)
{
let value = self.value_map.no_attribute_value.get_value();
let delta = value - *reported.get(&vec![]).unwrap_or(&T::default());
new_reported.insert(vec![], value);

s_data.data_points.push(DataPoint {
attributes: vec![],
start_time: Some(prev_start),
time: Some(t),
value: self.value_map.no_attribute_value.get_and_reset_value(),
value: delta,
exemplars: vec![],
});
}

let mut values = match self.value_map.values.write() {
let mut values = match self.value_map.trackers.write() {
Ok(v) => v,
Err(_) => return (0, None),
};

let default = T::default();
for (attrs, value) in values.drain() {
let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value.get_value());
}
let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&T::default());
new_reported.insert(attrs.clone(), value.get_value());
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
@@ -408,15 +443,10 @@
.data_points
.reserve_exact(n - s_data.data_points.capacity());
}
let mut new_reported = HashMap::with_capacity(n);
let mut reported = match self.reported.lock() {
Ok(r) => r,
Err(_) => return (0, None),
};

if self
.value_map
.has_no_value_attribute_value
.has_no_attribute_value
.load(Ordering::Acquire)
{
s_data.data_points.push(DataPoint {
@@ -428,28 +458,20 @@
});
}

let values = match self.value_map.values.write() {
let values = match self.value_map.trackers.write() {
Ok(v) => v,
Err(_) => return (0, None),
};
let default = T::default();
for (attrs, value) in values.iter() {
let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value.get_value());
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
value: value.get_value(),
exemplars: vec![],
});
}

*reported = new_reported;
drop(reported); // drop before values guard is dropped

(
s_data.data_points.len(),
new_agg.map(|a| Box::new(a) as Box<_>),
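One more note on the delta path in `PrecomputedSum` touched above: on each collect with delta temporality, the exported value is the current cumulative value minus what was last reported, and the `reported` map is then refreshed. A self-contained sketch of that bookkeeping, using a plain `HashMap<String, i64>` keyed by a flattened attribute string purely for illustration (the SDK keys by `Vec<KeyValue>` and is generic over the number type):

```rust
use std::collections::HashMap;

/// Hypothetical helper: given the latest cumulative values and what was last
/// reported, produce the deltas to emit and the new "reported" snapshot.
fn compute_deltas(
    current: &HashMap<String, i64>,
    reported: &HashMap<String, i64>,
) -> (Vec<(String, i64)>, HashMap<String, i64>) {
    let mut data_points = Vec::new();
    let mut new_reported = HashMap::new();
    for (attrs, value) in current {
        // Delta = cumulative now minus cumulative at the previous collect.
        let delta = value - reported.get(attrs).copied().unwrap_or_default();
        data_points.push((attrs.clone(), delta));
        new_reported.insert(attrs.clone(), *value);
    }
    (data_points, new_reported)
}

fn main() {
    let mut reported = HashMap::new();

    let first: HashMap<_, _> = [("k=v".to_string(), 10)].into();
    let (points, snapshot) = compute_deltas(&first, &reported);
    assert_eq!(points, vec![("k=v".to_string(), 10)]); // first collect: full value
    reported = snapshot;

    let second: HashMap<_, _> = [("k=v".to_string(), 25)].into();
    let (points, _) = compute_deltas(&second, &reported);
    assert_eq!(points, vec![("k=v".to_string(), 15)]); // only the increase since last collect
}
```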