Skip to content

Commit

Permalink
Adds metrics for client-side event emission and subscription (#14785)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisguidry authored Jul 29, 2024
1 parent fede95b commit 4a58380
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 22 deletions.
38 changes: 20 additions & 18 deletions benches/bench_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,28 @@
import sys

import pytest
from prometheus_client import REGISTRY
from pytest_benchmark.fixture import BenchmarkFixture


def reset_imports():
    """Forget every previously imported `prefect` module and its metrics.

    Benchmarks call this before importing so that each measured import starts
    from scratch: all entries in `sys.modules` whose name starts with
    "prefect" are dropped, importlib's caches are invalidated, and every
    collector is removed from the default prometheus registry.
    """
    # Collect the names first; deleting while iterating sys.modules would fail
    stale_names = [name for name in sys.modules if name.startswith("prefect")]
    for stale_name in stale_names:
        del sys.modules[stale_name]

    importlib.invalidate_caches()

    # Clear any previously measured metrics from the prometheus registry.
    # Iterate over a snapshot, since unregistering presumably mutates the
    # registry's mapping.
    registered = tuple(REGISTRY._collector_to_names)
    for collector in registered:
        REGISTRY.unregister(collector)


@pytest.mark.benchmark(group="imports")
def bench_import_prefect(benchmark):
def bench_import_prefect(benchmark: BenchmarkFixture):
def import_prefect():
# To get an accurate result, we want to import the module from scratch each time
# Remove the module from sys.modules if it's there
prefect_modules = [key for key in sys.modules if key.startswith("prefect")]
for module in prefect_modules:
del sys.modules[module]

# Clear importlib cache
importlib.invalidate_caches()
reset_imports()

import prefect # noqa

Expand All @@ -23,16 +32,9 @@ def import_prefect():

@pytest.mark.timeout(180)
@pytest.mark.benchmark(group="imports")
def bench_import_prefect_flow(benchmark):
def bench_import_prefect_flow(benchmark: BenchmarkFixture):
def import_prefect_flow():
# To get an accurate result, we want to import the module from scratch each time
# Remove the module from sys.modules if it's there
prefect_modules = [key for key in sys.modules if key.startswith("prefect")]
for module in prefect_modules:
del sys.modules[module]

# Clear importlib cache
importlib.invalidate_caches()
reset_imports()

from prefect import flow # noqa

Expand Down
58 changes: 54 additions & 4 deletions src/prefect/events/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import orjson
import pendulum
from cachetools import TTLCache
from prometheus_client import Counter
from typing_extensions import Self
from websockets import Subprotocol
from websockets.client import WebSocketClientProtocol, connect
Expand All @@ -36,6 +37,30 @@
if TYPE_CHECKING:
from prefect.events.filters import EventFilter

# Client-side metrics for event emission and subscription.  Every metric
# carries a `client` label, which call sites fill with the client's class name.
EVENTS_EMITTED = Counter(
    name="prefect_events_emitted",
    documentation="The number of events emitted by Prefect event clients",
    labelnames=("client",),
)
EVENTS_OBSERVED = Counter(
    name="prefect_events_observed",
    documentation="The number of events observed by Prefect event subscribers",
    labelnames=("client",),
)
EVENT_WEBSOCKET_CONNECTIONS = Counter(
    name="prefect_event_websocket_connections",
    documentation=(
        "The number of times Prefect event clients have connected to an event stream, "
        "broken down by direction (in/out) and connection (initial/reconnect)"
    ),
    labelnames=("client", "direction", "connection"),
)
EVENT_WEBSOCKET_CHECKPOINTS = Counter(
    name="prefect_event_websocket_checkpoints",
    documentation="The number of checkpoints performed by Prefect event clients",
    labelnames=("client",),
)

logger = get_logger(__name__)


Expand Down Expand Up @@ -82,14 +107,22 @@ def get_events_subscriber(
class EventsClient(abc.ABC):
"""The abstract interface for all Prefect Events clients"""

@property
def client_name(self) -> str:
    """The name of this client's concrete class, used as the `client` metric label."""
    concrete_class = self.__class__
    return concrete_class.__name__

async def emit(self, event: Event) -> None:
    """Emit a single event

    Delegates to the subclass's `_emit` and records the attempt in the
    `EVENTS_EMITTED` counter under this client's name.

    Args:
        event: the event to emit

    Raises:
        TypeError: if this client has no `_in_context` attribute, i.e. it is
            not currently being used as a context manager
    """
    if not hasattr(self, "_in_context"):
        raise TypeError(
            "Events may only be emitted while this client is being used as a "
            "context manager"
        )

    try:
        return await self._emit(event)
    finally:
        # Incremented in `finally`, so an emit that raises is still counted
        EVENTS_EMITTED.labels(self.client_name).inc()

@abc.abstractmethod
async def _emit(self, event: Event) -> None: # pragma: no cover
Expand Down Expand Up @@ -299,6 +332,8 @@ async def _checkpoint(self, event: Event) -> None:
# don't clear the list, just the ones that we are sure of.
self._unconfirmed_events = self._unconfirmed_events[unconfirmed_count:]

EVENT_WEBSOCKET_CHECKPOINTS.labels(self.client_name).inc()

async def _emit(self, event: Event) -> None:
for i in range(self._reconnection_attempts + 1):
try:
Expand Down Expand Up @@ -426,10 +461,17 @@ def __init__(
if self._reconnection_attempts < 0:
raise ValueError("reconnection_attempts must be a non-negative integer")

@property
def client_name(self) -> str:
    """The name of this subscriber's concrete class, used as the `client` metric label."""
    concrete_class = self.__class__
    return concrete_class.__name__

async def __aenter__(self) -> Self:
    """Open the initial connection and return this object for use in `async with`.

    The connection attempt is recorded in `EVENT_WEBSOCKET_CONNECTIONS` with
    the labels ("out", "initial") whether or not it succeeds.

    Raises:
        Whatever `_reconnect` raises; errors are deliberately not handled here.
    """
    # Don't handle any errors in the initial connection, because these are most
    # likely a permission or configuration issue that should propagate
    try:
        await self._reconnect()
    finally:
        # `labels()` only selects the labelled child counter; `.inc()` is what
        # actually records the connection attempt
        EVENT_WEBSOCKET_CONNECTIONS.labels(
            self.client_name, "out", "initial"
        ).inc()
    return self

async def _reconnect(self) -> None:
Expand Down Expand Up @@ -503,7 +545,12 @@ async def __anext__(self) -> Event:
# Otherwise, after the first time through this loop, we're recovering
# from a ConnectionClosed, so reconnect now.
if not self._websocket or i > 0:
await self._reconnect()
try:
await self._reconnect()
finally:
EVENT_WEBSOCKET_CONNECTIONS.labels(
self.client_name, "out", "reconnect"
)
assert self._websocket

while True:
Expand All @@ -514,7 +561,10 @@ async def __anext__(self) -> Event:
continue
self._seen_events[event.id] = True

return event
try:
return event
finally:
EVENTS_OBSERVED.labels(self.client_name).inc()
except ConnectionClosedOK:
logger.debug('Connection closed with "OK" status')
raise StopAsyncIteration
Expand Down

0 comments on commit 4a58380

Please sign in to comment.