From f2f0a43e38ced1b248f1276db831da76a07f3340 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 1 Dec 2023 20:56:39 +0100 Subject: [PATCH 1/3] ref: Add more metrics for slow rebalancing We have unitless/cumulative times for all of those things, but we do not actually know how often they happen. --- arroyo/processing/processor.py | 29 ++++++++++++++++++++++------- arroyo/utils/metric_defs.py | 16 +++++++++++++++- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 09356451..392c3f59 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -50,9 +50,13 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: logger.exception(f"{f.__name__} crashed") raise finally: - metrics.incr_timing( - "arroyo.consumer.callback.time", time.time() - start_time + value = time.time() - start_time + metrics.metrics.timing( + "arroyo.consumer.run.callback", + value, + tags={"callback_name": f.__name__}, ) + metrics.incr_timing("arroyo.consumer.callback.time", value) return cast(F, wrapper) @@ -82,7 +86,7 @@ class InvalidStateError(RuntimeError): class MetricsBuffer: def __init__(self) -> None: - self.__metrics = get_metrics() + self.metrics = get_metrics() self.__timers: MutableMapping[ConsumerTiming, float] = defaultdict(float) self.__counters: MutableMapping[ConsumerCounter, int] = defaultdict(int) self.__reset() @@ -100,9 +104,9 @@ def flush(self) -> None: value: Union[float, int] for metric, value in self.__timers.items(): - self.__metrics.timing(metric, value) + self.metrics.timing(metric, value) for metric, value in self.__counters.items(): - self.__metrics.increment(metric, value) + self.metrics.increment(metric, value) self.__reset() def __reset(self) -> None: @@ -198,16 +202,27 @@ def _close_strategy() -> None: self.__is_paused = False self._clear_backpressure() - self.__metrics_buffer.incr_timing( - "arroyo.consumer.shutdown.time", time.time() - start_close + value = time.time() - start_close + + self.__metrics_buffer.metrics.timing( + "arroyo.consumer.run.close_strategy", value ) + self.__metrics_buffer.incr_timing("arroyo.consumer.shutdown.time", value) + def _create_strategy(partitions: Mapping[Partition, int]) -> None: + start_create = time.time() + self.__processing_strategy = ( self.__processor_factory.create_with_partitions( self.__commit, partitions ) ) + + self.__metrics_buffer.metrics.timing( + "arroyo.consumer.run.create_strategy", time.time() - start_create + ) + logger.debug( "Initialized processing strategy: %r", self.__processing_strategy ) diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index eaff217b..18450452 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -59,9 +59,23 @@ # Time (unitless) spent in librdkafka callbacks. This metric's timings # overlap other timings, and might spike at the same time. "arroyo.consumer.callback.time", - # Time (unitless) spent in shutting down the consumer. This metric's # timings overlap other timings, and might spike at the same time. "arroyo.consumer.shutdown.time", + # A regular duration metric where each datapoint is measuring the time it + # took to execute a single callback. This metric is distinct from the + # arroyo.consumer.*.time metrics as it does not attempt to accumulate time + # spent per second in an attempt to keep monitoring overhead low. + # + # The metric is tagged by the name of the internal callback function being + # executed, as 'callback_name'. Possible values are on_partitions_assigned + # and on_partitions_revoked. + "arroyo.consumer.run.callback", + # Duration metric measuring the time it took to flush in-flight messages + # and shut down the strategies. + "arroyo.consumer.run.close_strategy", + # Duration metric measuring the time it took to create the processing strategy. + "arroyo.consumer.run.create_strategy", + # Time (unitless) spent in shutting down the consumer. This metric's # Consumer latency in seconds. Recorded by the commit offsets strategy. "arroyo.consumer.latency", # Queue size of background queue that librdkafka uses to prefetch messages. From 01b482b5779552deb6aec64f0cf040d382426574 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 1 Dec 2023 21:00:05 +0100 Subject: [PATCH 2/3] add partition count --- arroyo/processing/processor.py | 8 ++++++++ arroyo/utils/metric_defs.py | 4 ++++ 2 files changed, 12 insertions(+) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 392c3f59..b77101c2 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -230,6 +230,10 @@ def _create_strategy(partitions: Mapping[Partition, int]) -> None: @_rdkafka_callback(metrics=self.__metrics_buffer) def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None: logger.info("New partitions assigned: %r", partitions) + self.__metrics_buffer.metrics.increment( + "arroyo.consumer.partitions_assigned.count", len(partitions) + ) + self.__buffered_messages.reset() if self.__dlq_policy: self.__dlq_policy.reset_offsets(partitions) @@ -245,6 +249,10 @@ def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None: def on_partitions_revoked(partitions: Sequence[Partition]) -> None: logger.info("Partitions to revoke: %r", partitions) + self.__metrics_buffer.metrics.increment( + "arroyo.consumer.partitions_revoked.count", len(partitions) + ) + if partitions: _close_strategy() diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index 18450452..655b1a7d 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -75,6 +75,10 @@ "arroyo.consumer.run.close_strategy", # Duration metric measuring the time it took to create the processing strategy. "arroyo.consumer.run.create_strategy", + # How many partitions have been revoked just now. + "arroyo.consumer.partitions_revoked.count", + # How many partitions have been assigned just now. + "arroyo.consumer.partitions_assigned.count", # Time (unitless) spent in shutting down the consumer. This metric's # Consumer latency in seconds. Recorded by the commit offsets strategy. "arroyo.consumer.latency", From d851538a9ea8f4b58f74956acc0a231aebaa55bd Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 1 Dec 2023 21:09:37 +0100 Subject: [PATCH 3/3] fix tests --- tests/processing/test_processor.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/processing/test_processor.py b/tests/processing/test_processor.py index d9a7a853..31cf8719 100644 --- a/tests/processing/test_processor.py +++ b/tests/processing/test_processor.py @@ -126,10 +126,24 @@ def test_stream_processor_lifecycle() -> None: processor._shutdown() assert list((type(call), call.name) for call in metrics.calls) == [ + (Increment, "arroyo.consumer.partitions_assigned.count"), + (Timing, "arroyo.consumer.run.create_strategy"), + (Timing, "arroyo.consumer.run.callback"), (Timing, "arroyo.consumer.poll.time"), (Timing, "arroyo.consumer.callback.time"), (Timing, "arroyo.consumer.processing.time"), (Increment, "arroyo.consumer.run.count"), + (Increment, "arroyo.consumer.partitions_assigned.count"), + (Timing, "arroyo.consumer.run.close_strategy"), + (Timing, "arroyo.consumer.run.create_strategy"), + (Timing, "arroyo.consumer.run.callback"), + (Increment, "arroyo.consumer.partitions_revoked.count"), + (Timing, "arroyo.consumer.run.close_strategy"), + (Timing, "arroyo.consumer.run.callback"), + (Increment, "arroyo.consumer.partitions_revoked.count"), + (Timing, "arroyo.consumer.run.callback"), + (Increment, "arroyo.consumer.partitions_revoked.count"), + (Timing, "arroyo.consumer.run.callback"), (Timing, "arroyo.consumer.processing.time"), (Timing, "arroyo.consumer.backpressure.time"), (Timing, "arroyo.consumer.join.time"),