Skip to content

Commit

Permalink
ref: Add more metrics for slow rebalancing (#310)
Browse files Browse the repository at this point in the history
* ref: Add more metrics for slow rebalancing

We have unitless/cumulative times for all of those things, but we do not
actually know how often they happen.

* add partition count

* fix tests
  • Loading branch information
untitaker authored Dec 1, 2023
1 parent 6e71533 commit c15fbbd
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 8 deletions.
37 changes: 30 additions & 7 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,13 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
logger.exception(f"{f.__name__} crashed")
raise
finally:
metrics.incr_timing(
"arroyo.consumer.callback.time", time.time() - start_time
value = time.time() - start_time
metrics.metrics.timing(
"arroyo.consumer.run.callback",
value,
tags={"callback_name": f.__name__},
)
metrics.incr_timing("arroyo.consumer.callback.time", value)

return cast(F, wrapper)

Expand Down Expand Up @@ -82,7 +86,7 @@ class InvalidStateError(RuntimeError):

class MetricsBuffer:
def __init__(self) -> None:
self.__metrics = get_metrics()
self.metrics = get_metrics()
self.__timers: MutableMapping[ConsumerTiming, float] = defaultdict(float)
self.__counters: MutableMapping[ConsumerCounter, int] = defaultdict(int)
self.__reset()
Expand All @@ -100,9 +104,9 @@ def flush(self) -> None:
value: Union[float, int]

for metric, value in self.__timers.items():
self.__metrics.timing(metric, value)
self.metrics.timing(metric, value)
for metric, value in self.__counters.items():
self.__metrics.increment(metric, value)
self.metrics.increment(metric, value)
self.__reset()

def __reset(self) -> None:
Expand Down Expand Up @@ -198,23 +202,38 @@ def _close_strategy() -> None:
self.__is_paused = False
self._clear_backpressure()

self.__metrics_buffer.incr_timing(
"arroyo.consumer.shutdown.time", time.time() - start_close
value = time.time() - start_close

self.__metrics_buffer.metrics.timing(
"arroyo.consumer.run.close_strategy", value
)

self.__metrics_buffer.incr_timing("arroyo.consumer.shutdown.time", value)

def _create_strategy(partitions: Mapping[Partition, int]) -> None:
start_create = time.time()

self.__processing_strategy = (
self.__processor_factory.create_with_partitions(
self.__commit, partitions
)
)

self.__metrics_buffer.metrics.timing(
"arroyo.consumer.run.create_strategy", time.time() - start_create
)

logger.debug(
"Initialized processing strategy: %r", self.__processing_strategy
)

@_rdkafka_callback(metrics=self.__metrics_buffer)
def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
logger.info("New partitions assigned: %r", partitions)
self.__metrics_buffer.metrics.increment(
"arroyo.consumer.partitions_assigned.count", len(partitions)
)

self.__buffered_messages.reset()
if self.__dlq_policy:
self.__dlq_policy.reset_offsets(partitions)
Expand All @@ -230,6 +249,10 @@ def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
def on_partitions_revoked(partitions: Sequence[Partition]) -> None:
logger.info("Partitions to revoke: %r", partitions)

self.__metrics_buffer.metrics.increment(
"arroyo.consumer.partitions_revoked.count", len(partitions)
)

if partitions:
_close_strategy()

Expand Down
20 changes: 19 additions & 1 deletion arroyo/utils/metric_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,27 @@
# Time (unitless) spent in librdkafka callbacks. This metric's timings
# overlap other timings, and might spike at the same time.
"arroyo.consumer.callback.time",
# Time (unitless) spent in shutting down the consumer. This metric's
# timings overlap other timings, and might spike at the same time.
"arroyo.consumer.shutdown.time",
# A regular duration metric where each datapoint is measuring the time it
# took to execute a single callback. This metric is distinct from the
# arroyo.consumer.*.time metrics as it does not attempt to accumulate time
# spent per second in an attempt to keep monitoring overhead low.
#
# The metric is tagged by the name of the internal callback function being
# executed, as 'callback_name'. Possible values are on_partitions_assigned
# and on_partitions_revoked.
"arroyo.consumer.run.callback",
# Duration metric measuring the time it took to flush in-flight messages
# and shut down the strategies.
"arroyo.consumer.run.close_strategy",
# Duration metric measuring the time it took to create the processing strategy.
"arroyo.consumer.run.create_strategy",
# How many partitions have been revoked just now.
"arroyo.consumer.partitions_revoked.count",
# How many partitions have been assigned just now.
"arroyo.consumer.partitions_assigned.count",
# Time (unitless) spent in shutting down the consumer. This metric's
# Consumer latency in seconds. Recorded by the commit offsets strategy.
"arroyo.consumer.latency",
# Queue size of background queue that librdkafka uses to prefetch messages.
Expand Down
14 changes: 14 additions & 0 deletions tests/processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,24 @@ def test_stream_processor_lifecycle() -> None:
processor._shutdown()

assert list((type(call), call.name) for call in metrics.calls) == [
(Increment, "arroyo.consumer.partitions_assigned.count"),
(Timing, "arroyo.consumer.run.create_strategy"),
(Timing, "arroyo.consumer.run.callback"),
(Timing, "arroyo.consumer.poll.time"),
(Timing, "arroyo.consumer.callback.time"),
(Timing, "arroyo.consumer.processing.time"),
(Increment, "arroyo.consumer.run.count"),
(Increment, "arroyo.consumer.partitions_assigned.count"),
(Timing, "arroyo.consumer.run.close_strategy"),
(Timing, "arroyo.consumer.run.create_strategy"),
(Timing, "arroyo.consumer.run.callback"),
(Increment, "arroyo.consumer.partitions_revoked.count"),
(Timing, "arroyo.consumer.run.close_strategy"),
(Timing, "arroyo.consumer.run.callback"),
(Increment, "arroyo.consumer.partitions_revoked.count"),
(Timing, "arroyo.consumer.run.callback"),
(Increment, "arroyo.consumer.partitions_revoked.count"),
(Timing, "arroyo.consumer.run.callback"),
(Timing, "arroyo.consumer.processing.time"),
(Timing, "arroyo.consumer.backpressure.time"),
(Timing, "arroyo.consumer.join.time"),
Expand Down

0 comments on commit c15fbbd

Please sign in to comment.