diff --git a/libs/telemetry/src/collector.rs b/libs/telemetry/src/collector.rs index 8273eae7179..2bed931624a 100644 --- a/libs/telemetry/src/collector.rs +++ b/libs/telemetry/src/collector.rs @@ -7,6 +7,7 @@ use serde::Serialize; use crate::id::{RequestId, SpanId}; use crate::models::{LogLevel, SpanKind}; +use crate::NextId; #[derive(Debug, Clone)] #[cfg_attr(test, derive(Serialize))] @@ -37,10 +38,10 @@ pub(crate) struct SpanBuilder { } impl SpanBuilder { - pub fn new(name: &'static str, id: impl Into, attrs_size_hint: usize) -> Self { + pub fn new(name: &'static str, attrs_size_hint: usize) -> Self { Self { request_id: None, - id: id.into(), + id: SpanId::next(), name: name.into(), start_time: SystemTime::now(), elapsed: ElapsedTimeCounter::start(), @@ -54,6 +55,10 @@ impl SpanBuilder { self.request_id } + pub fn span_id(&self) -> SpanId { + self.id + } + pub fn set_request_id(&mut self, request_id: RequestId) { self.request_id = Some(request_id); } @@ -74,10 +79,10 @@ impl SpanBuilder { self.links.push(link); } - pub fn end(self, parent_id: Option>) -> CollectedSpan { + pub fn end(self, parent_id: Option) -> CollectedSpan { CollectedSpan { id: self.id, - parent_id: parent_id.map(Into::into), + parent_id, name: self.name, start_time: self.start_time, duration: self.elapsed.elapsed_time(), diff --git a/libs/telemetry/src/exporter.rs b/libs/telemetry/src/exporter.rs index 7032999536b..7c5891e945a 100644 --- a/libs/telemetry/src/exporter.rs +++ b/libs/telemetry/src/exporter.rs @@ -272,6 +272,8 @@ impl AllowAttribute for InternalAttributesFilter { mod tests { use std::time::{Duration, SystemTime}; + use crate::NextId; + use super::*; use CaptureTarget::*; @@ -327,7 +329,7 @@ mod tests { let request_id = exporter.start_capturing(RequestId::next(), capture_all()).await; let span = CollectedSpan { - id: tracing::span::Id::from_u64(1).into(), + id: SpanId::try_from(1).unwrap(), parent_id: None, name: "test_span".into(), start_time: SystemTime::UNIX_EPOCH, @@ -381,7 +383,7 @@ mod tests { let request_id = exporter.start_capturing(RequestId::next(), capture_spans()).await; let span = CollectedSpan { - id: tracing::span::Id::from_u64(1).into(), + id: SpanId::try_from(1).unwrap(), parent_id: None, name: "test_span".into(), start_time: SystemTime::UNIX_EPOCH, diff --git a/libs/telemetry/src/id.rs b/libs/telemetry/src/id.rs index fce0b8ba305..b7ce6b6578b 100644 --- a/libs/telemetry/src/id.rs +++ b/libs/telemetry/src/id.rs @@ -16,9 +16,13 @@ impl SerializableNonZeroU64 { pub fn into_u64(self) -> u64 { self.0.get() } +} + +impl TryFrom for SerializableNonZeroU64 { + type Error = u64; - pub fn from_u64(value: u64) -> Option { - NonZeroU64::new(value).map(Self) + fn try_from(value: u64) -> Result { + NonZeroU64::new(value).map(Self).ok_or(value) } } @@ -66,24 +70,35 @@ impl<'de> Deserialize<'de> for SerializableNonZeroU64 { } } -/// A unique identifier for a span. It maps directly to [`tracing::span::Id`] assigned by -/// [`tracing_subscriber::registry::Registry`]. +/// A unique identifier for a span. +/// +/// We don't use the original span IDs assigned by the `tracing` `Subscriber` +/// because they are are only guaranteed to be unique among the spans active at +/// the same time. They may be reused after a span is closed (even for +/// successive sibling spans in the same trace as long as they don't overlap in +/// time), so they are ephemeral and cannot be stored. Since we do need to store +/// them and can only tolerate reuse across different traces but not in a single +/// trace, we generate our own IDs. #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] #[repr(transparent)] pub struct SpanId(SerializableNonZeroU64); -impl From<&tracing::span::Id> for SpanId { - fn from(id: &tracing::span::Id) -> Self { - Self(SerializableNonZeroU64(id.into_non_zero_u64())) +impl From for SpanId { + fn from(value: NonZeroU64) -> Self { + Self(SerializableNonZeroU64(value)) } } -impl From for SpanId { - fn from(id: tracing::span::Id) -> Self { - Self::from(&id) +impl TryFrom for SpanId { + type Error = u64; + + fn try_from(value: u64) -> Result { + SerializableNonZeroU64::try_from(value).map(Self) } } +impl NextId for SpanId {} + /// A unique identifier for an engine trace, representing a tree of spans. These /// internal traces *do not* correspond to OpenTelemetry trace IDs. One /// OpenTelemetry trace may contain multiple Prisma Client operations, each of @@ -91,39 +106,42 @@ impl From for SpanId { /// requests to the engine, we call these trace IDs "request IDs" to /// disambiguate and avoid confusion. /// -/// We don't use IDs of the root spans themselves for this purpose because span -/// IDs are only guaranteed to be unique among the spans active at the same -/// time. They may be reused after a span is closed, so they are not -/// historically unique. We store the collected spans and events for some short -/// time after the spans are closed until the client requests them, so we need -/// request IDs that are guaranteed to be unique for a very long period of time -/// (although they still don't necessarily have to be unique for the whole -/// lifetime of the process). +/// We store the collected spans and events for some short time after the spans +/// are closed until the client requests them, so we need request IDs that are +/// guaranteed to be unique for a very long period of time (although they still +/// don't necessarily have to be unique for the whole lifetime of the process). +/// +/// We don't use the root span IDs as the request IDs to have more flexibility +/// and allow clients to generate the request IDs on the client side, rather +/// than having us send the generated request ID back to the client. This +/// guarantees we can still fetch the traces from the engine even in a case of +/// an error and no response sent back to the client. #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] #[repr(transparent)] pub struct RequestId(SerializableNonZeroU64); impl RequestId { - pub fn next() -> Self { - static NEXT_ID: AtomicU64 = AtomicU64::new(1); - - let mut id = 0; - while id == 0 { - id = NEXT_ID.fetch_add(1, Ordering::Relaxed); - } - - Self(SerializableNonZeroU64(NonZeroU64::new(id).unwrap())) - } - pub fn into_u64(self) -> u64 { self.0.into_u64() } +} - pub fn from_u64(value: u64) -> Option { - SerializableNonZeroU64::from_u64(value).map(Self) +impl From for RequestId { + fn from(value: NonZeroU64) -> Self { + Self(SerializableNonZeroU64(value)) } } +impl TryFrom for RequestId { + type Error = u64; + + fn try_from(value: u64) -> Result { + SerializableNonZeroU64::try_from(value).map(Self) + } +} + +impl NextId for RequestId {} + impl Default for RequestId { fn default() -> Self { Self::next() @@ -137,3 +155,18 @@ impl FromStr for RequestId { SerializableNonZeroU64::from_str(s).map(Self) } } + +/// A trait for types that represent sequential IDs and can be losslessly +/// converted from [`NonZeroU64`]. +pub trait NextId: Sized + From { + fn next() -> Self { + static NEXT_ID: AtomicU64 = AtomicU64::new(1); + + let mut id = 0; + while id == 0 { + id = NEXT_ID.fetch_add(1, Ordering::Relaxed); + } + + Self::from(NonZeroU64::new(id).unwrap()) + } +} diff --git a/libs/telemetry/src/layer.rs b/libs/telemetry/src/layer.rs index cc0afc1b5d7..64a2bb66b0d 100644 --- a/libs/telemetry/src/layer.rs +++ b/libs/telemetry/src/layer.rs @@ -72,7 +72,7 @@ where { fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { let span = Self::require_span(id, &ctx); - let mut span_builder = SpanBuilder::new(span.name(), id, attrs.fields().len()); + let mut span_builder = SpanBuilder::new(span.name(), attrs.fields().len()); if let Some(request_id) = span .parent() @@ -98,11 +98,16 @@ where } fn on_follows_from(&self, span: &Id, follows: &Id, ctx: Context<'_, S>) { + let followed_span = Self::require_span(follows, &ctx); + let Some(followed_id) = followed_span.extensions().get::().map(|sb| sb.span_id()) else { + return; + }; + let span = Self::require_span(span, &ctx); let mut extensions = span.extensions_mut(); if let Some(span_builder) = extensions.get_mut::() { - span_builder.add_link(follows.into()); + span_builder.add_link(followed_id); } } @@ -116,12 +121,18 @@ where return; }; - let Some(request_id) = parent.extensions().get::().and_then(|sb| sb.request_id()) else { + let extensions = parent.extensions(); + + let Some(span_builder) = extensions.get::() else { + return; + }; + + let Some(request_id) = span_builder.request_id() else { return; }; let mut event_builder = EventBuilder::new( - parent.id().into(), + span_builder.span_id(), event.metadata().target(), event.metadata().level().into(), event.metadata().fields().len(), @@ -145,7 +156,10 @@ where return; }; - let parent_id = span.parent().map(|parent| parent.id()); + let parent_id = span + .parent() + .and_then(|parent| parent.extensions().get::().map(|sb| sb.span_id())); + let collected_span = span_builder.end(parent_id); self.collector.add_span(request_id, collected_span); @@ -182,7 +196,7 @@ impl field::Visit for SpanAttributeVisitor<'_, Filter> { fn record_u64(&mut self, field: &field::Field, value: u64) { match field.name() { REQUEST_ID_FIELD => { - if let Some(request_id) = RequestId::from_u64(value) { + if let Ok(request_id) = RequestId::try_from(value) { self.span_builder.set_request_id(request_id); } } @@ -279,7 +293,7 @@ impl field::Visit for EventAttributeVisitor<'_, Filter> #[cfg(test)] mod tests { use crate::collector::{AllowAttribute, CollectedEvent, CollectedSpan}; - use crate::id::RequestId; + use crate::id::{NextId, RequestId}; use super::*; diff --git a/libs/telemetry/src/lib.rs b/libs/telemetry/src/lib.rs index 0149243d6ac..8c85d3e086c 100644 --- a/libs/telemetry/src/lib.rs +++ b/libs/telemetry/src/lib.rs @@ -9,6 +9,6 @@ pub mod time; pub mod traceparent; pub use exporter::Exporter; -pub use id::RequestId; +pub use id::{NextId, RequestId}; pub use layer::layer; pub use traceparent::TraceParent; diff --git a/query-engine/query-engine/src/cli.rs b/query-engine/query-engine/src/cli.rs index d797b11e934..f2fbda908ec 100644 --- a/query-engine/query-engine/src/cli.rs +++ b/query-engine/query-engine/src/cli.rs @@ -9,7 +9,7 @@ use psl::parser_database::Files; use query_core::{protocol::EngineProtocol, schema}; use request_handlers::{dmmf, RequestBody, RequestHandler}; use std::{env, sync::Arc}; -use telemetry::RequestId; +use telemetry::{NextId, RequestId}; pub struct ExecuteRequest { query: String, diff --git a/query-engine/query-engine/src/context.rs b/query-engine/query-engine/src/context.rs index 2a0c8a0c48e..6155b8f3b46 100644 --- a/query-engine/query-engine/src/context.rs +++ b/query-engine/query-engine/src/context.rs @@ -12,7 +12,7 @@ use query_core::{ use request_handlers::{load_executor, ConnectorKind}; use std::{env, fmt, sync::Arc}; use telemetry::exporter::{CaptureSettings, CaptureTarget}; -use telemetry::RequestId; +use telemetry::{NextId, RequestId}; use tracing::Instrument; /// Prisma request context containing all immutable state of the process. diff --git a/query-engine/query-engine/src/server/mod.rs b/query-engine/query-engine/src/server/mod.rs index cf1e658f0a8..b0580fbf685 100644 --- a/query-engine/query-engine/src/server/mod.rs +++ b/query-engine/query-engine/src/server/mod.rs @@ -13,7 +13,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, Instant}; use telemetry::exporter::{CaptureSettings, CaptureTarget, TraceData}; -use telemetry::{RequestId, TraceParent}; +use telemetry::{NextId, RequestId, TraceParent}; use tracing::{Instrument, Span}; /// Starts up the graphql query engine server diff --git a/query-engine/query-engine/src/tests/errors.rs b/query-engine/query-engine/src/tests/errors.rs index 3e3a524e8d7..b144b1100b3 100644 --- a/query-engine/query-engine/src/tests/errors.rs +++ b/query-engine/query-engine/src/tests/errors.rs @@ -3,7 +3,7 @@ use enumflags2::make_bitflags; use indoc::{formatdoc, indoc}; use query_core::protocol::EngineProtocol; use serde_json::json; -use telemetry::RequestId; +use telemetry::{NextId, RequestId}; #[tokio::test] async fn connection_string_problems_give_a_nice_error() {