diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 09356451..b77101c2 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -50,9 +50,13 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: logger.exception(f"{f.__name__} crashed") raise finally: - metrics.incr_timing( - "arroyo.consumer.callback.time", time.time() - start_time + value = time.time() - start_time + metrics.metrics.timing( + "arroyo.consumer.run.callback", + value, + tags={"callback_name": f.__name__}, ) + metrics.incr_timing("arroyo.consumer.callback.time", value) return cast(F, wrapper) @@ -82,7 +86,7 @@ class InvalidStateError(RuntimeError): class MetricsBuffer: def __init__(self) -> None: - self.__metrics = get_metrics() + self.metrics = get_metrics() self.__timers: MutableMapping[ConsumerTiming, float] = defaultdict(float) self.__counters: MutableMapping[ConsumerCounter, int] = defaultdict(int) self.__reset() @@ -100,9 +104,9 @@ def flush(self) -> None: value: Union[float, int] for metric, value in self.__timers.items(): - self.__metrics.timing(metric, value) + self.metrics.timing(metric, value) for metric, value in self.__counters.items(): - self.__metrics.increment(metric, value) + self.metrics.increment(metric, value) self.__reset() def __reset(self) -> None: @@ -198,16 +202,27 @@ def _close_strategy() -> None: self.__is_paused = False self._clear_backpressure() - self.__metrics_buffer.incr_timing( - "arroyo.consumer.shutdown.time", time.time() - start_close + value = time.time() - start_close + + self.__metrics_buffer.metrics.timing( + "arroyo.consumer.run.close_strategy", value ) + self.__metrics_buffer.incr_timing("arroyo.consumer.shutdown.time", value) + def _create_strategy(partitions: Mapping[Partition, int]) -> None: + start_create = time.time() + self.__processing_strategy = ( self.__processor_factory.create_with_partitions( self.__commit, partitions ) ) + + self.__metrics_buffer.metrics.timing( + 
"arroyo.consumer.run.create_strategy", time.time() - start_create + ) + logger.debug( "Initialized processing strategy: %r", self.__processing_strategy ) @@ -215,6 +230,10 @@ def _create_strategy(partitions: Mapping[Partition, int]) -> None: @_rdkafka_callback(metrics=self.__metrics_buffer) def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None: logger.info("New partitions assigned: %r", partitions) + self.__metrics_buffer.metrics.increment( + "arroyo.consumer.partitions_assigned.count", len(partitions) + ) + self.__buffered_messages.reset() if self.__dlq_policy: self.__dlq_policy.reset_offsets(partitions) @@ -230,6 +249,10 @@ def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None: def on_partitions_revoked(partitions: Sequence[Partition]) -> None: logger.info("Partitions to revoke: %r", partitions) + self.__metrics_buffer.metrics.increment( + "arroyo.consumer.partitions_revoked.count", len(partitions) + ) + if partitions: _close_strategy() diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index eaff217b..655b1a7d 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -59,9 +59,27 @@ # Time (unitless) spent in librdkafka callbacks. This metric's timings # overlap other timings, and might spike at the same time. "arroyo.consumer.callback.time", # Time (unitless) spent in shutting down the consumer. This metric's # timings overlap other timings, and might spike at the same time. "arroyo.consumer.shutdown.time", + # A regular duration metric where each datapoint is measuring the time it + # took to execute a single callback. This metric is distinct from the + # arroyo.consumer.*.time metrics as it does not attempt to accumulate time + # spent per second in an attempt to keep monitoring overhead low. + # + # The metric is tagged by the name of the internal callback function being + # executed, as 'callback_name'. Possible values are on_partitions_assigned + # and on_partitions_revoked.
+ "arroyo.consumer.run.callback", + # Duration metric measuring the time it took to flush in-flight messages + # and shut down the strategies. + "arroyo.consumer.run.close_strategy", + # Duration metric measuring the time it took to create the processing strategy. + "arroyo.consumer.run.create_strategy", + # How many partitions have been revoked just now. + "arroyo.consumer.partitions_revoked.count", + # How many partitions have been assigned just now. + "arroyo.consumer.partitions_assigned.count", # Consumer latency in seconds. Recorded by the commit offsets strategy. "arroyo.consumer.latency", # Queue size of background queue that librdkafka uses to prefetch messages. diff --git a/tests/processing/test_processor.py b/tests/processing/test_processor.py index d9a7a853..31cf8719 100644 --- a/tests/processing/test_processor.py +++ b/tests/processing/test_processor.py @@ -126,10 +126,24 @@ def test_stream_processor_lifecycle() -> None: processor._shutdown() assert list((type(call), call.name) for call in metrics.calls) == [ + (Increment, "arroyo.consumer.partitions_assigned.count"), + (Timing, "arroyo.consumer.run.create_strategy"), + (Timing, "arroyo.consumer.run.callback"), (Timing, "arroyo.consumer.poll.time"), (Timing, "arroyo.consumer.callback.time"), (Timing, "arroyo.consumer.processing.time"), (Increment, "arroyo.consumer.run.count"), + (Increment, "arroyo.consumer.partitions_assigned.count"), + (Timing, "arroyo.consumer.run.close_strategy"), + (Timing, "arroyo.consumer.run.create_strategy"), + (Timing, "arroyo.consumer.run.callback"), + (Increment, "arroyo.consumer.partitions_revoked.count"), + (Timing, "arroyo.consumer.run.close_strategy"), + (Timing, "arroyo.consumer.run.callback"), + (Increment, "arroyo.consumer.partitions_revoked.count"), + (Timing, "arroyo.consumer.run.callback"), + (Increment, "arroyo.consumer.partitions_revoked.count"), + (Timing, "arroyo.consumer.run.callback"),
(Timing, "arroyo.consumer.processing.time"), (Timing, "arroyo.consumer.backpressure.time"), (Timing, "arroyo.consumer.join.time"),