Skip to content

Commit

Permalink
Merge branch 'main' into global-error-handler-cleanup-logs-sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Oct 13, 2024
2 parents f695605 + bacb3da commit e99195b
Show file tree
Hide file tree
Showing 22 changed files with 384 additions and 151 deletions.
1 change: 1 addition & 0 deletions opentelemetry-appender-tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ tracing-subscriber = { workspace = true, features = ["registry", "std", "env-fil
tracing-log = "0.2"
async-trait = { workspace = true }
criterion = { workspace = true }
tokio = { workspace = true, features = ["full"]}

[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
Expand Down
73 changes: 70 additions & 3 deletions opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,20 @@ const fn severity_of_level(level: &Level) -> Severity {
#[cfg(test)]
mod tests {
use crate::layer;
use opentelemetry::logs::Severity;
use async_trait::async_trait;
use opentelemetry::logs::{LogResult, Severity};
use opentelemetry::trace::TracerProvider as _;
use opentelemetry::trace::{TraceContextExt, TraceFlags, Tracer};
use opentelemetry::{logs::AnyValue, Key};
use opentelemetry_sdk::export::logs::{LogBatch, LogExporter};
use opentelemetry_sdk::logs::{LogRecord, LoggerProvider};
use opentelemetry_sdk::testing::logs::InMemoryLogsExporter;
use opentelemetry_sdk::trace;
use opentelemetry_sdk::trace::{Sampler, TracerProvider};
use tracing::error;
use tracing::{error, warn};
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
use tracing_subscriber::Layer;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer};

pub fn attributes_contains(log_record: &LogRecord, key: &Key, value: &AnyValue) -> bool {
log_record
Expand All @@ -238,6 +241,70 @@ mod tests {
}

// cargo test --features=testing

#[derive(Clone, Debug, Default)]
struct ReentrantLogExporter;

#[async_trait]
impl LogExporter for ReentrantLogExporter {
async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> {
// This will cause a deadlock as the export itself creates a log
// while still within the lock of the SimpleLogProcessor.
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
Ok(())
}
}

#[test]
#[ignore = "See issue: https://github.com/open-telemetry/opentelemetry-rust/issues/1745"]
fn simple_processor_deadlock() {
let exporter: ReentrantLogExporter = ReentrantLogExporter;
let logger_provider = LoggerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();

let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);

// Setting subscriber as global as that is the only way to test this scenario.
tracing_subscriber::registry().with(layer).init();
warn!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "[email protected]");
}

#[test]
#[ignore = "While this test runs fine, this uses global subscriber and does not play well with other tests."]
fn simple_processor_no_deadlock() {
let exporter: ReentrantLogExporter = ReentrantLogExporter;
let logger_provider = LoggerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();

let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);

// This filter will prevent the deadlock as the reentrant log will be
// ignored.
let filter = EnvFilter::new("debug").add_directive("reentrant=error".parse().unwrap());
// Setting subscriber as global as that is the only way to test this scenario.
tracing_subscriber::registry()
.with(filter)
.with(layer)
.init();
warn!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "[email protected]");
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[ignore = "While this test runs fine, this uses global subscriber and does not play well with other tests."]
async fn batch_processor_no_deadlock() {
let exporter: ReentrantLogExporter = ReentrantLogExporter;
let logger_provider = LoggerProvider::builder()
.with_batch_exporter(exporter.clone(), opentelemetry_sdk::runtime::Tokio)
.build();

let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);

tracing_subscriber::registry().with(layer).init();
warn!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "[email protected]");
}

#[test]
fn tracing_appender_standalone() {
// Arrange
Expand Down
44 changes: 16 additions & 28 deletions opentelemetry-proto/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub(crate) mod serializers {
use crate::tonic::common::v1::any_value::{self, Value};
use crate::tonic::common::v1::AnyValue;
use serde::de::{self, MapAccess, Visitor};
use serde::ser::SerializeStruct;
use serde::ser::{SerializeMap, SerializeStruct};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;

Expand Down Expand Up @@ -45,35 +45,23 @@ pub(crate) mod serializers {
}

// AnyValue <-> KeyValue conversion
pub fn serialize_to_value<S>(value: &Option<AnyValue>, serializer: S) -> Result<S::Ok, S::Error>
pub fn serialize_to_value<S>(value: &Option<Value>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match value {
Some(any_value) => match &any_value.value {
Some(Value::IntValue(i)) => {
// Attempt to create a struct to wrap the intValue
let mut state = match serializer.serialize_struct("Value", 1) {
Ok(s) => s,
Err(e) => return Err(e), // Handle the error or return it
};

// Attempt to serialize the intValue field
if let Err(e) = state.serialize_field("intValue", &i.to_string()) {
return Err(e); // Handle the error or return it
}

// Finalize the struct serialization
state.end()
}
Some(value) => value.serialize(serializer),
None => serializer.serialize_none(),
},
match &value {
Some(Value::IntValue(i)) => {
// Attempt to serialize the intValue field
let mut map = serializer.serialize_map(Some(1))?;
map.serialize_entry("intValue", &i.to_string());
map.end()
}
Some(value) => value.serialize(serializer),
None => serializer.serialize_none(),
}
}

pub fn deserialize_from_value<'de, D>(deserializer: D) -> Result<Option<AnyValue>, D::Error>
pub fn deserialize_from_value<'de, D>(deserializer: D) -> Result<Option<Value>, D::Error>
where
D: Deserializer<'de>,
{
Expand All @@ -99,13 +87,13 @@ pub(crate) mod serializers {
}

impl<'de> de::Visitor<'de> for ValueVisitor {
type Value = AnyValue;
type Value = Option<Value>;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a JSON object for AnyValue")
}

fn visit_map<V>(self, mut map: V) -> Result<AnyValue, V::Error>
fn visit_map<V>(self, mut map: V) -> Result<Option<Value>, V::Error>
where
V: de::MapAccess<'de>,
{
Expand Down Expand Up @@ -150,17 +138,17 @@ pub(crate) mod serializers {
}

if let Some(v) = value {
Ok(AnyValue { value: Some(v) })
Ok(Some(v))
} else {
Err(de::Error::custom(
"Invalid data for AnyValue, no known keys found",
"Invalid data for Value, no known keys found",
))
}
}
}

let value = deserializer.deserialize_map(ValueVisitor)?;
Ok(Some(value))
Ok(value)
}

pub fn serialize_u64_to_string<S>(value: &u64, serializer: S) -> Result<S::Ok, S::Error>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ pub struct AnyValue {
/// The value is one of the listed fields. It is valid for all values to be unspecified
/// in which case this AnyValue is considered to be "empty".
#[prost(oneof = "any_value::Value", tags = "1, 2, 3, 4, 5, 6, 7")]
#[cfg_attr(
feature = "with-serde",
serde(
flatten,
serialize_with = "crate::proto::serializers::serialize_to_value",
deserialize_with = "crate::proto::serializers::deserialize_from_value"
)
)]
pub value: ::core::option::Option<any_value::Value>,
}
/// Nested message and enum types in `AnyValue`.
Expand Down Expand Up @@ -75,13 +83,6 @@ pub struct KeyValue {
#[prost(string, tag = "1")]
pub key: ::prost::alloc::string::String,
#[prost(message, optional, tag = "2")]
#[cfg_attr(
feature = "with-serde",
serde(
serialize_with = "crate::proto::serializers::serialize_to_value",
deserialize_with = "crate::proto::serializers::deserialize_from_value"
)
)]
pub value: ::core::option::Option<AnyValue>,
}
/// InstrumentationScope is a message representing the instrumentation scope information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,6 @@ pub struct LogRecord {
/// string message (including multi-line) describing the event in a free form or it can
/// be a structured data composed of arrays and maps of other values. \[Optional\].
#[prost(message, optional, tag = "5")]
#[cfg_attr(
feature = "with-serde",
serde(
serialize_with = "crate::proto::serializers::serialize_to_value",
deserialize_with = "crate::proto::serializers::deserialize_from_value"
)
)]
pub body: ::core::option::Option<super::super::common::v1::AnyValue>,
/// Additional attributes that describe the specific event occurrence. \[Optional\].
/// Attribute keys MUST be unique (it is not allowed to have more than one
Expand Down
9 changes: 4 additions & 5 deletions opentelemetry-proto/tests/grpc_build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,10 @@ fn build_tonic() {
.field_attribute(path, "#[cfg_attr(feature = \"with-serde\", serde(serialize_with = \"crate::proto::serializers::serialize_u64_to_string\", deserialize_with = \"crate::proto::serializers::deserialize_string_to_u64\"))]")
}

// add custom serializer and deserializer for AnyValue
for path in ["common.v1.KeyValue.value", "logs.v1.LogRecord.body"] {
builder = builder
.field_attribute(path, "#[cfg_attr(feature =\"with-serde\", serde(serialize_with = \"crate::proto::serializers::serialize_to_value\", deserialize_with = \"crate::proto::serializers::deserialize_from_value\"))]");
}
// special serializer and deserializer for value
// The Value::value field must be hidden
builder = builder
.field_attribute("common.v1.AnyValue.value", "#[cfg_attr(feature =\"with-serde\", serde(flatten, serialize_with = \"crate::proto::serializers::serialize_to_value\", deserialize_with = \"crate::proto::serializers::deserialize_from_value\"))]");

// flatten
for path in ["metrics.v1.Metric.data", "metrics.v1.NumberDataPoint.value"] {
Expand Down
32 changes: 8 additions & 24 deletions opentelemetry-proto/tests/json_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,14 +518,10 @@ mod json_serde {
"arrayValue": {
"values": [
{
"value": {
"stringValue": "foo"
}
"stringValue": "foo"
},
{
"value": {
"stringValue": "bar"
}
"stringValue": "bar"
}
]
}
Expand Down Expand Up @@ -557,14 +553,10 @@ mod json_serde {
"arrayValue": {
"values": [
{
"value": {
"stringValue": "foo"
}
"stringValue": "foo"
},
{
"value": {
"intValue": 1337
}
"intValue": "1337"
}
]
}
Expand Down Expand Up @@ -1339,14 +1331,10 @@ mod json_serde {
"arrayValue": {
"values": [
{
"value": {
"stringValue": "many"
}
"stringValue": "many"
},
{
"value": {
"stringValue": "values"
}
"stringValue": "values"
}
]
}
Expand Down Expand Up @@ -1453,14 +1441,10 @@ mod json_serde {
"arrayValue": {
"values": [
{
"value": {
"stringValue": "many"
}
"stringValue": "many"
},
{
"value": {
"stringValue": "values"
}
"stringValue": "values"
}
]
}
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Bump MSRV to 1.70 [#2179](https://github.com/open-telemetry/opentelemetry-rust/pull/2179)
- Implement `LogRecord::set_trace_context` for `LogRecord`. Respect any trace context set on a `LogRecord` when emitting through a `Logger`.
- Improved `LoggerProvider` shutdown handling to prevent redundant shutdown calls when `drop` is invoked. [#2195](https://github.com/open-telemetry/opentelemetry-rust/pull/2195)

## v0.26.0
Released 2024-Sep-30
Expand Down
Loading

0 comments on commit e99195b

Please sign in to comment.