From 61844585b9e43529e2c9df7d80f00a5b4fc8e210 Mon Sep 17 00:00:00 2001 From: Santiago Fraire Willemoes Date: Tue, 1 Oct 2024 16:39:53 +0200 Subject: [PATCH] BREAK: Introduce otel middleware and require kstream >= 0.17 --- README.md | 3 + poetry.lock | 52 +++--- pyproject.toml | 2 +- .../instrumentor.py | 13 +- .../middlewares.py | 112 ++++++++++++ .../package.py | 2 +- .../utils.py | 168 ++++-------------- .../wrappers.py | 107 +++++++++++ tests/test_instrumentation.py | 7 +- tests/test_utils.py | 119 +++++-------- 10 files changed, 350 insertions(+), 235 deletions(-) create mode 100644 src/opentelemetry_instrumentation_kstreams/middlewares.py create mode 100644 src/opentelemetry_instrumentation_kstreams/wrappers.py diff --git a/README.md b/README.md index 9f4d355..09ab596 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,9 @@ Version: `0.3.0` +> [!IMPORTANT] +> This instrumentation works only with [kstreams middlewares](https://kpn.github.io/kstreams/middleware/) and requires kstreams `v0.17.0` or later + ## Installation ```sh diff --git a/poetry.lock b/poetry.lock index 9206e75..0a51e94 100644 --- a/poetry.lock +++ b/poetry.lock @@ -535,13 +535,13 @@ testing = ["pytest", "pytest-benchmark"] [[package]] name = "prometheus-client" -version = "0.20.0" +version = "0.21.0" description = "Python client for the Prometheus monitoring system." 
optional = true python-versions = ">=3.8" files = [ - {file = "prometheus_client-0.20.0-py3-none-any.whl", hash = "sha256:cde524a85bce83ca359cc837f28b8c0db5cac7aa653a588fd7e84ba061c329e7"}, - {file = "prometheus_client-0.20.0.tar.gz", hash = "sha256:287629d00b147a32dcb2be0b9df905da599b2d82f80377083ec8463309a4bb89"}, + {file = "prometheus_client-0.21.0-py3-none-any.whl", hash = "sha256:4fa6b4dd0ac16d58bb587c04b1caae65b8c5043e85f778f42f5f632f6af2e166"}, + {file = "prometheus_client-0.21.0.tar.gz", hash = "sha256:96c83c606b71ff2b0a433c98889d275f51ffec6c5e267de37c7a2b5c9aa9233e"}, ] [package.extras] @@ -549,13 +549,13 @@ twisted = ["twisted"] [[package]] name = "prompt-toolkit" -version = "3.0.47" +version = "3.0.48" description = "Library for building powerful interactive command lines in Python" optional = false python-versions = ">=3.7.0" files = [ - {file = "prompt_toolkit-3.0.47-py3-none-any.whl", hash = "sha256:0d7bfa67001d5e39d02c224b663abc33687405033a8c422d0d675a5a13361d10"}, - {file = "prompt_toolkit-3.0.47.tar.gz", hash = "sha256:1e1b29cb58080b1e69f207c893a1a7bf16d127a5c30c9d17a25a5d77792e5360"}, + {file = "prompt_toolkit-3.0.48-py3-none-any.whl", hash = "sha256:f49a827f90062e411f1ce1f854f2aedb3c23353244f8108b89283587397ac10e"}, + {file = "prompt_toolkit-3.0.48.tar.gz", hash = "sha256:d6623ab0477a80df74e646bdbc93621143f5caf104206aa29294d53de1a03d90"}, ] [package.dependencies] @@ -810,29 +810,29 @@ files = [ [[package]] name = "ruff" -version = "0.6.5" +version = "0.6.8" description = "An extremely fast Python linter and code formatter, written in Rust." 
optional = false python-versions = ">=3.7" files = [ - {file = "ruff-0.6.5-py3-none-linux_armv6l.whl", hash = "sha256:7e4e308f16e07c95fc7753fc1aaac690a323b2bb9f4ec5e844a97bb7fbebd748"}, - {file = "ruff-0.6.5-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:932cd69eefe4daf8c7d92bd6689f7e8182571cb934ea720af218929da7bd7d69"}, - {file = "ruff-0.6.5-py3-none-macosx_11_0_arm64.whl", hash = "sha256:3a8d42d11fff8d3143ff4da41742a98f8f233bf8890e9fe23077826818f8d680"}, - {file = "ruff-0.6.5-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a50af6e828ee692fb10ff2dfe53f05caecf077f4210fae9677e06a808275754f"}, - {file = "ruff-0.6.5-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:794ada3400a0d0b89e3015f1a7e01f4c97320ac665b7bc3ade24b50b54cb2972"}, - {file = "ruff-0.6.5-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:381413ec47f71ce1d1c614f7779d88886f406f1fd53d289c77e4e533dc6ea200"}, - {file = "ruff-0.6.5-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:52e75a82bbc9b42e63c08d22ad0ac525117e72aee9729a069d7c4f235fc4d276"}, - {file = "ruff-0.6.5-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:09c72a833fd3551135ceddcba5ebdb68ff89225d30758027280968c9acdc7810"}, - {file = "ruff-0.6.5-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:800c50371bdcb99b3c1551d5691e14d16d6f07063a518770254227f7f6e8c178"}, - {file = "ruff-0.6.5-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8e25ddd9cd63ba1f3bd51c1f09903904a6adf8429df34f17d728a8fa11174253"}, - {file = "ruff-0.6.5-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:7291e64d7129f24d1b0c947ec3ec4c0076e958d1475c61202497c6aced35dd19"}, - {file = "ruff-0.6.5-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:9ad7dfbd138d09d9a7e6931e6a7e797651ce29becd688be8a0d4d5f8177b4b0c"}, - {file = "ruff-0.6.5-py3-none-musllinux_1_2_i686.whl", hash = 
"sha256:005256d977021790cc52aa23d78f06bb5090dc0bfbd42de46d49c201533982ae"}, - {file = "ruff-0.6.5-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:482c1e6bfeb615eafc5899127b805d28e387bd87db38b2c0c41d271f5e58d8cc"}, - {file = "ruff-0.6.5-py3-none-win32.whl", hash = "sha256:cf4d3fa53644137f6a4a27a2b397381d16454a1566ae5335855c187fbf67e4f5"}, - {file = "ruff-0.6.5-py3-none-win_amd64.whl", hash = "sha256:3e42a57b58e3612051a636bc1ac4e6b838679530235520e8f095f7c44f706ff9"}, - {file = "ruff-0.6.5-py3-none-win_arm64.whl", hash = "sha256:51935067740773afdf97493ba9b8231279e9beef0f2a8079188c4776c25688e0"}, - {file = "ruff-0.6.5.tar.gz", hash = "sha256:4d32d87fab433c0cf285c3683dd4dae63be05fd7a1d65b3f5bf7cdd05a6b96fb"}, + {file = "ruff-0.6.8-py3-none-linux_armv6l.whl", hash = "sha256:77944bca110ff0a43b768f05a529fecd0706aac7bcce36d7f1eeb4cbfca5f0f2"}, + {file = "ruff-0.6.8-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:27b87e1801e786cd6ede4ada3faa5e254ce774de835e6723fd94551464c56b8c"}, + {file = "ruff-0.6.8-py3-none-macosx_11_0_arm64.whl", hash = "sha256:cd48f945da2a6334f1793d7f701725a76ba93bf3d73c36f6b21fb04d5338dcf5"}, + {file = "ruff-0.6.8-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:677e03c00f37c66cea033274295a983c7c546edea5043d0c798833adf4cf4c6f"}, + {file = "ruff-0.6.8-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:9f1476236b3eacfacfc0f66aa9e6cd39f2a624cb73ea99189556015f27c0bdeb"}, + {file = "ruff-0.6.8-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6f5a2f17c7d32991169195d52a04c95b256378bbf0de8cb98478351eb70d526f"}, + {file = "ruff-0.6.8-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:5fd0d4b7b1457c49e435ee1e437900ced9b35cb8dc5178921dfb7d98d65a08d0"}, + {file = "ruff-0.6.8-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f8034b19b993e9601f2ddf2c517451e17a6ab5cdb1c13fdff50c1442a7171d87"}, + {file = 
"ruff-0.6.8-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6cfb227b932ba8ef6e56c9f875d987973cd5e35bc5d05f5abf045af78ad8e098"}, + {file = "ruff-0.6.8-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ef0411eccfc3909269fed47c61ffebdcb84a04504bafa6b6df9b85c27e813b0"}, + {file = "ruff-0.6.8-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:007dee844738c3d2e6c24ab5bc7d43c99ba3e1943bd2d95d598582e9c1b27750"}, + {file = "ruff-0.6.8-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:ce60058d3cdd8490e5e5471ef086b3f1e90ab872b548814e35930e21d848c9ce"}, + {file = "ruff-0.6.8-py3-none-musllinux_1_2_i686.whl", hash = "sha256:1085c455d1b3fdb8021ad534379c60353b81ba079712bce7a900e834859182fa"}, + {file = "ruff-0.6.8-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:70edf6a93b19481affd287d696d9e311388d808671bc209fb8907b46a8c3af44"}, + {file = "ruff-0.6.8-py3-none-win32.whl", hash = "sha256:792213f7be25316f9b46b854df80a77e0da87ec66691e8f012f887b4a671ab5a"}, + {file = "ruff-0.6.8-py3-none-win_amd64.whl", hash = "sha256:ec0517dc0f37cad14a5319ba7bba6e7e339d03fbf967a6d69b0907d61be7a263"}, + {file = "ruff-0.6.8-py3-none-win_arm64.whl", hash = "sha256:8d3bb2e3fbb9875172119021a13eed38849e762499e3cfde9588e4b4d70968dc"}, + {file = "ruff-0.6.8.tar.gz", hash = "sha256:a5bf44b1aa0adaf6d9d20f86162b34f7c593bfedabc51239953e446aefc8ce18"}, ] [[package]] @@ -1037,4 +1037,4 @@ instruments = ["kstreams"] [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "1e7c1f13e68565eecddfa93662146806d31ced3e3792c10783a5f27996764f6d" +content-hash = "a4f797a685195bd8ca29fa17f9034009b65ba5a2ddf8c5b32974bf57a6ecde27" diff --git a/pyproject.toml b/pyproject.toml index 33d2436..4f8dda7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ python = "^3.8" opentelemetry-api = "^1.27.0" opentelemetry-instrumentation = "^0.48b0" opentelemetry-semantic-conventions = "^0.48b0" -kstreams = { version = ">=0.12.0", optional = true } +kstreams = 
{ version = ">=0.17.0", optional = true } [tool.poetry.group.dev.dependencies] ruff = "^0.6" diff --git a/src/opentelemetry_instrumentation_kstreams/instrumentor.py b/src/opentelemetry_instrumentation_kstreams/instrumentor.py index 82b808d..654d11e 100644 --- a/src/opentelemetry_instrumentation_kstreams/instrumentor.py +++ b/src/opentelemetry_instrumentation_kstreams/instrumentor.py @@ -6,9 +6,10 @@ from opentelemetry.instrumentation.utils import unwrap from wrapt import wrap_function_wrapper from .version import __version__ -from .utils import ( +from .wrappers import ( _wrap_send, - _wrap_getone, + # _wrap_getone, + _wrap_build_stream_middleware_stack, ) from .package import _instruments @@ -37,8 +38,12 @@ def _instrument(self, **kwargs: Any): schema_url="https://opentelemetry.io/schemas/1.11.0", ) wrap_function_wrapper(StreamEngine, "send", _wrap_send(tracer)) - wrap_function_wrapper(Stream, "getone", _wrap_getone(tracer)) + wrap_function_wrapper( + StreamEngine, + "build_stream_middleware_stack", + _wrap_build_stream_middleware_stack(tracer), + ) def _uninstrument(self, **kwargs: Any): unwrap(StreamEngine, "send") - unwrap(Stream, "getone") + unwrap(Stream, "build_stream_middleware_stack") diff --git a/src/opentelemetry_instrumentation_kstreams/middlewares.py b/src/opentelemetry_instrumentation_kstreams/middlewares.py new file mode 100644 index 0000000..40b93ea --- /dev/null +++ b/src/opentelemetry_instrumentation_kstreams/middlewares.py @@ -0,0 +1,112 @@ +from typing import Any, Optional + +from kstreams import ( + ConsumerRecord, + middleware, +) +from kstreams.backends.kafka import Kafka +from opentelemetry import context, propagate, trace +from opentelemetry.context.context import Context + +# Enable after 0.49 is released +# from opentelemetry.semconv._incubating.attributes import messaging_attributes as SpanAttributes +from opentelemetry.trace import SpanKind, Tracer + +from .utils import ( + KStreamsKafkaExtractor, + _enrich_base_span, + 
_enrich_span_with_record_info, + _get_span_name, + _kstreams_getter, +) + + +class OpenTelemetryMiddleware(middleware.BaseMiddleware): + """ + Middleware for integrating OpenTelemetry tracing with Kafka Streams. + + This middleware extracts tracing information from Kafka consumer records and + creates spans for tracing the processing of these records. + + Attributes: + tracer: The OpenTelemetry tracer instance used for creating spans. + + Methods: + __call__(cr: ConsumerRecord) -> Any: + Asynchronously processes a Kafka consumer record, creating and enriching + an OpenTelemetry span with tracing information. + """ + + def __init__(self, *, tracer: Optional[Tracer] = None, **kwargs) -> None: + super().__init__(**kwargs) + if tracer is None: + tracer = trace.get_tracer(__name__) + + # The current tracer instance + self.tracer = tracer + + # Initialize variables computed once which are injected into the span + if not isinstance(self.stream.backend, Kafka): + raise NotImplementedError("Only Kafka backend is supported for now") + self.bootstrap_servers = KStreamsKafkaExtractor.extract_bootstrap_servers( + self.stream.backend + ) + self.consumer_group = KStreamsKafkaExtractor.extract_consumer_group( + self.stream.consumer + ) + self.client_id = KStreamsKafkaExtractor.extract_consumer_client_id(self.stream) + + async def __call__(self, cr: ConsumerRecord) -> Any: + """ + Asynchronously processes a ConsumerRecord by creating and managing a span. + + Args: + cr (ConsumerRecord): The consumer record to be processed. + + Returns: + Any: The result of the next call in the processing chain. + + This method performs the following steps: + 1. Extracts the context from the record headers. + 2. Starts a new span with the extracted context. + 3. Enriches the span with base and record-specific information. + 4. Optionally sets the consumer group attribute (currently commented out). + 5. Calls the next processing function in the chain. + 6. Detaches the context token. 
+ """ + tracer = self.tracer + record = cr + bootstrap_servers = self.bootstrap_servers + client_id = self.client_id + span_name = _get_span_name("receive", record.topic) + extracted_context: Context = propagate.extract( + record.headers, getter=_kstreams_getter + ) + + with tracer.start_as_current_span( + span_name, + context=extracted_context, + end_on_exit=True, + kind=SpanKind.CONSUMER, + ) as span: + new_context = trace.set_span_in_context(span, extracted_context) + context_token = context.attach(new_context) + + _enrich_base_span( + span, + bootstrap_servers, + record.topic, + client_id, + ) + _enrich_span_with_record_info( + span, record.topic, record.partition, record.offset + ) + + # TODO: enable after 0.49 is released + # if self.consumer_group is not None: + # span.set_attribute( + # SpanAttributes.MESSAGING_CONSUMER_GROUP_NAME, self.consumer_group + # ) + + await self.next_call(cr) + context.detach(context_token) diff --git a/src/opentelemetry_instrumentation_kstreams/package.py b/src/opentelemetry_instrumentation_kstreams/package.py index 5a7aa0a..db7c6c8 100644 --- a/src/opentelemetry_instrumentation_kstreams/package.py +++ b/src/opentelemetry_instrumentation_kstreams/package.py @@ -1 +1 @@ -_instruments = ("kstreams >= 0.13.0",) +_instruments = ("kstreams >= 0.17.0",) diff --git a/src/opentelemetry_instrumentation_kstreams/utils.py b/src/opentelemetry_instrumentation_kstreams/utils.py index 7ee57c9..7ff458f 100644 --- a/src/opentelemetry_instrumentation_kstreams/utils.py +++ b/src/opentelemetry_instrumentation_kstreams/utils.py @@ -1,18 +1,19 @@ import json from logging import getLogger -from typing import Any, Awaitable, Callable, Iterable, List, Optional, Union +from typing import Any, Iterable, List, Optional, Union -from kstreams import Send, Stream, StreamEngine, ConsumerRecord, RecordMetadata +from kstreams import ( + Stream, + StreamEngine, +) from kstreams.backends.kafka import Kafka -from opentelemetry import propagate, context, trace + from 
opentelemetry.propagators import textmap from opentelemetry.semconv.trace import SpanAttributes # Enable after 0.49 is released # from opentelemetry.semconv._incubating.attributes import messaging_attributes as SpanAttributes -from opentelemetry.trace import SpanKind, Tracer from opentelemetry.trace.span import Span -from opentelemetry.context.context import Context _LOG = getLogger(__name__) @@ -21,6 +22,21 @@ class KStreamsContextGetter(textmap.Getter[HeadersT]): + """ + KStreamsContextGetter is a custom implementation of the textmap.Getter interface + for extracting context from Kafka Streams headers. + + Methods: + get(carrier: HeadersT, key: str) -> Optional[List[str]]: + Extracts the value associated with the given key from the carrier. + If the carrier is None or the key is not found, returns None. + If the value is found and is not None, it decodes the value and returns it as a list of strings. + + keys(carrier: HeadersT) -> List[str]: + Returns a list of all keys present in the carrier. + If the carrier is None, returns an empty list. + """ + def get(self, carrier: HeadersT, key: str) -> Optional[List[str]]: if carrier is None: return None @@ -45,6 +61,23 @@ def keys(self, carrier: HeadersT) -> List[str]: class KStreamsContextSetter(textmap.Setter[HeadersT]): + """ + A context setter for KStreams that implements the textmap.Setter interface. + + Methods: + set(carrier: HeadersT, key: str, value: Optional[str]) -> None + Sets a key-value pair in the carrier. If the carrier is a list, the key-value pair is appended. + If the carrier is a dictionary, the key-value pair is added or updated. + + Parameters: + carrier : HeadersT + The carrier to set the key-value pair in. Can be a list or a dictionary. + key : str + The key to set in the carrier. + value : Optional[str] + The value to set for the key in the carrier. If None, the key-value pair is not set. 
+ """ + def set(self, carrier: HeadersT, key: str, value: Optional[str]) -> None: if carrier is None or key is None: return @@ -175,128 +208,3 @@ def _enrich_span_with_record_info( def _get_span_name(operation: str, topic: str) -> str: return f"{topic} {operation}" - - -def _wrap_send(tracer: Tracer) -> Callable: - async def _traced_send( - func: Send, instance: StreamEngine, args, kwargs - ) -> Awaitable[RecordMetadata]: - if not isinstance(instance.backend, Kafka): - raise NotImplementedError("Only Kafka backend is supported for now") - - headers = KStreamsKafkaExtractor.extract_send_headers(args, kwargs) - - if headers is None: - headers = [] - kwargs["headers"] = headers - client_id = KStreamsKafkaExtractor.extract_producer_client_id(instance) - bootstrap_servers = KStreamsKafkaExtractor.extract_bootstrap_servers( - instance.backend - ) - topic = KStreamsKafkaExtractor.extract_send_topic(args, kwargs) - - span_name = _get_span_name("send", topic) - with tracer.start_as_current_span(span_name, kind=SpanKind.PRODUCER) as span: - _enrich_base_span(span, bootstrap_servers, topic, client_id) - propagate.inject( - headers, - context=trace.set_span_in_context(span), - setter=_kstreams_setter, - ) - record_metadata = await func(*args, **kwargs) - - partition = KStreamsKafkaExtractor.extract_send_partition(record_metadata) - offset = KStreamsKafkaExtractor.extract_send_offset(record_metadata) - _enrich_span_with_record_info(span, topic, partition, offset) - - return record_metadata - - return _traced_send - - -def _create_consumer_span( - tracer: Tracer, - record: ConsumerRecord, - extracted_context: Context, - bootstrap_servers: List[str], - client_id: str, - consumer_group: Optional[str], - args: Any, - kwargs: Any, -) -> None: - """ - Creates and starts a consumer span for a given Kafka record. - - Args: - tracer: The tracer instance used to create the span. - record: The Kafka consumer record for which the span is created. 
- extracted_context: The context extracted from the incoming message. - bootstrap_servers: List of bootstrap servers for the Kafka cluster. - client_id: The client ID of the Kafka consumer. - args: Additional positional arguments. - kwargs: Additional keyword arguments. - - Returns: - None - """ - span_name = _get_span_name("receive", record.topic) - with tracer.start_as_current_span( - span_name, - context=extracted_context, - kind=SpanKind.CONSUMER, - ) as span: - new_context = trace.set_span_in_context(span, extracted_context) - token = context.attach(new_context) - _enrich_base_span( - span, - bootstrap_servers, - record.topic, - client_id, - ) - _enrich_span_with_record_info( - span, record.topic, record.partition, record.offset - ) - # TODO: enable after 0.49 is released - # if consumer_group is not None: - # span.set_attribute( - # SpanAttributes.MESSAGING_CONSUMER_GROUP_NAME, consumer_group - # ) - # trace.set_span_in_context(span) - context.detach(token) - - -def _wrap_getone( - tracer: Tracer, -) -> Callable: - async def _traced_anext(func, instance: Stream, args, kwargs): - if not isinstance(instance.backend, Kafka): - raise NotImplementedError("Only Kafka backend is supported for now") - record: ConsumerRecord = await func(*args, **kwargs) - bootstrap_servers = KStreamsKafkaExtractor.extract_bootstrap_servers( - instance.backend - ) - client_id = KStreamsKafkaExtractor.extract_consumer_client_id(instance) - # consumer_group = KStreamsKafkaExtractor.extract_consumer_group( - # instance.consumer - # ) - consumer_group = None - extracted_context: Context = propagate.extract( - record.headers, getter=_kstreams_getter - ) - - _create_consumer_span( - tracer, - record, - extracted_context, - bootstrap_servers, - client_id, - consumer_group, - args, - kwargs, - ) - # instance._current_context_token = context.attach( - # ) - - return record - - return _traced_anext diff --git a/src/opentelemetry_instrumentation_kstreams/wrappers.py 
b/src/opentelemetry_instrumentation_kstreams/wrappers.py new file mode 100644 index 0000000..cee6fc0 --- /dev/null +++ b/src/opentelemetry_instrumentation_kstreams/wrappers.py @@ -0,0 +1,107 @@ +from typing import Awaitable, Callable + +from kstreams import ( + RecordMetadata, + Send, + Stream, + StreamEngine, + middleware, +) +from kstreams.backends.kafka import Kafka +from kstreams.types import NextMiddlewareCall +from opentelemetry import propagate, trace + +# Enable after 0.49 is released +# from opentelemetry.semconv._incubating.attributes import messaging_attributes as SpanAttributes +from opentelemetry.trace import SpanKind, Tracer + +from .middlewares import OpenTelemetryMiddleware +from .utils import ( + KStreamsKafkaExtractor, + _enrich_base_span, + _enrich_span_with_record_info, + _get_span_name, + _kstreams_setter, +) + + +def _wrap_send(tracer: Tracer) -> Callable: + """ + Wraps the send function of a Kafka producer with tracing capabilities. + + Args: + tracer: The OpenTelemetry tracer used to create spans. + + Returns: + Callable: An asynchronous function that wraps the original send function, + adding tracing information to the Kafka message headers and + enriching the span with metadata about the message. + + Raises: + NotImplementedError: If the backend of the instance is not Kafka. + + The wrapped function performs the following steps: + 1. Checks if the backend of the instance is Kafka. + 2. Extracts or initializes the headers for the Kafka message. + 3. Extracts the client ID, bootstrap servers, and topic from the instance. + 4. Creates a span with the name "send" and the topic. + 5. Enriches the span with base information such as bootstrap servers, topic, and client ID. + 6. Injects the tracing context into the message headers. + 7. Calls the original send function and awaits its result. + 8. Extracts partition and offset information from the result. + 9. Enriches the span with record metadata such as partition and offset. 
+ """ + + async def _traced_send( + func: Send, instance: StreamEngine, args, kwargs + ) -> Awaitable[RecordMetadata]: + if not isinstance(instance.backend, Kafka): + raise NotImplementedError("Only Kafka backend is supported for now") + + headers = KStreamsKafkaExtractor.extract_send_headers(args, kwargs) + + if headers is None: + headers = [] + kwargs["headers"] = headers + client_id = KStreamsKafkaExtractor.extract_producer_client_id(instance) + bootstrap_servers = KStreamsKafkaExtractor.extract_bootstrap_servers( + instance.backend + ) + topic = KStreamsKafkaExtractor.extract_send_topic(args, kwargs) + + span_name = _get_span_name("send", topic) + with tracer.start_as_current_span(span_name, kind=SpanKind.PRODUCER) as span: + _enrich_base_span(span, bootstrap_servers, topic, client_id) + propagate.inject( + headers, + context=trace.set_span_in_context(span), + setter=_kstreams_setter, + ) + record_metadata = await func(*args, **kwargs) + + partition = KStreamsKafkaExtractor.extract_send_partition(record_metadata) + offset = KStreamsKafkaExtractor.extract_send_offset(record_metadata) + _enrich_span_with_record_info(span, topic, partition, offset) + + return record_metadata + + return _traced_send + + +def _wrap_build_stream_middleware_stack( + tracer: Tracer, +) -> Callable: + def _traced_build_stream_middleware_stack( + func, instance: StreamEngine, args, kwargs + ) -> NextMiddlewareCall: + stream: Stream = kwargs.get("stream") + if stream is None: + raise ValueError("stream should already be present") + stream.middlewares.insert( + 0, middleware.Middleware(OpenTelemetryMiddleware, tracer=tracer) + ) + next_call = func(*args, **kwargs) + + return next_call + + return _traced_build_stream_middleware_stack diff --git a/tests/test_instrumentation.py b/tests/test_instrumentation.py index 4bcb6bd..93e2ba5 100644 --- a/tests/test_instrumentation.py +++ b/tests/test_instrumentation.py @@ -1,10 +1,11 @@ -from kstreams import Stream, StreamEngine -from 
opentelemetry_instrumentation_kstreams import KStreamsInstrumentor +from kstreams import StreamEngine from wrapt import BoundFunctionWrapper +from opentelemetry_instrumentation_kstreams import KStreamsInstrumentor + def test_instrument_api() -> None: instrumentation = KStreamsInstrumentor() instrumentation.instrument() assert isinstance(StreamEngine.send, BoundFunctionWrapper) - assert isinstance(Stream.getone, BoundFunctionWrapper) + assert isinstance(StreamEngine.build_stream_middleware_stack, BoundFunctionWrapper) diff --git a/tests/test_utils.py b/tests/test_utils.py index 082edc8..a0ed7af 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -16,16 +16,19 @@ import asyncio from unittest import TestCase, mock +from kstreams.backends.kafka import Kafka +from opentelemetry.trace import SpanKind + +from opentelemetry_instrumentation_kstreams.middlewares import OpenTelemetryMiddleware from opentelemetry_instrumentation_kstreams.utils import ( - _create_consumer_span, _get_span_name, _kstreams_getter, _kstreams_setter, - _wrap_getone, +) +from opentelemetry_instrumentation_kstreams.wrappers import ( + _wrap_build_stream_middleware_stack, _wrap_send, ) -from opentelemetry.trace import SpanKind -from kstreams.backends.kafka import Kafka class TestUtils(TestCase): @@ -135,44 +138,6 @@ def wrap_send_helper( self.assertEqual(retval, original_send_callback.return_value) @mock.patch("opentelemetry.propagate.extract") - @mock.patch("opentelemetry_instrumentation_kstreams.utils._create_consumer_span") - @mock.patch( - "opentelemetry_instrumentation_kstreams.utils.KStreamsKafkaExtractor.extract_bootstrap_servers" - ) - async def test_wrap_next( - self, - extract_bootstrap_servers: mock.MagicMock, - _create_consumer_span: mock.MagicMock, - extract: mock.MagicMock, - ) -> None: - tracer = mock.MagicMock() - original_next_callback = mock.MagicMock() - stream = mock.MagicMock() - stream.backend = mock.MagicMock(spec_set=Kafka()) - - wrapped_next = _wrap_getone(tracer) - 
record = await wrapped_next( - original_next_callback, stream, self.args, self.kwargs - ) - - extract_bootstrap_servers.assert_called_once_with(stream.backend) - bootstrap_servers = extract_bootstrap_servers.return_value - - original_next_callback.assert_called_once_with(*self.args, **self.kwargs) - self.assertEqual(record, original_next_callback.return_value) - - extract.assert_called_once_with(record.headers, getter=_kstreams_getter) - context = extract.return_value - - _create_consumer_span.assert_called_once_with( - tracer, - record, - context, - bootstrap_servers, - self.args, - self.kwargs, - ) - @mock.patch("opentelemetry.trace.set_span_in_context") @mock.patch("opentelemetry.context.attach") @mock.patch("opentelemetry_instrumentation_kstreams.utils._enrich_base_span") @@ -180,55 +145,69 @@ async def test_wrap_next( "opentelemetry_instrumentation_kstreams.utils._enrich_span_with_record_info" ) @mock.patch("opentelemetry.context.detach") - def test_create_consumer_span( + def test_opentelemetry_middleware( self, detach: mock.MagicMock, enrich_span_with_record_info: mock.MagicMock, enrich_base_span: mock.MagicMock, attach: mock.MagicMock, set_span_in_context: mock.MagicMock, + extract: mock.MagicMock, ) -> None: + async def func(cr): ... 
+ tracer = mock.MagicMock() - # consume_hook = mock.MagicMock() - bootstrap_servers = mock.MagicMock() - extracted_context = mock.MagicMock() - record = mock.MagicMock() - client_id = mock.MagicMock() - _create_consumer_span( - tracer, - record, - extracted_context, - bootstrap_servers, - client_id, - None, # TODO: wait for 0.49 to be released - self.args, - self.kwargs, + next_call = func + send = mock.MagicMock() + stream = mock.MagicMock() + stream.backend = Kafka() + stream.consumer._client._client_id = "client_id" + stream.consumer.group_id = "consumer_group" + + record = mock.MagicMock() + middleware = OpenTelemetryMiddleware( + next_call=next_call, send=send, stream=stream, tracer=tracer ) - expected_span_name = _get_span_name("receive", record.topic) + assert middleware.bootstrap_servers == stream.backend.bootstrap_servers + assert middleware.client_id == "client_id" + assert middleware.consumer_group == "consumer_group" - tracer.start_as_current_span.assert_called_once_with( - expected_span_name, - context=extracted_context, - kind=SpanKind.CONSUMER, - ) - span = tracer.start_as_current_span.return_value.__enter__() - set_span_in_context.assert_called_once_with(span, extracted_context) + asyncio.run(middleware(record)) + + extract.assert_called_once_with(record.headers, getter=_kstreams_getter) + tracer.start_as_current_span.assert_called_once() attach.assert_called_once_with(set_span_in_context.return_value) + detach.assert_called_once_with(attach.return_value) + span = tracer.start_as_current_span.return_value.__enter__() enrich_base_span.assert_called_once_with( span, - bootstrap_servers, + stream.backend.bootstrap_servers, record.topic, - client_id, + "client_id", ) - enrich_span_with_record_info.assert_called_once_with( span, record.topic, record.partition, record.offset, ) - # consume_hook.assert_called_once_with(span, record, self.args, self.kwargs) - detach.assert_called_once_with(attach.return_value) + + def 
test_build_stream_middleware_stack_receives_stream(self): + tracer = mock.MagicMock() + middlewares = [] + + func = mock.MagicMock() + instance = mock.MagicMock() + args = mock.MagicMock() + stream = mock.MagicMock() + stream.middlewares = middlewares + kwargs = {"stream": stream} + + _wrap_build_stream_middleware_stack(tracer)(func, instance, args, kwargs) + + first_middleware_class = stream.middlewares[0].middleware + + assert first_middleware_class == OpenTelemetryMiddleware