From fc686ed56dc0814bc494a56234ad8be519c7fc4a Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Thu, 9 Jan 2025 23:41:39 +0800 Subject: [PATCH] feat(layer/otelmetrics): add OtelMetricsLayer (#5524) --- core/Cargo.lock | 27 +-- core/Cargo.toml | 16 +- core/README.md | 40 ++-- core/src/layers/mod.rs | 5 + core/src/layers/otelmetrics.rs | 341 +++++++++++++++++++++++++++++++++ core/src/layers/tracing.rs | 11 +- 6 files changed, 396 insertions(+), 44 deletions(-) create mode 100644 core/src/layers/otelmetrics.rs diff --git a/core/Cargo.lock b/core/Cargo.lock index c2200da5c530..c409e9d6d3c2 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -5367,23 +5367,23 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.26.0" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17" +checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7" dependencies = [ "futures-core", "futures-sink", "js-sys", - "once_cell", "pin-project-lite", "thiserror 1.0.69", + "tracing", ] [[package]] name = "opentelemetry-otlp" -version = "0.26.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd" +checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76" dependencies = [ "async-trait", "futures-core", @@ -5395,13 +5395,14 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tonic 0.12.3", + "tracing", ] [[package]] name = "opentelemetry-proto" -version = "0.26.1" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34" +checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6" dependencies = [ "opentelemetry", "opentelemetry_sdk", @@ -5411,21 +5412,23 @@ dependencies = [ [[package]] name = "opentelemetry_sdk" -version = "0.26.0" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" +checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8" dependencies = [ "async-trait", "futures-channel", "futures-executor", "futures-util", "glob", - "once_cell", "opentelemetry", "percent-encoding", "rand 0.8.5", "serde_json", "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tracing", ] [[package]] @@ -8708,9 +8711,9 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.27.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc58af5d3f6c5811462cabb3289aec0093f7338e367e5a33d28c0433b3c7360b" +checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053" dependencies = [ "js-sys", "once_cell", diff --git a/core/Cargo.toml b/core/Cargo.toml index c05a8b5a693f..8b0461a73f13 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -91,8 +91,10 @@ layers-prometheus-client = ["dep:prometheus-client"] layers-fastrace = ["dep:fastrace"] # Enable layers tracing support. layers-tracing = ["dep:tracing"] +# Enable layers otelmetrics support. +layers-otel-metrics = ["opentelemetry/metrics"] # Enable layers oteltrace support. -layers-otel-trace = ["dep:opentelemetry"] +layers-otel-trace = ["opentelemetry/trace"] # Enable layers throttle support. layers-throttle = ["dep:governor"] # Enable layers await-tree support. @@ -374,7 +376,7 @@ mime_guess = { version = "2.0.5", optional = true } # for layers-fastrace fastrace = { version = "0.7.1", optional = true } # for layers-opentelemetry -opentelemetry = { version = "0.26", optional = true } +opentelemetry = { version = "0.27", optional = true } # for layers-prometheus prometheus = { version = "0.13", features = ["process"], optional = true } # for layers-prometheus-client @@ -395,17 +397,17 @@ dotenvy = "0.15" fastrace = { version = "0.7", features = ["enable"] } fastrace-jaeger = "0.7" libtest-mimic = "0.8" -opentelemetry = { version = "0.26", default-features = false, features = [ - "trace", +opentelemetry = { version = "0.27", default-features = false, features = [ + "trace", ] } -opentelemetry-otlp = "0.26" -opentelemetry_sdk = "0.26" +opentelemetry-otlp = "0.27" +opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] } pretty_assertions = "1" rand = "0.8" sha2 = "0.10" size = "0.4" tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] } -tracing-opentelemetry = "0.27.0" +tracing-opentelemetry = "0.28.0" tracing-subscriber = { version = "0.3", features = [ "env-filter", "tracing-log", diff --git a/core/README.md b/core/README.md index c76810dfbe97..788b6eb6212a 100644 --- a/core/README.md +++ b/core/README.md @@ -105,25 +105,26 @@ OpenDAL supports the following storage [services](https://docs.rs/opendal/latest OpenDAL supports the following storage [layers](https://docs.rs/opendal/latest/opendal/layers/index.html) to extend the behavior: -| Name | Depends | Description | -|---------------------------|------------------------|---------------------------------------------------------------------------------------| -| [`AsyncBacktraceLayer`] | [async-backtrace] | Add Efficient, logical 'stack' traces of async functions for the underlying services. | -| [`AwaitTreeLayer`] | [await-tree] | Add a Instrument await-tree for actor-based applications to the underlying services. | -| [`BlockingLayer`] | [tokio] | Add blocking API support for non-blocking services. | -| [`ChaosLayer`] | [rand] | Inject chaos into underlying services for robustness test. | -| [`ConcurrentLimitLayer`] | [tokio] | Add concurrent request limit. | -| [`DtraceLayer`] | [probe] | Support User Statically-Defined Tracing(aka USDT) on Linux | -| [`LoggingLayer`] | [log] | Add log for every operations. | -| [`MetricsLayer`] | [metrics] | Add metrics for every operations. | -| [`MimeGuessLayer`] | [mime_guess] | Add `Content-Type` automatically based on the file extension in the operation path. | -| [`FastraceLayer`] | [fastrace] | Add fastrace for every operations. | -| [`OtelTraceLayer`] | [opentelemetry::trace] | Add opentelemetry::trace for every operations. | -| [`PrometheusClientLayer`] | [prometheus_client] | Add prometheus metrics for every operations. | -| [`PrometheusLayer`] | [prometheus] | Add prometheus metrics for every operations. | -| [`RetryLayer`] | [backon] | Add retry for temporary failed operations. | -| [`ThrottleLayer`] | [governor] | Add a bandwidth rate limiter to the underlying services. | -| [`TimeoutLayer`] | [tokio] | Add timeout for every operations to avoid slow or unexpected hang operations. | -| [`TracingLayer`] | [tracing] | Add tracing for every operations. | +| Name | Depends | Description | +|---------------------------|--------------------------|---------------------------------------------------------------------------------------| +| [`AsyncBacktraceLayer`] | [async-backtrace] | Add Efficient, logical 'stack' traces of async functions for the underlying services. | +| [`AwaitTreeLayer`] | [await-tree] | Add a Instrument await-tree for actor-based applications to the underlying services. | +| [`BlockingLayer`] | [tokio] | Add blocking API support for non-blocking services. | +| [`ChaosLayer`] | [rand] | Inject chaos into underlying services for robustness test. | +| [`ConcurrentLimitLayer`] | [tokio] | Add concurrent request limit. | +| [`DtraceLayer`] | [probe] | Support User Statically-Defined Tracing(aka USDT) on Linux | +| [`LoggingLayer`] | [log] | Add log for every operations. | +| [`MetricsLayer`] | [metrics] | Add metrics for every operations. | +| [`MimeGuessLayer`] | [mime_guess] | Add `Content-Type` automatically based on the file extension in the operation path. | +| [`FastraceLayer`] | [fastrace] | Add fastrace for every operations. | +| [`OtelMetricsLayer`] | [opentelemetry::metrics] | Add opentelemetry::metrics for every operations. | +| [`OtelTraceLayer`] | [opentelemetry::trace] | Add opentelemetry::trace for every operations. | +| [`PrometheusClientLayer`] | [prometheus_client] | Add prometheus metrics for every operations. | +| [`PrometheusLayer`] | [prometheus] | Add prometheus metrics for every operations. | +| [`RetryLayer`] | [backon] | Add retry for temporary failed operations. | +| [`ThrottleLayer`] | [governor] | Add a bandwidth rate limiter to the underlying services. | +| [`TimeoutLayer`] | [tokio] | Add timeout for every operations to avoid slow or unexpected hang operations. | +| [`TracingLayer`] | [tracing] | Add tracing for every operations. | [`AsyncBacktraceLayer`]: https://docs.rs/opendal/latest/opendal/layers/struct.AsyncBacktraceLayer.html [async-backtrace]: https://github.com/tokio-rs/async-backtrace @@ -144,6 +145,7 @@ OpenDAL supports the following storage [layers](https://docs.rs/opendal/latest/o [mime_guess]: https://github.com/abonander/mime_guess [`FastraceLayer`]: https://docs.rs/opendal/latest/opendal/layers/struct.FastraceLayer.html [fastrace]: https://github.com/fastracelabs/fastrace +[`OtelMetricsLayer`]: https://docs.rs/opendal/latest/opendal/layers/struct.OtelMetricsLayer.html [`OtelTraceLayer`]: https://docs.rs/opendal/latest/opendal/layers/struct.OtelTraceLayer.html [opentelemetry::trace]: https://docs.rs/opentelemetry/latest/opentelemetry/trace/index.html [`PrometheusClientLayer`]: https://docs.rs/opendal/latest/opendal/layers/struct.PrometheusClientLayer.html diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 0ac8f5f67abe..0c7657c57d45 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -87,6 +87,11 @@ mod fastrace; #[cfg(feature = "layers-fastrace")] pub use self::fastrace::FastraceLayer; +#[cfg(feature = "layers-otel-metrics")] +mod otelmetrics; +#[cfg(feature = "layers-otel-metrics")] +pub use self::otelmetrics::OtelMetricsLayer; + #[cfg(feature = "layers-otel-trace")] mod oteltrace; #[cfg(feature = "layers-otel-trace")] diff --git a/core/src/layers/otelmetrics.rs b/core/src/layers/otelmetrics.rs new file mode 100644 index 000000000000..5b3d083a74a4 --- /dev/null +++ b/core/src/layers/otelmetrics.rs @@ -0,0 +1,341 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; +use std::time::Duration; + +use opentelemetry::global; +use opentelemetry::metrics::Counter; +use opentelemetry::metrics::Histogram; +use opentelemetry::KeyValue; + +use crate::layers::observe; +use crate::raw::*; +use crate::*; + +/// Add [opentelemetry::metrics](https://docs.rs/opentelemetry/latest/opentelemetry/metrics/index.html) for every operation. +/// +/// # Examples +/// +/// ```no_run +/// # use opendal::layers::OtelMetricsLayer; +/// # use opendal::services; +/// # use opendal::Operator; +/// # use opendal::Result; +/// +/// # fn main() -> Result<()> { +/// let _ = Operator::new(services::Memory::default())? +/// .layer(OtelMetricsLayer::builder().register()) +/// .finish(); +/// Ok(()) +/// # } +/// ``` +#[derive(Clone, Debug)] +pub struct OtelMetricsLayer { + interceptor: OtelMetricsInterceptor, +} + +impl OtelMetricsLayer { + /// Create a [`OtelMetricsLayerBuilder`] to set the configuration of metrics. + /// + /// # Default Configuration + /// + /// - `path_label`: `0` + /// + /// # Examples + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::OtelMetricsLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let op = Operator::new(services::Memory::default())? + /// .layer(OtelMetricsLayer::builder().path_label(1).register()) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn builder() -> OtelMetricsLayerBuilder { + OtelMetricsLayerBuilder::new() + } +} + +/// [`OtelMetricsLayerBuilder`] is a config builder to build a [`OtelMetricsLayer`]. +pub struct OtelMetricsLayerBuilder { + operation_duration_seconds_boundaries: Vec, + operation_bytes_boundaries: Vec, + path_label_level: usize, +} + +impl OtelMetricsLayerBuilder { + fn new() -> Self { + Self { + operation_duration_seconds_boundaries: exponential_boundary(0.01, 2.0, 16), + operation_bytes_boundaries: exponential_boundary(1.0, 2.0, 16), + path_label_level: 0, + } + } + + /// Set the level of path label. + /// + /// - level = 0: we will ignore the path label. + /// - level > 0: the path label will be the path split by "/" and get the last n level, + /// if n=1 and input path is "abc/def/ghi", and then we will get "abc/" as the path label. + /// + /// # Examples + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::OtelMetricsLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let op = Operator::new(services::Memory::default())? + /// .layer(OtelMetricsLayer::builder().path_label(1).register()) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn path_label(mut self, level: usize) -> Self { + self.path_label_level = level; + self + } + + /// Set boundaries for `operation_duration_seconds` histogram. + /// + /// # Examples + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::OtelMetricsLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let op = Operator::new(services::Memory::default())? + /// .layer( + /// OtelMetricsLayer::builder() + /// .operation_duration_seconds_boundaries(vec![0.01, 0.02, 0.05, 0.1, 0.2, 0.5]) + /// .register() + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn operation_duration_seconds_boundaries(mut self, boundaries: Vec) -> Self { + if !boundaries.is_empty() { + self.operation_duration_seconds_boundaries = boundaries; + } + self + } + + /// Set boundaries for `operation_bytes` histogram. + /// + /// # Examples + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::OtelMetricsLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let op = Operator::new(services::Memory::default())? + /// .layer( + /// OtelMetricsLayer::builder() + /// .operation_bytes_boundaries(vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0]) + /// .register() + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn operation_bytes_boundaries(mut self, boundaries: Vec) -> Self { + if !boundaries.is_empty() { + self.operation_bytes_boundaries = boundaries; + } + self + } + + /// Register the metrics and return a [`OtelMetricsLayer`]. + /// + /// # Examples + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::OtelMetricsLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let op = Operator::new(services::Memory::default())? + /// .layer(OtelMetricsLayer::builder().register()) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn register(self) -> OtelMetricsLayer { + let meter = global::meter("opendal"); + let duration_seconds = meter + .f64_histogram("opendal.operation.duration") + .with_description("Duration of operations") + .with_unit("second") + .with_boundaries(self.operation_duration_seconds_boundaries) + .build(); + let bytes = meter + .u64_histogram("opendal.operation.size") + .with_description("Size of operations") + .with_unit("byte") + .with_boundaries(self.operation_bytes_boundaries) + .build(); + let errors = meter + .u64_counter("opendal.operation.errors") + .with_description("Number of operation errors") + .build(); + + OtelMetricsLayer { + interceptor: OtelMetricsInterceptor { + duration_seconds, + bytes, + errors, + path_label_level: self.path_label_level, + }, + } + } +} + +impl Layer for OtelMetricsLayer { + type LayeredAccess = observe::MetricsAccessor; + + fn layer(&self, inner: A) -> Self::LayeredAccess { + observe::MetricsLayer::new(self.interceptor.clone()).layer(inner) + } +} + +#[derive(Clone, Debug)] +pub struct OtelMetricsInterceptor { + duration_seconds: Histogram, + bytes: Histogram, + errors: Counter, + path_label_level: usize, +} + +impl observe::MetricsIntercept for OtelMetricsInterceptor { + fn observe_operation_duration_seconds( + &self, + scheme: Scheme, + namespace: Arc, + root: Arc, + path: &str, + op: Operation, + duration: Duration, + ) { + let attributes = self.create_attributes(scheme, namespace, root, path, op, None); + self.duration_seconds + .record(duration.as_secs_f64(), &attributes); + } + + fn observe_operation_bytes( + &self, + scheme: Scheme, + namespace: Arc, + root: Arc, + path: &str, + op: Operation, + bytes: usize, + ) { + let attributes = self.create_attributes(scheme, namespace, root, path, op, None); + self.bytes.record(bytes as u64, &attributes); + } + + fn observe_operation_errors_total( + &self, + scheme: Scheme, + namespace: Arc, + root: Arc, + path: &str, + op: Operation, + error: ErrorKind, + ) { + let attributes = self.create_attributes(scheme, namespace, root, path, op, Some(error)); + self.errors.add(1, &attributes); + } +} + +impl OtelMetricsInterceptor { + fn create_attributes( + &self, + scheme: Scheme, + namespace: Arc, + root: Arc, + path: &str, + operation: Operation, + error: Option, + ) -> Vec { + let mut attributes = Vec::with_capacity(6); + + attributes.extend([ + KeyValue::new(observe::LABEL_SCHEME, scheme.into_static()), + KeyValue::new(observe::LABEL_NAMESPACE, (*namespace).clone()), + KeyValue::new(observe::LABEL_ROOT, (*root).clone()), + KeyValue::new(observe::LABEL_OPERATION, operation.into_static()), + ]); + + if let Some(path) = observe::path_label_value(path, self.path_label_level) { + attributes.push(KeyValue::new(observe::LABEL_PATH, path.to_owned())); + } + + if let Some(error) = error { + attributes.push(KeyValue::new(observe::LABEL_ERROR, error.into_static())); + } + + attributes + } +} + +fn exponential_boundary(start: f64, factor: f64, count: usize) -> Vec { + let mut boundaries = Vec::with_capacity(count); + let mut current = start; + for _ in 0..count { + boundaries.push(current); + current *= factor; + } + boundaries +} diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 2a33c0c2a29e..efc1380db420 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -59,13 +59,12 @@ use crate::*; /// /// # fn main() -> Result<()> { /// use opentelemetry::trace::TracerProvider; -/// let tracer_provider = opentelemetry_otlp::new_pipeline() -/// .tracing() -/// .with_exporter(opentelemetry_otlp::new_exporter().tonic()) -/// .with_trace_config(trace::Config::default().with_resource(Resource::new(vec![ +/// let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder() +/// .with_simple_exporter(opentelemetry_otlp::SpanExporter::builder().with_tonic().build()?) +/// .with_resource(Resource::new(vec![ /// KeyValue::new("service.name", "opendal_example"), -/// ]))) -/// .install_simple()?; +/// ])) +/// .build(); /// let tracer = tracer_provider.tracer("opendal_tracer"); /// let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); ///