From 7b18b0187f9c64cf8e2e790e49ebcb71e457d2f2 Mon Sep 17 00:00:00 2001
From: Matthew Boddewyn <31598686+mattbodd@users.noreply.github.com>
Date: Tue, 3 Dec 2024 20:25:13 -0800
Subject: [PATCH] Create ExportMetricsServiceRequest for each metric and
 replace the data for each data point

---
 opentelemetry-etw-metrics/src/exporter/mod.rs | 365 ++++++------
 1 file changed, 117 insertions(+), 248 deletions(-)

diff --git a/opentelemetry-etw-metrics/src/exporter/mod.rs b/opentelemetry-etw-metrics/src/exporter/mod.rs
index f3508973..40639012 100644
--- a/opentelemetry-etw-metrics/src/exporter/mod.rs
+++ b/opentelemetry-etw-metrics/src/exporter/mod.rs
@@ -1,13 +1,16 @@
 use opentelemetry::otel_warn;
 
-use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
-use opentelemetry_sdk::metrics::{
-    data::{
-        self, ExponentialBucket, ExponentialHistogramDataPoint, Metric, ResourceMetrics,
-        ScopeMetrics,
+use opentelemetry_proto::tonic::{
+    collector::metrics::v1::ExportMetricsServiceRequest,
+    metrics::v1::{
+        metric::Data as TonicMetricData, ExponentialHistogram as TonicExponentialHistogram,
+        Gauge as TonicGauge, Histogram as TonicHistogram, Metric as TonicMetric,
+        ResourceMetrics as TonicResourceMetrics, ScopeMetrics as TonicScopeMetrics,
+        Sum as TonicSum, Summary as TonicSummary,
     },
-    exporter::PushMetricExporter,
-    MetricError, MetricResult, Temporality,
+};
+use opentelemetry_sdk::metrics::{
+    data::ResourceMetrics, exporter::PushMetricExporter, MetricError, MetricResult, Temporality,
 };
 use prost::Message;
 
@@ -39,19 +42,19 @@ impl Debug for MetricsExporter {
     }
 }
 
-fn emit_metric(resource_metric: &ResourceMetrics, buffer: &mut Vec<u8>) -> MetricResult<()> {
-    // Zero the buffer to ensure no data is left over from previous writes
-    buffer.clear();
+fn emit_export_metric_service_request(
+    export_metric_service_request: &ExportMetricsServiceRequest,
+) -> MetricResult<()> {
+    let mut encoding_buffer = Vec::new();
 
-    let proto_message: ExportMetricsServiceRequest = (&*resource_metric).into();
-    proto_message
-        .encode(buffer)
+    export_metric_service_request
+        .encode(&mut encoding_buffer)
         .map_err(|err| MetricError::Other(err.to_string()))?;
 
-    if (proto_message.encoded_len()) > etw::MAX_EVENT_SIZE {
-        otel_warn!(name: "MetricExportFailedDueToMaxSizeLimit", size = proto_message.encoded_len(), max_size = etw::MAX_EVENT_SIZE);
+    if (encoding_buffer.len()) > etw::MAX_EVENT_SIZE {
+        otel_warn!(name: "MetricExportFailedDueToMaxSizeLimit", size = encoding_buffer.len(), max_size = etw::MAX_EVENT_SIZE);
     } else {
-        let result = etw::write(&buffer);
+        let result = etw::write(&encoding_buffer);
         // TODO: Better logging/internal metrics needed here for non-failure
         // case Uncomment the line below to see the exported bytes until a
         // better logging solution is implemented
@@ -67,243 +70,109 @@ fn emit_metric(resource_metric: &ResourceMetrics, buffer: &mut Vec<u8>) -> Metri
 #[async_trait]
 impl PushMetricExporter for MetricsExporter {
     async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
-        let mut encoding_buffer: Vec<u8> = Vec::with_capacity(1024);
+        let schema_url: String = metrics
+            .resource
+            .schema_url()
+            .map(Into::into)
+            .unwrap_or_default();
 
         for scope_metric in &metrics.scope_metrics {
             for metric in &scope_metric.metrics {
-                let data = &metric.data.as_any();
-                if let Some(hist) = data.downcast_ref::<data::Histogram<f64>>() {
-                    for data_point in &hist.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Histogram {
-                                        temporality: hist.temporality,
-                                        data_points: vec![data_point.clone()],
-                                    }),
-                                }],
-                            }],
-                        };
-                        emit_metric(&resource_metric, &mut encoding_buffer)?;
-                    }
-                } else if let Some(hist) = data.downcast_ref::<data::Histogram<u64>>() {
-                    for data_point in &hist.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Histogram {
-                                        temporality: hist.temporality,
-                                        data_points: vec![data_point.clone()],
-                                    }),
-                                }],
-                            }],
-                        };
-                        emit_metric(&resource_metric, &mut encoding_buffer)?;
-                    }
-                } else if let Some(hist) = data.downcast_ref::<data::ExponentialHistogram<f64>>() {
-                    for data_point in &hist.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::ExponentialHistogram {
-                                        temporality: hist.temporality,
-                                        data_points: vec![ExponentialHistogramDataPoint {
-                                            attributes: data_point.attributes.clone(),
-                                            count: data_point.count,
-                                            start_time: data_point.start_time,
-                                            time: data_point.time,
-                                            min: data_point.min,
-                                            max: data_point.max,
-                                            sum: data_point.sum,
-                                            scale: data_point.scale,
-                                            zero_count: data_point.zero_count,
-                                            zero_threshold: data_point.zero_threshold,
-                                            positive_bucket: ExponentialBucket {
-                                                offset: data_point.positive_bucket.offset,
-                                                counts: data_point.positive_bucket.counts.clone(),
-                                            },
-                                            negative_bucket: ExponentialBucket {
-                                                offset: data_point.negative_bucket.offset,
-                                                counts: data_point.negative_bucket.counts.clone(),
-                                            },
-                                            exemplars: data_point.exemplars.clone(),
-                                        }],
-                                    }),
-                                }],
-                            }],
-                        };
-                        emit_metric(&resource_metric, &mut encoding_buffer)?;
-                    }
-                } else if let Some(hist) = data.downcast_ref::<data::ExponentialHistogram<u64>>() {
-                    for data_point in &hist.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::ExponentialHistogram {
-                                        temporality: hist.temporality,
-                                        data_points: vec![ExponentialHistogramDataPoint {
-                                            attributes: data_point.attributes.clone(),
-                                            count: data_point.count,
-                                            start_time: data_point.start_time,
-                                            time: data_point.time,
-                                            min: data_point.min,
-                                            max: data_point.max,
-                                            sum: data_point.sum,
-                                            scale: data_point.scale,
-                                            zero_count: data_point.zero_count,
-                                            zero_threshold: data_point.zero_threshold,
-                                            positive_bucket: ExponentialBucket {
-                                                offset: data_point.positive_bucket.offset,
-                                                counts: data_point.positive_bucket.counts.clone(),
-                                            },
-                                            negative_bucket: ExponentialBucket {
-                                                offset: data_point.negative_bucket.offset,
-                                                counts: data_point.negative_bucket.counts.clone(),
-                                            },
-                                            exemplars: data_point.exemplars.clone(),
-                                        }],
-                                    }),
-                                }],
-                            }],
-                        };
-                        emit_metric(&resource_metric, &mut encoding_buffer)?;
-                    }
-                } else if let Some(sum) = data.downcast_ref::<data::Sum<u64>>() {
-                    for data_point in &sum.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Sum {
-                                        temporality: sum.temporality,
-                                        data_points: vec![data_point.clone()],
-                                        is_monotonic: sum.is_monotonic,
-                                    }),
-                                }],
-                            }],
-                        };
-                        emit_metric(&resource_metric, &mut encoding_buffer)?;
-                    }
-                } else if let Some(sum) = data.downcast_ref::<data::Sum<i64>>() {
-                    for data_point in &sum.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Sum {
-                                        temporality: sum.temporality,
-                                        data_points: vec![data_point.clone()],
-                                        is_monotonic: sum.is_monotonic,
-                                    }),
-                                }],
-                            }],
-                        };
-                        emit_metric(&resource_metric, &mut encoding_buffer)?;
-                    }
-                } else if let Some(sum) = data.downcast_ref::<data::Sum<f64>>() {
-                    for data_point in &sum.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Sum {
-                                        temporality: sum.temporality,
-                                        data_points: vec![data_point.clone()],
-                                        is_monotonic: sum.is_monotonic,
-                                    }),
-                                }],
-                            }],
-                        };
-                        emit_metric(&resource_metric, &mut encoding_buffer)?;
-                    }
-                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<u64>>() {
-                    for data_point in &gauge.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Gauge {
-                                        data_points: vec![data_point.clone()],
-                                    }),
-                                }],
-                            }],
-                        };
-                        emit_metric(&resource_metric, &mut encoding_buffer)?;
-                    }
-                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<i64>>() {
-                    for data_point in &gauge.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Gauge {
-                                        data_points: vec![data_point.clone()],
-                                    }),
-                                }],
-                            }],
-                        };
-                        emit_metric(&resource_metric, &mut encoding_buffer)?;
-                    }
-                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<f64>>() {
-                    for data_point in &gauge.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Gauge {
-                                        data_points: vec![data_point.clone()],
-                                    }),
-                                }],
-                            }],
-                        };
-                        emit_metric(&resource_metric, &mut encoding_buffer)?;
+                let proto_data: Option<TonicMetricData> = metric.data.as_any().try_into().ok();
+
+                // This ExportMetricsServiceRequest is created for each metric and will hold a single data point.
+                let mut export_metrics_service_request = ExportMetricsServiceRequest {
+                    resource_metrics: vec![TonicResourceMetrics {
+                        resource: Some((&metrics.resource).into()),
+                        scope_metrics: vec![TonicScopeMetrics {
+                            scope: Some((&scope_metric.scope, None).into()),
+                            metrics: vec![TonicMetric {
+                                name: metric.name.to_string(),
+                                description: metric.description.to_string(),
+                                unit: metric.unit.to_string(),
+                                metadata: vec![],
+                                data: None, // Initially data is None, it will be set based on the type of metric
+                            }],
+                            schema_url: schema_url.clone(),
+                        }],
+                        schema_url: schema_url.clone(),
+                    }],
+                };
+
+                if let Some(proto_data) = proto_data {
+                    match proto_data {
+                        TonicMetricData::Histogram(hist) => {
+                            for data_point in hist.data_points {
+                                export_metrics_service_request.resource_metrics[0].scope_metrics
+                                    [0]
+                                    .metrics[0]
+                                    .data = Some(TonicMetricData::Histogram(TonicHistogram {
+                                    aggregation_temporality: hist.aggregation_temporality,
+                                    data_points: vec![data_point],
+                                }));
+                                emit_export_metric_service_request(
+                                    &export_metrics_service_request,
+                                )?;
+                            }
+                        }
+                        TonicMetricData::ExponentialHistogram(exp_hist) => {
+                            for data_point in exp_hist.data_points {
+                                export_metrics_service_request.resource_metrics[0].scope_metrics
+                                    [0]
+                                    .metrics[0]
+                                    .data = Some(TonicMetricData::ExponentialHistogram(
+                                    TonicExponentialHistogram {
+                                        aggregation_temporality: exp_hist.aggregation_temporality,
+                                        data_points: vec![data_point],
+                                    },
+                                ));
+                                emit_export_metric_service_request(
+                                    &export_metrics_service_request,
+                                )?;
+                            }
+                        }
+                        TonicMetricData::Gauge(gauge) => {
+                            for data_point in gauge.data_points {
+                                export_metrics_service_request.resource_metrics[0].scope_metrics
+                                    [0]
+                                    .metrics[0]
+                                    .data = Some(TonicMetricData::Gauge(TonicGauge {
+                                    data_points: vec![data_point],
+                                }));
+                                emit_export_metric_service_request(
+                                    &export_metrics_service_request,
+                                )?;
+                            }
+                        }
+                        TonicMetricData::Sum(sum) => {
+                            for data_point in sum.data_points {
+                                export_metrics_service_request.resource_metrics[0].scope_metrics
+                                    [0]
+                                    .metrics[0]
+                                    .data = Some(TonicMetricData::Sum(TonicSum {
+                                    data_points: vec![data_point],
+                                    aggregation_temporality: sum.aggregation_temporality,
+                                    is_monotonic: sum.is_monotonic,
+                                }));
+                                emit_export_metric_service_request(
+                                    &export_metrics_service_request,
+                                )?;
+                            }
+                        }
+                        TonicMetricData::Summary(summary) => {
+                            for data in summary.data_points {
+                                export_metrics_service_request.resource_metrics[0].scope_metrics
+                                    [0]
+                                    .metrics[0]
+                                    .data = Some(TonicMetricData::Summary(TonicSummary {
+                                    data_points: vec![data],
+                                }));
+                                emit_export_metric_service_request(
+                                    &export_metrics_service_request,
+                                )?;
+                            }
+                        }
                     }
-                } else {
-                    otel_warn!(name: "MetricExportFailedDueToUnsupportedMetricType", metric_type = format!("{:?}", data));
                 }
             }
         }
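The essence of the change, stated once: rather than encoding one ExportMetricsServiceRequest for the whole ResourceMetrics batch, which succeeds or fails against etw::MAX_EVENT_SIZE as a single unit, the exporter now builds a request per metric and emits it once per data point, so every encoded payload is size-checked, and at worst dropped, independently. What follows is a minimal, self-contained Rust sketch of that fan-out pattern; the Request type, its byte-level encode, and MAX_EVENT_SIZE here are hypothetical stand-ins for illustration, not the opentelemetry-etw-metrics, prost, or ETW APIs.

const MAX_EVENT_SIZE: usize = 64 * 1024; // assumption: stand-in for an ETW-style payload cap

#[derive(Debug)]
struct Request {
    metric_name: String,
    data_points: Vec<u64>, // stand-in for real metric data points
}

impl Request {
    // Stand-in for prost's Message::encode: any byte serialization works here.
    fn encode(&self) -> Vec<u8> {
        let mut buf = self.metric_name.clone().into_bytes();
        for dp in &self.data_points {
            buf.extend_from_slice(&dp.to_le_bytes());
        }
        buf
    }
}

// Emit one single-data-point request per point, dropping oversized payloads
// instead of failing the whole batch (mirrors the size check in the patch).
fn export(metric_name: &str, data_points: &[u64]) -> Vec<Vec<u8>> {
    let mut emitted = Vec::new();
    for dp in data_points {
        let request = Request {
            metric_name: metric_name.to_owned(),
            data_points: vec![*dp], // "replace the data": exactly one point per request
        };
        let payload = request.encode();
        if payload.len() > MAX_EVENT_SIZE {
            eprintln!("dropping oversized payload: {} bytes", payload.len());
        } else {
            emitted.push(payload); // stand-in for etw::write
        }
    }
    emitted
}

fn main() {
    let payloads = export("requests_total", &[1, 2, 3]);
    assert_eq!(payloads.len(), 3); // three data points -> three independent events
}

Because each payload is independent, one oversized data point costs only its own event rather than the entire batch; the trade-off is that the resource and scope envelope is re-encoded into every event.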
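One more detail the diff relies on: the request envelope (resource, scope, and metric name/description/unit) is constructed once per metric with data: None, and each loop iteration only overwrites the data field in place before emitting. A tiny sketch of that reuse pattern, again with made-up types rather than the real OTLP structs:

#[derive(Debug)]
struct Envelope {
    resource: String,       // stand-in for the resource + scope metadata
    data: Option<Vec<u64>>, // stand-in for the per-point metric data
}

fn main() {
    // Built once per metric, exactly like the ExportMetricsServiceRequest above.
    let mut envelope = Envelope {
        resource: "service.name=my-service".to_owned(),
        data: None, // filled in per data point
    };
    for point in [10u64, 20, 30] {
        envelope.data = Some(vec![point]); // overwrite only the data field
        println!("emit {:?}", envelope);   // stand-in for encode + ETW write
    }
}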