NEW: add new metadata to send span
woile committed Sep 26, 2024
1 parent 1aadf72 commit e360f82
Showing 2 changed files with 96 additions and 47 deletions.
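
In short: the producer "send" span no longer derives the partition from the call arguments; after the broker acknowledges the record, the span is enriched with the partition and offset the broker actually assigned. Below is a minimal sketch (not part of the diff; the helper name is made up for illustration) of the attributes a send span ends up carrying, reusing the semconv constants the module already imports:

from opentelemetry.semconv.trace import SpanAttributes


def describe_send_span(topic: str, partition: int, offset: int) -> dict:
    # Mirrors what _enrich_base_span + _enrich_span_with_record_info set below.
    return {
        SpanAttributes.MESSAGING_KAFKA_PARTITION: partition,
        SpanAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET: offset,
        # Message id format used by the new helper: topic.partition.offset
        SpanAttributes.MESSAGING_MESSAGE_ID: f"{topic}.{partition}.{offset}",
    }


print(describe_send_span("payments", 3, 1337))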
105 changes: 76 additions & 29 deletions src/opentelemetry_instrumentation_kstreams/utils.py
@@ -7,6 +7,9 @@
 from opentelemetry import propagate, context, trace
 from opentelemetry.propagators import textmap
 from opentelemetry.semconv.trace import SpanAttributes
+
+# Enable after 0.49 is released
+# from opentelemetry.semconv._incubating.attributes import messaging_attributes as SpanAttributes
 from opentelemetry.trace import SpanKind, Tracer
 from opentelemetry.trace.span import Span
 from opentelemetry.context.context import Context
@@ -19,7 +22,6 @@
 
 class KStreamsContextGetter(textmap.Getter[HeadersT]):
     def get(self, carrier: HeadersT, key: str) -> Optional[List[str]]:
-        print("Getting context!!!", carrier, key)
         if carrier is None:
             return None
         carrier_items: Iterable = carrier
@@ -29,7 +31,6 @@ def get(self, carrier: HeadersT, key: str) -> Optional[List[str]]:
         for item_key, value in carrier_items:
             if item_key == key:
                 if value is not None:
-                    print("Found context, here you goooo", value)
                     return [value.decode()]
         return None
 
@@ -88,10 +89,20 @@ def extract_send_topic(args, kwargs) -> Any:
         )
 
     @staticmethod
-    def extract_send_partition(args: Any, kwargs: Any) -> Any:
-        return KStreamsKafkaExtractor._extract_argument(
-            "partition", 4, None, args, kwargs
-        )
+    def extract_send_partition(record_metadata: Any) -> Any:
+        if hasattr(record_metadata, "partition"):
+            return record_metadata.partition
+
+    @staticmethod
+    def extract_send_offset(record_metadata: Any) -> Any:
+        if hasattr(record_metadata, "offset"):
+            return record_metadata.offset
+
+    @staticmethod
+    def extract_consumer_group(consumer: Any) -> Optional[str]:
+        if hasattr(consumer, "group_id"):
+            return consumer.group_id
+        return None
 
     @staticmethod
     def extract_producer_client_id(instance: StreamEngine) -> Optional[str]:
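
The new extractors duck-type on whatever the awaited send returns; with the aiokafka backend that kstreams uses, that is a RecordMetadata exposing topic, partition, and offset. A quick hedged sketch with a stand-in object (the SimpleNamespace is illustrative only, not aiokafka's actual class):

from types import SimpleNamespace

from opentelemetry_instrumentation_kstreams.utils import KStreamsKafkaExtractor

record_metadata = SimpleNamespace(topic="payments", partition=3, offset=1337)

assert KStreamsKafkaExtractor.extract_send_partition(record_metadata) == 3
assert KStreamsKafkaExtractor.extract_send_offset(record_metadata) == 1337
# Objects without the attribute fall through to None via hasattr()
assert KStreamsKafkaExtractor.extract_send_offset(object()) is None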
@@ -106,14 +117,12 @@ def extract_consumer_client_id(instance: Stream) -> str:
         return instance.consumer._client._client_id
 
 
-def _enrich_span(
+def _enrich_base_span(
     span: Span,
     bootstrap_servers: List[str],
     topic: str,
-    partition: Optional[int],
     client_id: Optional[str],
-    offset: Optional[int] = None,
-):
+) -> None:
     """
     Enriches the given span with Kafka-related attributes.
@@ -140,24 +149,40 @@
     if client_id is not None:
         span.set_attribute(SpanAttributes.MESSAGING_KAFKA_CLIENT_ID, client_id)
 
-    if span.is_recording():
-        if offset is not None:
-            span.set_attribute(SpanAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, offset)
-        if partition is not None:
-            span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition)
-        if offset and partition:
-            span.set_attribute(
-                SpanAttributes.MESSAGING_MESSAGE_ID,
-                f"{topic}.{partition}.{offset}",
-            )
+
+def _enrich_span_with_record_info(
+    span: Span, topic: str, partition: Optional[int], offset: Optional[int] = None
+) -> None:
+    """Used both by consumer and producer spans.
+
+    It's in a different function because send needs to be injected with the span
+    info, but we want to be able to keep updating the span with the record info.
+
+    Args:
+        span: The span to enrich.
+        topic: The Kafka topic name.
+        partition: The partition number, if available.
+        offset: The message offset, if available. Defaults to None.
+
+    Returns:
+        None
+    """
+    if offset is not None:
+        span.set_attribute(SpanAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, offset)
+    if partition is not None:
+        span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition)
+    if offset is not None and partition is not None:
+        span.set_attribute(
+            SpanAttributes.MESSAGING_MESSAGE_ID,
+            f"{topic}.{partition}.{offset}",
+        )
 
 
-def _get_span_name(operation: str, topic: str):
+def _get_span_name(operation: str, topic: str) -> str:
     return f"{topic} {operation}"
 
 
 def _wrap_send(tracer: Tracer) -> Callable:
-    async def _traced_send(func, instance: StreamEngine, args, kwargs):
+    async def _traced_send(func, instance: StreamEngine, args, kwargs) -> Any:
         if not isinstance(instance.backend, Kafka):
             raise NotImplementedError("Only Kafka backend is supported for now")
@@ -166,24 +191,27 @@ async def _traced_send(func, instance: StreamEngine, args, kwargs):
         if headers is None:
             headers = []
             kwargs["headers"] = headers
-
-        topic = KStreamsKafkaExtractor.extract_send_topic(args, kwargs)
-        client_id = KStreamsKafkaExtractor.extract_producer_client_id(instance)
         bootstrap_servers = KStreamsKafkaExtractor.extract_bootstrap_servers(
             instance.backend
         )
-        partition = KStreamsKafkaExtractor.extract_send_partition(args, kwargs)
+        client_id = KStreamsKafkaExtractor.extract_producer_client_id(instance)
+        topic = KStreamsKafkaExtractor.extract_send_topic(args, kwargs)
 
         span_name = _get_span_name("send", topic)
         with tracer.start_as_current_span(span_name, kind=SpanKind.PRODUCER) as span:
-            _enrich_span(span, bootstrap_servers, topic, partition, client_id)
+            _enrich_base_span(span, bootstrap_servers, topic, client_id)
             propagate.inject(
                 headers,
                 context=trace.set_span_in_context(span),
                 setter=_kstreams_setter,
             )
-        return await func(*args, **kwargs)
+            record_metadata = await func(*args, **kwargs)
+
+            partition = KStreamsKafkaExtractor.extract_send_partition(record_metadata)
+            offset = KStreamsKafkaExtractor.extract_send_offset(record_metadata)
+            _enrich_span_with_record_info(span, topic, partition, offset)
+
+            return record_metadata
 
     return _traced_send
 
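Design note: the send is now awaited while the producer span is still open, so the broker-assigned coordinates can be attached before the span ends, and the wrapped function hands the same metadata back to the caller. A hedged outline of the resulting order of operations (simplified; the real _traced_send above also injects the trace context into the headers):

from opentelemetry_instrumentation_kstreams.utils import (
    KStreamsKafkaExtractor,
    _enrich_span_with_record_info,
)


async def send_then_enrich(span, send, topic, *args, **kwargs):
    # 1. Await the real send; the broker assigns partition and offset.
    record_metadata = await send(*args, **kwargs)
    # 2. Enrich the still-open span with what the broker reported.
    partition = KStreamsKafkaExtractor.extract_send_partition(record_metadata)
    offset = KStreamsKafkaExtractor.extract_send_offset(record_metadata)
    _enrich_span_with_record_info(span, topic, partition, offset)
    # 3. Hand the metadata back unchanged.
    return record_metadata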
@@ -194,6 +222,7 @@ def _create_consumer_span(
     extracted_context: Context,
     bootstrap_servers: List[str],
     client_id: str,
+    consumer_group: Optional[str],
     args: Any,
     kwargs: Any,
 ) -> None:
@@ -220,7 +249,20 @@
     ) as span:
         new_context = trace.set_span_in_context(span, extracted_context)
         token = context.attach(new_context)
-        _enrich_span(span, bootstrap_servers, record.topic, record.partition, client_id, record.offset)
+        _enrich_base_span(
+            span,
+            bootstrap_servers,
+            record.topic,
+            client_id,
+        )
+        _enrich_span_with_record_info(
+            span, record.topic, record.partition, record.offset
+        )
+        # TODO: enable after 0.49 is released
+        # if consumer_group is not None:
+        #     span.set_attribute(
+        #         SpanAttributes.MESSAGING_CONSUMER_GROUP_NAME, consumer_group
+        #     )
 
         context.detach(token)
 
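On the consumer path the two enrichment helpers now run back to back; the split exists so the producer path, which only learns partition and offset after the await, can call the second helper late while both paths share the first. A hedged sketch of feeding them a record by hand (the SimpleNamespace stands in for aiokafka's ConsumerRecord):

from types import SimpleNamespace

from opentelemetry import trace
from opentelemetry_instrumentation_kstreams.utils import (
    _enrich_base_span,
    _enrich_span_with_record_info,
)

record = SimpleNamespace(topic="payments", partition=3, offset=1337)
span = trace.get_tracer(__name__).start_span("payments receive")
_enrich_base_span(span, ["localhost:9092"], record.topic, "my-consumer-id")
_enrich_span_with_record_info(span, record.topic, record.partition, record.offset)
span.end()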
@@ -236,6 +278,10 @@ async def _traced_anext(func, instance: Stream, args, kwargs):
             instance.backend
         )
         client_id = KStreamsKafkaExtractor.extract_consumer_client_id(instance)
+        # consumer_group = KStreamsKafkaExtractor.extract_consumer_group(
+        #     instance.consumer
+        # )
+        consumer_group = None
         extracted_context: Context = propagate.extract(
             record.headers, getter=_kstreams_getter
         )
@@ -246,6 +292,7 @@ async def _traced_anext(func, instance: Stream, args, kwargs):
             extracted_context,
             bootstrap_servers,
             client_id,
+            consumer_group,
             args,
             kwargs,
         )
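Taken together: once instrumented, a send issued through a kstreams engine yields a PRODUCER span named "<topic> send" that carries the broker-assigned coordinates. A hedged end-to-end sketch (create_engine and send are kstreams' public API, but the instrumentation wiring is omitted here and a broker on localhost:9092 is assumed):

import asyncio

from kstreams import create_engine


async def main() -> None:
    engine = create_engine(title="demo-engine")
    await engine.start()
    # With the instrumentation applied, this send runs inside a
    # "payments send" producer span enriched from the returned metadata.
    metadata = await engine.send("payments", value=b"hello")
    print(metadata.partition, metadata.offset)
    await engine.stop()


asyncio.run(main())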
38 changes: 20 additions & 18 deletions tests/test_utils.py
@@ -17,7 +17,6 @@
 from unittest import TestCase, mock
 
 from opentelemetry_instrumentation_kstreams.utils import (
-    KStreamsKafkaExtractor,
     _create_consumer_span,
     _get_span_name,
     _kstreams_getter,
@@ -43,7 +42,7 @@ def setUp(self) -> None:
     @mock.patch(
         "opentelemetry_instrumentation_kstreams.utils.KStreamsKafkaExtractor.extract_send_partition"
     )
-    @mock.patch("opentelemetry_instrumentation_kstreams.utils._enrich_span")
+    @mock.patch("opentelemetry_instrumentation_kstreams.utils._enrich_base_span")
     @mock.patch("opentelemetry.trace.set_span_in_context")
     @mock.patch("opentelemetry.propagate.inject")
     def test_wrap_send_with_topic_as_arg(
@@ -68,7 +67,7 @@ def test_wrap_send_with_topic_as_arg(
     @mock.patch(
         "opentelemetry_instrumentation_kstreams.utils.KStreamsKafkaExtractor.extract_send_partition"
     )
-    @mock.patch("opentelemetry_instrumentation_kstreams.utils._enrich_span")
+    @mock.patch("opentelemetry_instrumentation_kstreams.utils._enrich_base_span")
     @mock.patch("opentelemetry.trace.set_span_in_context")
     @mock.patch("opentelemetry.propagate.inject")
     def test_wrap_send_with_topic_as_kwarg(
@@ -98,19 +97,22 @@ def wrap_send_helper(
         extract_bootstrap_servers: mock.MagicMock,
     ) -> None:
         tracer = mock.MagicMock()
+        record = mock.MagicMock()
         original_send_callback = mock.AsyncMock()
+        original_send_callback.return_value = record
         stream_engine = mock.MagicMock()
         stream_engine.backend = mock.MagicMock(spec_set=Kafka())
         client_id = "client_id"
         stream_engine._producer.client._client_id = client_id
         expected_span_name = _get_span_name("send", self.topic_name)
         wrapped_send = _wrap_send(tracer)
 
         retval = asyncio.run(
             wrapped_send(original_send_callback, stream_engine, self.args, self.kwargs)
         )
 
         extract_bootstrap_servers.assert_called_once_with(stream_engine.backend)
-        extract_send_partition.assert_called_once_with(self.args, self.kwargs)
+        extract_send_partition.assert_called_once_with(record)
         tracer.start_as_current_span.assert_called_once_with(
             expected_span_name, kind=SpanKind.PRODUCER
         )
@@ -120,7 +122,6 @@ def wrap_send_helper(
             span,
             extract_bootstrap_servers.return_value,
             self.topic_name,
-            extract_send_partition.return_value,
             client_id,
         )
 
@@ -174,12 +175,16 @@ async def test_wrap_next(
 
     @mock.patch("opentelemetry.trace.set_span_in_context")
     @mock.patch("opentelemetry.context.attach")
-    @mock.patch("opentelemetry_instrumentation_kstreams.utils._enrich_span")
+    @mock.patch("opentelemetry_instrumentation_kstreams.utils._enrich_base_span")
+    @mock.patch(
+        "opentelemetry_instrumentation_kstreams.utils._enrich_span_with_record_info"
+    )
     @mock.patch("opentelemetry.context.detach")
     def test_create_consumer_span(
         self,
         detach: mock.MagicMock,
-        enrich_span: mock.MagicMock,
+        enrich_span_with_record_info: mock.MagicMock,
+        enrich_base_span: mock.MagicMock,
         attach: mock.MagicMock,
         set_span_in_context: mock.MagicMock,
     ) -> None:
@@ -196,6 +201,7 @@ def test_create_consumer_span(
             extracted_context,
             bootstrap_servers,
             client_id,
+            None,  # TODO: wait for 0.49 to be released
             self.args,
             self.kwargs,
         )
@@ -211,22 +217,18 @@ def test_create_consumer_span(
 
         set_span_in_context.assert_called_once_with(span, extracted_context)
         attach.assert_called_once_with(set_span_in_context.return_value)
 
-        enrich_span.assert_called_once_with(
+        enrich_base_span.assert_called_once_with(
             span,
             bootstrap_servers,
             record.topic,
-            record.partition,
             client_id,
         )
+        enrich_span_with_record_info.assert_called_once_with(
+            span,
+            record.topic,
+            record.partition,
+            record.offset,
+        )
         # consume_hook.assert_called_once_with(span, record, self.args, self.kwargs)
         detach.assert_called_once_with(attach.return_value)
 
-    # @mock.patch("opentelemetry_instrumentation_kstreams.utils.KStreamsKafkaExtractor")
-    def test_kafka_properties_extractor(
-        self,
-        # kafka_properties_extractor: mock.MagicMock,
-    ):
-        assert (
-            KStreamsKafkaExtractor.extract_send_partition(self.args, self.kwargs) == 0
-        )
