From e360f8290e1ac613c6d7cd440435ece0276e95cb Mon Sep 17 00:00:00 2001 From: Santiago Fraire Willemoes Date: Thu, 26 Sep 2024 19:39:55 +0200 Subject: [PATCH] NEW: add new metadata to send span --- .../utils.py | 105 +++++++++++++----- tests/test_utils.py | 38 ++++--- 2 files changed, 96 insertions(+), 47 deletions(-) diff --git a/src/opentelemetry_instrumentation_kstreams/utils.py b/src/opentelemetry_instrumentation_kstreams/utils.py index 2322c27..99bdcf9 100644 --- a/src/opentelemetry_instrumentation_kstreams/utils.py +++ b/src/opentelemetry_instrumentation_kstreams/utils.py @@ -7,6 +7,9 @@ from opentelemetry import propagate, context, trace from opentelemetry.propagators import textmap from opentelemetry.semconv.trace import SpanAttributes + +# Enable after 0.49 is released +# from opentelemetry.semconv._incubating.attributes import messaging_attributes as SpanAttributes from opentelemetry.trace import SpanKind, Tracer from opentelemetry.trace.span import Span from opentelemetry.context.context import Context @@ -19,7 +22,6 @@ class KStreamsContextGetter(textmap.Getter[HeadersT]): def get(self, carrier: HeadersT, key: str) -> Optional[List[str]]: - print("Getting context!!!", carrier, key) if carrier is None: return None carrier_items: Iterable = carrier @@ -29,7 +31,6 @@ def get(self, carrier: HeadersT, key: str) -> Optional[List[str]]: for item_key, value in carrier_items: if item_key == key: if value is not None: - print("Found context, here you goooo", value) return [value.decode()] return None @@ -88,10 +89,20 @@ def extract_send_topic(args, kwargs) -> Any: ) @staticmethod - def extract_send_partition(args: Any, kwargs: Any) -> Any: - return KStreamsKafkaExtractor._extract_argument( - "partition", 4, None, args, kwargs - ) + def extract_send_partition(record_metadata: Any) -> Any: + if hasattr(record_metadata, "partition"): + return record_metadata.partition + + @staticmethod + def extract_send_offset(record_metadata: Any) -> Any: + if hasattr(record_metadata, "offset"): + return record_metadata.offset + + @staticmethod + def extract_consumer_group(consumer: Any) -> Optional[str]: + if hasattr(consumer, "group_id"): + return consumer.group_id + return None @staticmethod def extract_producer_client_id(instance: StreamEngine) -> Optional[str]: @@ -106,14 +117,12 @@ def extract_consumer_client_id(instance: Stream) -> str: return instance.consumer._client._client_id -def _enrich_span( +def _enrich_base_span( span: Span, bootstrap_servers: List[str], topic: str, - partition: Optional[int], client_id: Optional[str], - offset: Optional[int] = None, -): +) -> None: """ Enriches the given span with Kafka-related attributes. @@ -140,24 +149,40 @@ def _enrich_span( if client_id is not None: span.set_attribute(SpanAttributes.MESSAGING_KAFKA_CLIENT_ID, client_id) - if span.is_recording(): - if offset is not None: - span.set_attribute(SpanAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, offset) - if partition is not None: - span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition) - if offset and partition: - span.set_attribute( - SpanAttributes.MESSAGING_MESSAGE_ID, - f"{topic}.{partition}.{offset}", - ) +def _enrich_span_with_record_info( + span: Span, topic: str, partition: Optional[int], offset: Optional[int] = None +) -> None: + """Used both by consumer and producer spans + + It's in a different function because send needs to be injected with the span + info, but we want to be able to keep updating the span with the record info -def _get_span_name(operation: str, topic: str): + Args: + span: The span to enrich. + topic: The Kafka topic name. + partition: The partition number, if available. + offset: The message offset, if available. Defaults to None. + Returns: + None + """ + if offset is not None: + span.set_attribute(SpanAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, offset) + if partition is not None: + span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition) + if offset is not None and partition is not None: + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, + f"{topic}.{partition}.{offset}", + ) + + +def _get_span_name(operation: str, topic: str) -> str: return f"{topic} {operation}" def _wrap_send(tracer: Tracer) -> Callable: - async def _traced_send(func, instance: StreamEngine, args, kwargs): + async def _traced_send(func, instance: StreamEngine, args, kwargs) -> Any: if not isinstance(instance.backend, Kafka): raise NotImplementedError("Only Kafka backend is supported for now") @@ -166,24 +191,27 @@ async def _traced_send(func, instance: StreamEngine, args, kwargs): if headers is None: headers = [] kwargs["headers"] = headers - - topic = KStreamsKafkaExtractor.extract_send_topic(args, kwargs) + client_id = KStreamsKafkaExtractor.extract_producer_client_id(instance) bootstrap_servers = KStreamsKafkaExtractor.extract_bootstrap_servers( instance.backend ) - partition = KStreamsKafkaExtractor.extract_send_partition(args, kwargs) - client_id = KStreamsKafkaExtractor.extract_producer_client_id(instance) + topic = KStreamsKafkaExtractor.extract_send_topic(args, kwargs) span_name = _get_span_name("send", topic) with tracer.start_as_current_span(span_name, kind=SpanKind.PRODUCER) as span: - _enrich_span(span, bootstrap_servers, topic, partition, client_id) + _enrich_base_span(span, bootstrap_servers, topic, client_id) propagate.inject( headers, context=trace.set_span_in_context(span), setter=_kstreams_setter, ) + record_metadata = await func(*args, **kwargs) + + partition = KStreamsKafkaExtractor.extract_send_partition(record_metadata) + offset = KStreamsKafkaExtractor.extract_send_offset(record_metadata) + _enrich_span_with_record_info(span, topic, partition, offset) - return await func(*args, **kwargs) + return record_metadata return _traced_send @@ -194,6 +222,7 @@ def _create_consumer_span( extracted_context: Context, bootstrap_servers: List[str], client_id: str, + consumer_group: Optional[str], args: Any, kwargs: Any, ) -> None: @@ -220,7 +249,20 @@ def _create_consumer_span( ) as span: new_context = trace.set_span_in_context(span, extracted_context) token = context.attach(new_context) - _enrich_span(span, bootstrap_servers, record.topic, record.partition, client_id, record.offset) + _enrich_base_span( + span, + bootstrap_servers, + record.topic, + client_id, + ) + _enrich_span_with_record_info( + span, record.topic, record.partition, record.offset + ) + # TODO: enable after 0.49 is released + # if consumer_group is not None: + # span.set_attribute( + # SpanAttributes.MESSAGING_CONSUMER_GROUP_NAME, consumer_group + # ) context.detach(token) @@ -236,6 +278,10 @@ async def _traced_anext(func, instance: Stream, args, kwargs): instance.backend ) client_id = KStreamsKafkaExtractor.extract_consumer_client_id(instance) + # consumer_group = KStreamsKafkaExtractor.extract_consumer_group( + # instance.consumer + # ) + consumer_group = None extracted_context: Context = propagate.extract( record.headers, getter=_kstreams_getter ) @@ -246,6 +292,7 @@ async def _traced_anext(func, instance: Stream, args, kwargs): extracted_context, bootstrap_servers, client_id, + consumer_group, args, kwargs, ) diff --git a/tests/test_utils.py b/tests/test_utils.py index c4450e8..c32af66 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -17,7 +17,6 @@ from unittest import TestCase, mock from opentelemetry_instrumentation_kstreams.utils import ( - KStreamsKafkaExtractor, _create_consumer_span, _get_span_name, _kstreams_getter, @@ -43,7 +42,7 @@ def setUp(self) -> None: @mock.patch( "opentelemetry_instrumentation_kstreams.utils.KStreamsKafkaExtractor.extract_send_partition" ) - @mock.patch("opentelemetry_instrumentation_kstreams.utils._enrich_span") + @mock.patch("opentelemetry_instrumentation_kstreams.utils._enrich_base_span") @mock.patch("opentelemetry.trace.set_span_in_context") @mock.patch("opentelemetry.propagate.inject") def test_wrap_send_with_topic_as_arg( @@ -68,7 +67,7 @@ def test_wrap_send_with_topic_as_arg( @mock.patch( "opentelemetry_instrumentation_kstreams.utils.KStreamsKafkaExtractor.extract_send_partition" ) - @mock.patch("opentelemetry_instrumentation_kstreams.utils._enrich_span") + @mock.patch("opentelemetry_instrumentation_kstreams.utils._enrich_base_span") @mock.patch("opentelemetry.trace.set_span_in_context") @mock.patch("opentelemetry.propagate.inject") def test_wrap_send_with_topic_as_kwarg( @@ -98,19 +97,22 @@ def wrap_send_helper( extract_bootstrap_servers: mock.MagicMock, ) -> None: tracer = mock.MagicMock() + record = mock.MagicMock() original_send_callback = mock.AsyncMock() + original_send_callback.return_value = record stream_engine = mock.MagicMock() stream_engine.backend = mock.MagicMock(spec_set=Kafka()) client_id = "client_id" stream_engine._producer.client._client_id = client_id expected_span_name = _get_span_name("send", self.topic_name) wrapped_send = _wrap_send(tracer) + retval = asyncio.run( wrapped_send(original_send_callback, stream_engine, self.args, self.kwargs) ) extract_bootstrap_servers.assert_called_once_with(stream_engine.backend) - extract_send_partition.assert_called_once_with(self.args, self.kwargs) + extract_send_partition.assert_called_once_with(record) tracer.start_as_current_span.assert_called_once_with( expected_span_name, kind=SpanKind.PRODUCER ) @@ -120,7 +122,6 @@ def wrap_send_helper( span, extract_bootstrap_servers.return_value, self.topic_name, - extract_send_partition.return_value, client_id, ) @@ -174,12 +175,16 @@ async def test_wrap_next( @mock.patch("opentelemetry.trace.set_span_in_context") @mock.patch("opentelemetry.context.attach") - @mock.patch("opentelemetry_instrumentation_kstreams.utils._enrich_span") + @mock.patch("opentelemetry_instrumentation_kstreams.utils._enrich_base_span") + @mock.patch( + "opentelemetry_instrumentation_kstreams.utils._enrich_span_with_record_info" + ) @mock.patch("opentelemetry.context.detach") def test_create_consumer_span( self, detach: mock.MagicMock, - enrich_span: mock.MagicMock, + enrich_span_with_record_info: mock.MagicMock, + enrich_base_span: mock.MagicMock, attach: mock.MagicMock, set_span_in_context: mock.MagicMock, ) -> None: @@ -196,6 +201,7 @@ def test_create_consumer_span( extracted_context, bootstrap_servers, client_id, + None, # TODO: wait for 0.49 to be released self.args, self.kwargs, ) @@ -211,22 +217,18 @@ def test_create_consumer_span( set_span_in_context.assert_called_once_with(span, extracted_context) attach.assert_called_once_with(set_span_in_context.return_value) - enrich_span.assert_called_once_with( + enrich_base_span.assert_called_once_with( span, bootstrap_servers, record.topic, - record.partition, client_id, + ) + + enrich_span_with_record_info.assert_called_once_with( + span, + record.topic, + record.partition, record.offset, ) # consume_hook.assert_called_once_with(span, record, self.args, self.kwargs) detach.assert_called_once_with(attach.return_value) - - # @mock.patch("opentelemetry_instrumentation_kstreams.utils.KStreamsKafkaExtractor") - def test_kafka_properties_extractor( - self, - # kafka_properties_extractor: mock.MagicMock, - ): - assert ( - KStreamsKafkaExtractor.extract_send_partition(self.args, self.kwargs) == 0 - )