Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed May 24, 2024
1 parent 106ce5d commit 34f5e97
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 26 deletions.
10 changes: 7 additions & 3 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,16 @@ impl OtlpHttpClient {
fn build_trace_export_body(
&self,
spans: Vec<SpanData>,
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,

Check warning on line 310 in opentelemetry-otlp/src/exporter/http/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/mod.rs#L310

Added line #L310 was not covered by tests
) -> opentelemetry::trace::TraceResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;

let req = ExportTraceServiceRequest {
resource_spans: spans.into_iter().map(Into::into).collect(),
};
let resource_spans = spans
.into_iter()
.map(|log_event| (log_event, resource).into())
.collect::<Vec<_>>();

let req = ExportTraceServiceRequest { resource_spans };

Check warning on line 319 in opentelemetry-otlp/src/exporter/http/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/mod.rs#L314-L319

Added lines #L314 - L319 were not covered by tests
match self.protocol {
#[cfg(feature = "http-json")]
Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
Expand Down
6 changes: 5 additions & 1 deletion opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl SpanExporter for OtlpHttpClient {
Err(err) => return Box::pin(std::future::ready(Err(err))),
};

let (body, content_type) = match self.build_trace_export_body(batch) {
let (body, content_type) = match self.build_trace_export_body(batch, &self.resource) {

Check warning on line 24 in opentelemetry-otlp/src/exporter/http/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/trace.rs#L24

Added line #L24 was not covered by tests
Ok(body) => body,
Err(e) => return Box::pin(std::future::ready(Err(e))),
};
Expand Down Expand Up @@ -66,4 +66,8 @@ impl SpanExporter for OtlpHttpClient {
fn shutdown(&mut self) {
let _ = self.client.lock().map(|mut c| c.take());
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}

Check warning on line 72 in opentelemetry-otlp/src/exporter/http/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/trace.rs#L70-L72

Added lines #L70 - L72 were not covered by tests
}
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::BoxInterceptor;
pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics and traces.
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

Expand Down
21 changes: 18 additions & 3 deletions opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ use super::BoxInterceptor;

pub(crate) struct TonicTracesClient {
inner: Option<ClientInner>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
Expand Down Expand Up @@ -43,6 +46,7 @@ impl TonicTracesClient {
client,
interceptor,
}),
resource: Default::default(),
}
}
}
Expand All @@ -66,14 +70,21 @@ impl SpanExporter for TonicTracesClient {
}
};

// TODO: Avoid cloning here.
let resource_spans = {
batch
.into_iter()
.map(|log_data| (log_data, &self.resource))
.map(Into::into)
.collect()
};

Box::pin(async move {
client
.export(Request::from_parts(
metadata,
extensions,
ExportTraceServiceRequest {
resource_spans: batch.into_iter().map(Into::into).collect(),
},
ExportTraceServiceRequest { resource_spans },
))
.await
.map_err(crate::Error::from)?;
Expand All @@ -85,4 +96,8 @@ impl SpanExporter for TonicTracesClient {
fn shutdown(&mut self) {
let _ = self.inner.take();
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}
}
4 changes: 4 additions & 0 deletions opentelemetry-otlp/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,8 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
self.0.export(batch)
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.0.set_resource(resource);
}
}
14 changes: 5 additions & 9 deletions opentelemetry-proto/src/transform/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod tonic {
use crate::proto::tonic::trace::v1::{span, status, ResourceSpans, ScopeSpans, Span, Status};
use crate::transform::common::{
to_nanos,
tonic::{resource_attributes, Attributes},
tonic::{Attributes, ResourceAttributesWithSchema},
};
use opentelemetry::trace;
use opentelemetry::trace::{Link, SpanId, SpanKind};
Expand Down Expand Up @@ -45,19 +45,15 @@ pub mod tonic {
}
}

impl From<SpanData> for ResourceSpans {
fn from(source_span: SpanData) -> Self {
impl From<(SpanData, &ResourceAttributesWithSchema)> for ResourceSpans {
fn from((source_span, resource): (SpanData, &ResourceAttributesWithSchema)) -> Self {
let span_kind: span::SpanKind = source_span.span_kind.into();
ResourceSpans {
resource: Some(Resource {
attributes: resource_attributes(&source_span.resource).0,
attributes: resource.attributes.0.clone(),
dropped_attributes_count: 0,
}),
schema_url: source_span
.resource
.schema_url()
.map(|url| url.to_string())
.unwrap_or_default(),
schema_url: resource.schema_url.clone().unwrap_or_default(),
scope_spans: vec![ScopeSpans {
schema_url: source_span
.instrumentation_lib
Expand Down
13 changes: 12 additions & 1 deletion opentelemetry-stdout/src/trace/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ use opentelemetry_sdk::export::{self, trace::ExportResult};
use std::io::{stdout, Write};

use crate::trace::transform::SpanData;
use opentelemetry_sdk::resource::Resource;

type Encoder = Box<dyn Fn(&mut dyn Write, SpanData) -> TraceResult<()> + Send + Sync>;

/// An OpenTelemetry exporter that writes to stdout on export.
pub struct SpanExporter {
writer: Option<Box<dyn Write + Send + Sync>>,
encoder: Encoder,
resource: Resource,
}

impl fmt::Debug for SpanExporter {
Expand All @@ -36,7 +38,11 @@ impl Default for SpanExporter {
impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter {
fn export(&mut self, batch: Vec<export::trace::SpanData>) -> BoxFuture<'static, ExportResult> {
let res = if let Some(writer) = &mut self.writer {
(self.encoder)(writer, crate::trace::SpanData::from(batch)).and_then(|_| {
(self.encoder)(
writer,
crate::trace::SpanData::from((batch, &self.resource)),
)
.and_then(|_| {

Check warning on line 45 in opentelemetry-stdout/src/trace/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/trace/exporter.rs#L41-L45

Added lines #L41 - L45 were not covered by tests
writer
.write_all(b"\n")
.map_err(|err| TraceError::Other(Box::new(err)))
Expand All @@ -51,6 +57,10 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter {
fn shutdown(&mut self) {
self.writer.take();
}

fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) {
self.resource = res.clone();
}

Check warning on line 63 in opentelemetry-stdout/src/trace/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/trace/exporter.rs#L61-L63

Added lines #L61 - L63 were not covered by tests
}

/// Configuration for the stdout trace exporter
Expand Down Expand Up @@ -107,6 +117,7 @@ impl SpanExporterBuilder {
pub fn build(self) -> SpanExporter {
SpanExporter {
writer: Some(self.writer.unwrap_or_else(|| Box::new(stdout()))),
resource: Resource::default(),

Check warning on line 120 in opentelemetry-stdout/src/trace/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/trace/exporter.rs#L120

Added line #L120 was not covered by tests
encoder: self.encoder.unwrap_or_else(|| {
Box::new(|writer, spans| {
serde_json::to_writer(writer, &spans)
Expand Down
20 changes: 15 additions & 5 deletions opentelemetry-stdout/src/trace/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,27 @@ pub struct SpanData {
resource_spans: Vec<ResourceSpans>,
}

impl From<Vec<opentelemetry_sdk::export::trace::SpanData>> for SpanData {
fn from(sdk_spans: Vec<opentelemetry_sdk::export::trace::SpanData>) -> Self {
impl
From<(
Vec<opentelemetry_sdk::export::trace::SpanData>,
&opentelemetry_sdk::Resource,
)> for SpanData
{
fn from(
(sdk_spans, sdk_resource): (
Vec<opentelemetry_sdk::export::trace::SpanData>,
&opentelemetry_sdk::Resource,
),
) -> Self {

Check warning on line 23 in opentelemetry-stdout/src/trace/transform.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/trace/transform.rs#L18-L23

Added lines #L18 - L23 were not covered by tests
let mut resource_spans = HashMap::<AttributeSet, ResourceSpans>::new();
for sdk_span in sdk_spans {
let resource_schema_url = sdk_span.resource.schema_url().map(|s| s.to_string().into());
let resource_schema_url = sdk_resource.schema_url().map(|s| s.to_string().into());

Check warning on line 26 in opentelemetry-stdout/src/trace/transform.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/trace/transform.rs#L26

Added line #L26 was not covered by tests
let schema_url = sdk_span.instrumentation_lib.schema_url.clone();
let scope = sdk_span.instrumentation_lib.clone().into();
let resource = sdk_span.resource.as_ref().into();
let resource: Resource = sdk_resource.into();

Check warning on line 29 in opentelemetry-stdout/src/trace/transform.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/trace/transform.rs#L29

Added line #L29 was not covered by tests

let rs = resource_spans
.entry(sdk_span.resource.as_ref().into())
.entry(sdk_resource.into())

Check warning on line 32 in opentelemetry-stdout/src/trace/transform.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/trace/transform.rs#L32

Added line #L32 was not covered by tests
.or_insert_with(move || ResourceSpans {
resource,
scope_spans: Vec::with_capacity(1),
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-zipkin/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ impl trace::SpanExporter for Exporter {
self.local_endpoint.clone(),
))
}

fn set_resource(&mut self, _resource: &Resource) {}

Check warning on line 235 in opentelemetry-zipkin/src/exporter/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-zipkin/src/exporter/mod.rs#L235

Added line #L235 was not covered by tests
}

/// Wrap type for errors from opentelemetry zipkin
Expand Down
4 changes: 1 addition & 3 deletions opentelemetry-zipkin/src/exporter/model/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ mod tests {
use crate::exporter::model::span::{Kind, Span};
use crate::exporter::model::{into_zipkin_span, OTEL_ERROR_DESCRIPTION, OTEL_STATUS_CODE};
use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId};
use opentelemetry_sdk::export::trace::SpanData;
use opentelemetry_sdk::trace::{SpanEvents, SpanLinks};
use opentelemetry_sdk::{export::trace::SpanData, Resource};
use std::borrow::Cow;
use std::collections::HashMap;
use std::net::Ipv4Addr;
use std::time::SystemTime;
Expand Down Expand Up @@ -166,7 +165,6 @@ mod tests {
events: SpanEvents::default(),
links: SpanLinks::default(),
status,
resource: Cow::Owned(Resource::default()),
instrumentation_lib: Default::default(),
};
let local_endpoint = Endpoint::new("test".into(), None);
Expand Down
2 changes: 2 additions & 0 deletions stress/src/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ impl SpanProcessor for NoOpSpanProcessor {
fn shutdown(&mut self) -> TraceResult<()> {
Ok(())
}

fn set_resource(&mut self, _resource: &opentelemetry_sdk::Resource) {}
}

fn main() {
Expand Down

0 comments on commit 34f5e97

Please sign in to comment.