Skip to content

Commit

Permalink
OTLP tonic metadata from env variable (#1377)
Browse files Browse the repository at this point in the history
Fixes #1336

As per the
[specs](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.25.0/specification/protocol/exporter.md#specifying-headers-via-environment-variables),
the custom headers for OTLP exporter can be specified through env
variables - `OTEL_EXPORTER_OTLP_HEADERS`,
`OTEL_EXPORTER_OTLP_TRACES_HEADERS`,
`OTEL_EXPORTER_OTLP_METRICS_HEADERS`.
This PR completes the work already done in PR
#1290 adding support for tonic metadata
To reproduce the same behavior as http exporter, the env-variable takes
precedence (as discussed in
open-telemetry/opentelemetry-rust-contrib#10)

* Move common code for http and tonic exporters in `exporter/mod.rs`
(function to parse header from string and test helper to run tests with
isolated env variables)
I wanted to minimize the changes but maybe it should be a good idea to
use a crate like https://crates.io/crates/temp-env for environment
related testing
  • Loading branch information
harscoet authored Nov 19, 2023
1 parent a70bb74 commit 9e2e3db
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 41 deletions.
1 change: 1 addition & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Add `grpcio` metrics exporter (#1202)
- Allow specifying OTLP HTTP headers from env variable (#1290)
- Support custom channels in topic exporters [#1335](https://github.com/open-telemetry/opentelemetry-rust/pull/1335)
- Allow specifying OTLP Tonic metadata from env variable (#1377)

### Changed

Expand Down
44 changes: 8 additions & 36 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use super::default_headers;
use super::{default_headers, parse_header_string};

#[cfg(feature = "metrics")]
mod metrics;
Expand Down Expand Up @@ -316,46 +316,18 @@ fn resolve_endpoint(

#[allow(clippy::mutable_key_type)] // http headers are not mutated
fn add_header_from_string(input: &str, headers: &mut HashMap<HeaderName, HeaderValue>) {
for pair in input.split_terminator(',') {
if pair.trim().is_empty() {
continue;
}
if let Some((k, v)) = pair.trim().split_once('=') {
if !k.trim().is_empty() && !v.trim().is_empty() {
if let (Ok(key), Ok(value)) = (
HeaderName::from_str(k.trim()),
HeaderValue::from_str(v.trim()),
) {
headers.insert(key, value);
}
}
}
}
headers.extend(parse_header_string(input).filter_map(|(key, value)| {
Some((
HeaderName::from_str(key).ok()?,
HeaderValue::from_str(value).ok()?,
))
}));
}

#[cfg(test)]
mod tests {
use crate::exporter::tests::run_env_test;
use crate::{OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT};
use std::sync::Mutex;

// Make sure env tests are not running concurrently
static ENV_LOCK: Mutex<()> = Mutex::new(());

fn run_env_test<T, F>(env_vars: T, f: F)
where
F: FnOnce(),
T: Into<Vec<(&'static str, &'static str)>>,
{
let _env_lock = ENV_LOCK.lock().expect("env test lock poisoned");
let env_vars = env_vars.into();
for (k, v) in env_vars.iter() {
std::env::set_var(k, v);
}
f();
for (k, _) in env_vars {
std::env::remove_var(k);
}
}

#[test]
fn test_append_signal_path_to_generic_env() {
Expand Down
75 changes: 75 additions & 0 deletions opentelemetry-otlp/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,78 @@ impl<B: HasExportConfig> WithExportConfig for B {
self
}
}

#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))]
fn parse_header_string(value: &str) -> impl Iterator<Item = (&str, &str)> {
value
.split_terminator(',')
.map(str::trim)
.filter_map(parse_header_key_value_string)
}

#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))]
fn parse_header_key_value_string(key_value_string: &str) -> Option<(&str, &str)> {
key_value_string
.split_once('=')
.map(|(key, value)| (key.trim(), value.trim()))
.filter(|(key, value)| !key.is_empty() && !value.is_empty())
}

#[cfg(test)]
#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))]
mod tests {
// Make sure env tests are not running concurrently
static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

pub(crate) fn run_env_test<T, F>(env_vars: T, f: F)
where
F: FnOnce(),
T: Into<Vec<(&'static str, &'static str)>>,
{
let _env_lock = ENV_LOCK.lock().expect("env test lock poisoned");
let env_vars = env_vars.into();
for (k, v) in env_vars.iter() {
std::env::set_var(k, v);
}
f();
for (k, _) in env_vars {
std::env::remove_var(k);
}
}

#[test]
fn test_parse_header_string() {
let test_cases = vec![
// Format: (input_str, expected_headers)
("k1=v1", vec![("k1", "v1")]),
("k1=v1,k2=v2", vec![("k1", "v1"), ("k2", "v2")]),
("k1=v1=10,k2,k3", vec![("k1", "v1=10")]),
("k1=v1,,,k2,k3=10", vec![("k1", "v1"), ("k3", "10")]),
];

for (input_str, expected_headers) in test_cases {
assert_eq!(
super::parse_header_string(input_str).collect::<Vec<_>>(),
expected_headers,
)
}
}

#[test]
fn test_parse_header_key_value_string() {
let test_cases = vec![
// Format: (input_str, expected_header)
("k1=v1", Some(("k1", "v1"))),
("", None),
("=v1", None),
("k1=", None),
];

for (input_str, expected_headers) in test_cases {
assert_eq!(
super::parse_header_key_value_string(input_str),
expected_headers,
)
}
}
}
108 changes: 105 additions & 3 deletions opentelemetry-otlp/src/exporter/tonic/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
use std::env;
use std::fmt::{Debug, Formatter};
use std::str::FromStr;
use std::time::Duration;

use http::{HeaderMap, HeaderName, HeaderValue};
use tonic::codec::CompressionEncoding;
use tonic::metadata::{KeyAndValueRef, MetadataMap};
use tonic::service::Interceptor;
use tonic::transport::Channel;
#[cfg(feature = "tls")]
use tonic::transport::ClientTlsConfig;

use super::default_headers;
use super::{default_headers, parse_header_string};
use crate::exporter::Compression;
use crate::{
ExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_TIMEOUT,
OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TIMEOUT,
};

#[cfg(feature = "logs")]
Expand Down Expand Up @@ -213,11 +215,17 @@ impl TonicExporterBuilder {
signal_endpoint_path: &str,
signal_timeout_var: &str,
signal_compression_var: &str,
signal_headers_var: &str,
) -> Result<(Channel, BoxInterceptor, Option<CompressionEncoding>), crate::Error> {
let tonic_config = self.tonic_config;
let compression = resolve_compression(&tonic_config, signal_compression_var)?;

let metadata = tonic_config.metadata.unwrap_or_default();
let headers_from_env = parse_headers_from_env(signal_headers_var);
let metadata = merge_metadata_with_headers_from_env(
tonic_config.metadata.unwrap_or_default(),
headers_from_env,
);

let add_metadata = move |mut req: tonic::Request<()>| {
for key_and_value in metadata.iter() {
match key_and_value {
Expand Down Expand Up @@ -294,6 +302,7 @@ impl TonicExporterBuilder {
"/v1/logs",
crate::logs::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
crate::logs::OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
crate::logs::OTEL_EXPORTER_OTLP_LOGS_HEADERS,
)?;

let client = TonicLogsClient::new(channel, interceptor, compression);
Expand All @@ -316,6 +325,7 @@ impl TonicExporterBuilder {
"/v1/metrics",
crate::metric::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
crate::metric::OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
crate::metric::OTEL_EXPORTER_OTLP_METRICS_HEADERS,
)?;

let client = TonicMetricsClient::new(channel, interceptor, compression);
Expand All @@ -339,6 +349,7 @@ impl TonicExporterBuilder {
"/v1/traces",
crate::span::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
crate::span::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
crate::span::OTEL_EXPORTER_OTLP_TRACES_HEADERS,
)?;

let client = TonicTracesClient::new(channel, interceptor, compression);
Expand All @@ -347,11 +358,44 @@ impl TonicExporterBuilder {
}
}

fn merge_metadata_with_headers_from_env(
metadata: MetadataMap,
headers_from_env: HeaderMap,
) -> MetadataMap {
if headers_from_env.is_empty() {
metadata
} else {
let mut existing_headers: HeaderMap = metadata.into_headers();
existing_headers.extend(headers_from_env);

MetadataMap::from_headers(existing_headers)
}
}

fn parse_headers_from_env(signal_headers_var: &str) -> HeaderMap {
env::var(signal_headers_var)
.or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS))
.map(|input| {
parse_header_string(&input)
.filter_map(|(key, value)| {
Some((
HeaderName::from_str(key).ok()?,
HeaderValue::from_str(value).ok()?,
))
})
.collect::<HeaderMap>()
})
.unwrap_or_default()
}

#[cfg(test)]
mod tests {
use crate::exporter::tests::run_env_test;
#[cfg(feature = "gzip-tonic")]
use crate::exporter::Compression;
use crate::TonicExporterBuilder;
use crate::{OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TRACES_HEADERS};
use http::{HeaderMap, HeaderName, HeaderValue};
use tonic::metadata::{MetadataMap, MetadataValue};

#[test]
Expand Down Expand Up @@ -393,4 +437,62 @@ mod tests {
let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
}

#[test]
fn test_parse_headers_from_env() {
run_env_test(
vec![
(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2"),
(OTEL_EXPORTER_OTLP_HEADERS, "k3=v3"),
],
|| {
assert_eq!(
super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS),
HeaderMap::from_iter([
(
HeaderName::from_static("k1"),
HeaderValue::from_static("v1")
),
(
HeaderName::from_static("k2"),
HeaderValue::from_static("v2")
),
])
);

assert_eq!(
super::parse_headers_from_env("EMPTY_ENV"),
HeaderMap::from_iter([(
HeaderName::from_static("k3"),
HeaderValue::from_static("v3")
)])
);
},
)
}

#[test]
fn test_merge_metadata_with_headers_from_env() {
run_env_test(
vec![(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2")],
|| {
let headers_from_env =
super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS);

let mut metadata = MetadataMap::new();
metadata.insert("foo", "bar".parse().unwrap());
metadata.insert("k1", "v0".parse().unwrap());

let result =
super::merge_metadata_with_headers_from_env(metadata, headers_from_env);

assert_eq!(
result.get("foo").unwrap(),
MetadataValue::from_static("bar")
);
assert_eq!(result.get("k1").unwrap(), MetadataValue::from_static("v1"));
assert_eq!(result.get("k2").unwrap(), MetadataValue::from_static("v2"));
},
);
}
}
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,12 @@ pub enum Error {
RequestFailed(#[from] opentelemetry_http::HttpError),

/// The provided value is invalid in HTTP headers.
#[cfg(feature = "http-proto")]
#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))]
#[error("http header value error {0}")]
InvalidHeaderValue(#[from] http::header::InvalidHeaderValue),

/// The provided name is invalid in HTTP headers.
#[cfg(feature = "http-proto")]
#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))]
#[error("http header name error {0}")]
InvalidHeaderName(#[from] http::header::InvalidHeaderName),

Expand Down

0 comments on commit 9e2e3db

Please sign in to comment.