Skip to content

Commit

Permalink
Clean up start, end, drop
Browse files Browse the repository at this point in the history
Signed-off-by: Caleb Schoepp <[email protected]>
  • Loading branch information
calebschoepp committed Sep 13, 2024
1 parent d966e97 commit eb8dc27
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 32 deletions.
75 changes: 47 additions & 28 deletions crates/factor-observe/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ use crate::{GuestSpan, InstanceState};

#[async_trait]
impl tracer::Host for InstanceState {
// TODO(Caleb): Make this implicit logic make more sense (the indexmap seems wrong)
// TODO(Caleb): Properly implement this
async fn start(
&mut self,
name: String,
_options: Option<tracer::StartOptions>,
options: Option<tracer::StartOptions>,
) -> Result<Resource<tracer::Span>> {
let mut state = self.state.write().unwrap();
let options = options.unwrap_or_default();

if state.active_spans.is_empty() {
// Before we ever create any new spans make sure we track the original host span ID
if state.original_host_span_id.is_none() {
state.original_host_span_id = Some(
tracing::Span::current()
.context()
Expand All @@ -34,31 +34,47 @@ impl tracer::Host for InstanceState {
);
}

// TODO(Caleb): Make this cleaner
let parent_context = match state.active_spans.is_empty() {
true => tracing::Span::current().context(),
false => Context::new().with_remote_span_context(
state
// Get span's parent based on whether it's a new root and whether there are any active spans
let parent_context = match (options.new_root, state.active_spans.is_empty()) {
// Not a new root | Active spans -> Last active guest span is parent
(false, false) => {
let span_context = state
.guest_spans
.get(*state.active_spans.last().unwrap().1)
.get(*state.active_spans.last().unwrap())
.unwrap()
.inner
.span_context()
.clone(),
),
.clone();
Context::new().with_remote_span_context(span_context)
}
// Not a new root | No active spans -> Current host span is parent
(false, true) => tracing::Span::current().context(),
// New root | n/a -> No parent
(true, _) => Context::new(),
};

// Create the underlying opentelemetry span
let otel_span = self.tracer.start_with_context(name, &parent_context);

let span_id = otel_span.span_context().span_id().to_string();
let mut builder = self.tracer.span_builder(name);
if let Some(kind) = options.span_kind {
builder = builder.with_kind(kind.into());
}
if let Some(attributes) = options.attributes {
builder = builder.with_attributes(attributes.into_iter().map(Into::into));
}
if let Some(links) = options.links {
builder = builder.with_links(links.into_iter().map(Into::into).collect());
}
if let Some(timestamp) = options.timestamp {
builder = builder.with_start_time(timestamp);
}
let otel_span = builder.start_with_context(&self.tracer, &parent_context);

// Wrap it in a GuestSpan for our own bookkeeping purposes
let guest_span = GuestSpan { inner: otel_span };

// Put the GuestSpan in our resource table and push it to our stack of active spans
// Put the GuestSpan in our resource table and push it on to our stack of active spans
let resource_id = state.guest_spans.push(guest_span).unwrap();
state.active_spans.insert(span_id, resource_id);
state.active_spans.insert(resource_id);

Ok(Resource::new_own(resource_id))
}
Expand Down Expand Up @@ -203,30 +219,33 @@ impl tracer::HostSpan for InstanceState {
resource: Resource<tracer::Span>,
timestamp: Option<tracer::Datetime>,
) -> Result<()> {
if let Some(guest_span) = self
.state
.write()
.unwrap()
.guest_spans
.get_mut(resource.rep())
{
let mut state = self.state.write().unwrap();
if let Some(guest_span) = state.guest_spans.get_mut(resource.rep()) {
if let Some(timestamp) = timestamp {
guest_span.inner.end_with_timestamp(timestamp.into());
} else {
guest_span.inner.end();
}

// Remove the span from active_spans
state.active_spans.shift_remove(&resource.rep());

Ok(())
} else {
Err(anyhow!("BUG: cannot find resource in table"))
}
}

fn drop(&mut self, _: Resource<tracer::Span>) -> Result<()> {
// Dropping the resource automatically calls drop on the Span which ends itself with the current timestamp
fn drop(&mut self, resource: Resource<tracer::Span>) -> Result<()> {
// Dropping the resource automatically calls drop on the Span which ends itself with the
// current timestamp if the Span is not already ended.

// Ensure that the span has been removed from active_spans
let mut state = self.state.write().unwrap();
state.active_spans.shift_remove(&resource.rep());

Ok(())
}
}

// TODO(Caleb): Move the tests from integration.rs to here
// TODO(Caleb): Write tests somewhere for all the finicky type conversion stuff
// TODO(Caleb): Maybe introduce macro to reduce boilerplate of finding resource
9 changes: 5 additions & 4 deletions crates/factor-observe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod host;

use std::sync::{Arc, RwLock};

use indexmap::IndexMap;
use indexmap::IndexSet;
use opentelemetry::{
global::{self, BoxedTracer, ObjectSafeSpan},
trace::{SpanId, TraceContextExt},
Expand Down Expand Up @@ -78,7 +78,7 @@ pub(crate) struct State {
/// Only a reference ID to the guest span is held here. The actual guest span must be looked up
/// in the `guest_spans` table using the reference ID.
/// TODO(Caleb): Fix comment
pub(crate) active_spans: IndexMap<String, u32>,
pub(crate) active_spans: IndexSet<u32>,

/// Id of the last span emitted from within the host before entering the guest.
///
Expand Down Expand Up @@ -115,6 +115,7 @@ impl ObserveContext {
/// Make sure to mention this should only be called from an instrumented function in a factor.
/// Make sure this is called before any awaits
pub fn reparent_tracing_span(&self) {
// TODO(Caleb): Refactor to be similar to start
let state = if let Some(state) = self.state.as_ref() {
state.read().unwrap()
} else {
Expand All @@ -133,14 +134,14 @@ impl ObserveContext {
.span_id()
.eq(&original_host_span_id)
{
panic!("TODO This should not happen")
panic!("TODO(Caleb): This should not happen")
}
}

let parent_context = Context::new().with_remote_span_context(
state
.guest_spans
.get(*state.active_spans.last().unwrap().1)
.get(*state.active_spans.last().unwrap())
.unwrap()
.inner
.span_context()
Expand Down
35 changes: 35 additions & 0 deletions crates/world/src/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,46 @@ mod observe {
}
}

impl From<tracer::SpanKind> for opentelemetry::trace::SpanKind {
fn from(kind: tracer::SpanKind) -> Self {
match kind {
tracer::SpanKind::Client => opentelemetry::trace::SpanKind::Client,
tracer::SpanKind::Server => opentelemetry::trace::SpanKind::Server,
tracer::SpanKind::Producer => opentelemetry::trace::SpanKind::Producer,
tracer::SpanKind::Consumer => opentelemetry::trace::SpanKind::Consumer,
tracer::SpanKind::Internal => opentelemetry::trace::SpanKind::Internal,
}
}
}

impl From<tracer::Link> for opentelemetry::trace::Link {
fn from(link: tracer::Link) -> Self {
Self::new(
link.span_context.into(),
link.attributes.into_iter().map(Into::into).collect(),
0,
)
}
}

impl From<tracer::Datetime> for SystemTime {
fn from(timestamp: tracer::Datetime) -> Self {
UNIX_EPOCH
+ Duration::from_secs(timestamp.seconds)
+ Duration::from_nanos(timestamp.nanoseconds as u64)
}
}

#[allow(clippy::derivable_impls)]
impl Default for tracer::StartOptions {
fn default() -> Self {
Self {
new_root: false,
span_kind: None,
attributes: None,
links: None,
timestamp: None,
}
}
}
}

0 comments on commit eb8dc27

Please sign in to comment.