Skip to content

Commit

Permalink
Applying env-filter to OpenTelemetry span exporter (in Rust microserv…
Browse files Browse the repository at this point in the history
…ices) | Adding error handling for Elasticsearch response parsing
  • Loading branch information
Archisman-Mridha committed Feb 7, 2024
1 parent d770ef0 commit 07db74b
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ futures = "0.3.30"
jsonwebtoken = "9.2.0"
kafka = { version = "0.10.0", default-features = false }
lazy_static = "1.4.0"
opentelemetry = "0.21.0"
opentelemetry = { version = "0.21.0", default-features = false, features = ["trace"] }
opentelemetry-otlp = { version = "0.14.0", default-features = false, features = ["gzip-tonic", "trace", "grpc-tonic"] }
opentelemetry_sdk = { version = "0.21.2", features = ["rt-tokio"] }
postgres-types = { version = "0.2.6", features = ["with-time-0_3"] }
Expand Down
17 changes: 11 additions & 6 deletions backend/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,13 @@ pub mod utils {
};
use tokio::spawn;
use tracing::{
debug, debug_span, level_filters::LevelFilter, subscriber::set_global_default, warn, Span,
debug, info_span, level_filters::LevelFilter, subscriber::set_global_default, warn, Span,
};
use tracing_opentelemetry::{OpenTelemetryLayer, OpenTelemetrySpanExt};
use tracing_subscriber::fmt;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer, Registry};
use tracing_subscriber::{
layer::{Layered, SubscriberExt},
EnvFilter, Registry,
};
use warp::Filter;

pub fn setupObservability(serviceName: &'static str) {
Expand All @@ -102,8 +104,9 @@ pub mod utils {

set_global_default(
Registry::default()
.with(envFilter)
.with(startTracer(serviceName))
.with(fmt::layer().with_filter(envFilter)),
.with(tracing_subscriber::fmt::layer()),
)
.expect("ERROR : Setting up global default tracing registry");

Expand All @@ -127,7 +130,9 @@ pub mod utils {
}

// startTracer creates an OpenTelemetry tracing pipeline and sets the global tracer.
pub fn startTracer(serviceName: &'static str) -> OpenTelemetryLayer<Registry, Tracer> {
pub fn startTracer(
serviceName: &'static str,
) -> OpenTelemetryLayer<Layered<EnvFilter, Registry>, Tracer> {
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());

let tracer = new_pipeline()
Expand Down Expand Up @@ -155,7 +160,7 @@ pub mod utils {

pub fn makeSpan(request: &Request<Body>) -> Span {
let headers = request.headers();
debug_span!(
info_span!(
"Incoming Request",
?headers,
trace_id = tracing::field::Empty
Expand Down
8 changes: 4 additions & 4 deletions backend/microservices/profiles/adapters/elasticsearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,17 @@ impl ElasticsearchAdapter {
// deserializeQueryProfilesResponse takes in the response returned from Elasticsearch for the query
// done in ElasticsearchAdapter.searchProfiles and tries to deserialize it to Vec<ProfilePreview>.
async fn deserializeQueryProfilesResponse(response: Response) -> Result<Vec<ProfilePreview>> {
#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
struct Response {
hits: Hits,
}

#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
struct Hits {
hits: Vec<Hit>,
}

#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
struct Hit {
_id: String,
_source: Source,
Expand All @@ -133,7 +133,7 @@ async fn deserializeQueryProfilesResponse(response: Response) -> Result<Vec<Prof
let response = response.bytes().await?.to_vec();
let response = from_utf8(&response)?;

let response: Response = serde_json::from_str(response).unwrap();
let response: Response = serde_json::from_str(response)?;

let profiles = response
.hits
Expand Down

0 comments on commit 07db74b

Please sign in to comment.