diff --git a/benches/bench_import.py b/benches/bench_import.py
index aabf8fc7b1b5..0398ec523ce0 100644
--- a/benches/bench_import.py
+++ b/benches/bench_import.py
@@ -2,19 +2,28 @@
 import sys
 import pytest
+from prometheus_client import REGISTRY
+from pytest_benchmark.fixture import BenchmarkFixture
+
+
+def reset_imports():
+    # Remove the module from sys.modules if it's there
+    prefect_modules = [key for key in sys.modules if key.startswith("prefect")]
+    for module in prefect_modules:
+        del sys.modules[module]
+
+    # Clear importlib cache
+    importlib.invalidate_caches()
+
+    # reset the prometheus registry to clear any previously measured metrics
+    for collector in list(REGISTRY._collector_to_names):
+        REGISTRY.unregister(collector)
 
 
 @pytest.mark.benchmark(group="imports")
-def bench_import_prefect(benchmark):
+def bench_import_prefect(benchmark: BenchmarkFixture):
     def import_prefect():
-        # To get an accurate result, we want to import the module from scratch each time
-        # Remove the module from sys.modules if it's there
-        prefect_modules = [key for key in sys.modules if key.startswith("prefect")]
-        for module in prefect_modules:
-            del sys.modules[module]
-
-        # Clear importlib cache
-        importlib.invalidate_caches()
+        reset_imports()
 
         import prefect  # noqa
@@ -23,16 +32,9 @@ def import_prefect():
 
 
 @pytest.mark.timeout(180)
 @pytest.mark.benchmark(group="imports")
-def bench_import_prefect_flow(benchmark):
+def bench_import_prefect_flow(benchmark: BenchmarkFixture):
     def import_prefect_flow():
-        # To get an accurate result, we want to import the module from scratch each time
-        # Remove the module from sys.modules if it's there
-        prefect_modules = [key for key in sys.modules if key.startswith("prefect")]
-        for module in prefect_modules:
-            del sys.modules[module]
-
-        # Clear importlib cache
-        importlib.invalidate_caches()
+        reset_imports()
 
         from prefect import flow  # noqa
diff --git a/src/prefect/events/clients.py b/src/prefect/events/clients.py
index 70a9bf38861e..a67dac3ef0d7 100644
--- a/src/prefect/events/clients.py
+++ b/src/prefect/events/clients.py
@@ -19,6 +19,7 @@
 import orjson
 import pendulum
 from cachetools import TTLCache
+from prometheus_client import Counter
 from typing_extensions import Self
 from websockets import Subprotocol
 from websockets.client import WebSocketClientProtocol, connect
@@ -36,6 +37,30 @@
 if TYPE_CHECKING:
     from prefect.events.filters import EventFilter
 
+EVENTS_EMITTED = Counter(
+    "prefect_events_emitted",
+    "The number of events emitted by Prefect event clients",
+    labelnames=["client"],
+)
+EVENTS_OBSERVED = Counter(
+    "prefect_events_observed",
+    "The number of events observed by Prefect event subscribers",
+    labelnames=["client"],
+)
+EVENT_WEBSOCKET_CONNECTIONS = Counter(
+    "prefect_event_websocket_connections",
+    (
+        "The number of times Prefect event clients have connected to an event stream, "
+        "broken down by direction (in/out) and connection (initial/reconnect)"
+    ),
+    labelnames=["client", "direction", "connection"],
+)
+EVENT_WEBSOCKET_CHECKPOINTS = Counter(
+    "prefect_event_websocket_checkpoints",
+    "The number of checkpoints performed by Prefect event clients",
+    labelnames=["client"],
+)
+
 logger = get_logger(__name__)
 
 
@@ -82,6 +107,10 @@ def get_events_subscriber(
 class EventsClient(abc.ABC):
     """The abstract interface for all Prefect Events clients"""
 
+    @property
+    def client_name(self) -> str:
+        return self.__class__.__name__
+
     async def emit(self, event: Event) -> None:
         """Emit a single event"""
         if not hasattr(self, "_in_context"):
@@ -89,7 +118,11 @@ async def emit(self, event: Event) -> None:
                 "Events may only be emitted while this client is being used as a "
                 "context manager"
             )
-        return await self._emit(event)
+
+        try:
+            return await self._emit(event)
+        finally:
+            EVENTS_EMITTED.labels(self.client_name).inc()
 
     @abc.abstractmethod
     async def _emit(self, event: Event) -> None:  # pragma: no cover
@@ -299,6 +332,8 @@ async def _checkpoint(self, event: Event) -> None:
         # don't clear the list, just the ones that we are sure of.
         self._unconfirmed_events = self._unconfirmed_events[unconfirmed_count:]
 
+        EVENT_WEBSOCKET_CHECKPOINTS.labels(self.client_name).inc()
+
     async def _emit(self, event: Event) -> None:
         for i in range(self._reconnection_attempts + 1):
             try:
@@ -426,10 +461,17 @@ def __init__(
         if self._reconnection_attempts < 0:
             raise ValueError("reconnection_attempts must be a non-negative integer")
 
+    @property
+    def client_name(self) -> str:
+        return self.__class__.__name__
+
     async def __aenter__(self) -> Self:
         # Don't handle any errors in the initial connection, because these are most
        # likely a permission or configuration issue that should propagate
-        await self._reconnect()
+        try:
+            await self._reconnect()
+        finally:
+            EVENT_WEBSOCKET_CONNECTIONS.labels(self.client_name, "out", "initial").inc()
         return self
 
     async def _reconnect(self) -> None:
@@ -503,7 +545,12 @@ async def __anext__(self) -> Event:
                 # Otherwise, after the first time through this loop, we're recovering
                 # from a ConnectionClosed, so reconnect now.
                 if not self._websocket or i > 0:
-                    await self._reconnect()
+                    try:
+                        await self._reconnect()
+                    finally:
+                        EVENT_WEBSOCKET_CONNECTIONS.labels(
+                            self.client_name, "out", "reconnect"
+                        ).inc()
                     assert self._websocket
 
                 while True:
@@ -514,7 +561,10 @@ async def __anext__(self) -> Event:
                         continue
                     self._seen_events[event.id] = True
 
-                    return event
+                    try:
+                        return event
+                    finally:
+                        EVENTS_OBSERVED.labels(self.client_name).inc()
         except ConnectionClosedOK:
             logger.debug('Connection closed with "OK" status')
             raise StopAsyncIteration
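
A quick way to sanity-check the new counters locally (not part of this diff): prometheus_client's default REGISTRY can be queried directly with get_sample_value. Note that Counter samples are exported with a "_total" suffix, and the "client" label value is whatever class name the exercised client reports via client_name; the class names below are assumptions based on the methods touched above.

    from prometheus_client import REGISTRY

    # Returns None until the labelled child has been created by the client code.
    # "PrefectEventsClient" / "PrefectEventSubscriber" are assumed class names.
    emitted = REGISTRY.get_sample_value(
        "prefect_events_emitted_total", {"client": "PrefectEventsClient"}
    )
    connections = REGISTRY.get_sample_value(
        "prefect_event_websocket_connections_total",
        {"client": "PrefectEventSubscriber", "direction": "out", "connection": "initial"},
    )
    print(emitted, connections)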