Skip to content

Commit

Permalink
OPT: Instrument get_middlewares if available
Browse files Browse the repository at this point in the history
  • Loading branch information
woile committed Oct 15, 2024
1 parent b7cb9c4 commit a76f7d5
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 77 deletions.
120 changes: 63 additions & 57 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 21 additions & 7 deletions src/opentelemetry_instrumentation_kstreams/instrumentor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from .package import _instruments
from .version import __version__
from .wrappers import (
# _wrap_getone,
_wrap_build_stream_middleware_stack,
_wrap_get_middlewares,
_wrap_send,
)

Expand Down Expand Up @@ -38,12 +38,26 @@ def _instrument(self, **kwargs: Any):
schema_url="https://opentelemetry.io/schemas/1.11.0",
)
wrap_function_wrapper(StreamEngine, "send", _wrap_send(tracer))
wrap_function_wrapper(
StreamEngine,
"build_stream_middleware_stack",
_wrap_build_stream_middleware_stack(tracer),
)

# kstreams >= 0.24.1
if hasattr(Stream, "get_middlewares"):
wrap_function_wrapper(
Stream,
"get_middlewares",
_wrap_get_middlewares(tracer),
)
else:
wrap_function_wrapper(
StreamEngine,
"_build_stream_middleware_stack",
_wrap_build_stream_middleware_stack(tracer),
)

def _uninstrument(self, **kwargs: Any):
unwrap(StreamEngine, "send")
unwrap(Stream, "build_stream_middleware_stack")

# kstreams >= 0.24.1
if hasattr(Stream, "get_middlewares"):
unwrap(Stream, "get_middlewares")
else:
unwrap(StreamEngine, "_build_stream_middleware_stack")
24 changes: 24 additions & 0 deletions src/opentelemetry_instrumentation_kstreams/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,27 @@ def _traced_build_stream_middleware_stack(
return next_call

return _traced_build_stream_middleware_stack


def _wrap_get_middlewares(
tracer: Tracer,
) -> Callable:
def _traced_get_middlewares(
func, instance: Stream, args, kwargs
) -> NextMiddlewareCall:
# let's check if otel is already present in the middlewares
if (
len(instance.middlewares) > 0
and instance.middlewares[0].middleware == OpenTelemetryMiddleware
):
return func(*args, **kwargs)

instance.middlewares.insert(
0, middleware.Middleware(OpenTelemetryMiddleware, tracer=tracer)
)

next_call = func(*args, **kwargs)

return next_call

return _traced_get_middlewares
4 changes: 2 additions & 2 deletions tests/test_instrumentation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kstreams import StreamEngine
from kstreams import Stream, StreamEngine
from wrapt import BoundFunctionWrapper

from opentelemetry_instrumentation_kstreams import KStreamsInstrumentor
Expand All @@ -8,4 +8,4 @@ def test_instrument_api() -> None:
instrumentation = KStreamsInstrumentor()
instrumentation.instrument()
assert isinstance(StreamEngine.send, BoundFunctionWrapper)
assert isinstance(StreamEngine.build_stream_middleware_stack, BoundFunctionWrapper)
assert isinstance(Stream.get_middlewares, BoundFunctionWrapper)
Loading

0 comments on commit a76f7d5

Please sign in to comment.