Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: Add more metrics for slow rebalancing #310

Merged
merged 3 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,13 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
logger.exception(f"{f.__name__} crashed")
raise
finally:
metrics.incr_timing(
"arroyo.consumer.callback.time", time.time() - start_time
value = time.time() - start_time
metrics.metrics.timing(
"arroyo.consumer.run.callback",
value,
tags={"callback_name": f.__name__},
)
metrics.incr_timing("arroyo.consumer.callback.time", value)

return cast(F, wrapper)

Expand Down Expand Up @@ -82,7 +86,7 @@ class InvalidStateError(RuntimeError):

class MetricsBuffer:
def __init__(self) -> None:
self.__metrics = get_metrics()
self.metrics = get_metrics()
self.__timers: MutableMapping[ConsumerTiming, float] = defaultdict(float)
self.__counters: MutableMapping[ConsumerCounter, int] = defaultdict(int)
self.__reset()
Expand All @@ -100,9 +104,9 @@ def flush(self) -> None:
value: Union[float, int]

for metric, value in self.__timers.items():
self.__metrics.timing(metric, value)
self.metrics.timing(metric, value)
for metric, value in self.__counters.items():
self.__metrics.increment(metric, value)
self.metrics.increment(metric, value)
self.__reset()

def __reset(self) -> None:
Expand Down Expand Up @@ -198,23 +202,38 @@ def _close_strategy() -> None:
self.__is_paused = False
self._clear_backpressure()

self.__metrics_buffer.incr_timing(
"arroyo.consumer.shutdown.time", time.time() - start_close
value = time.time() - start_close

self.__metrics_buffer.metrics.timing(
"arroyo.consumer.run.close_strategy", value
)

self.__metrics_buffer.incr_timing("arroyo.consumer.shutdown.time", value)

# Build the processing strategy for the given partitions, store it on the
# processor, and report how long the factory call took.
#
# `partitions` maps each Partition to an int (presumably the starting
# offset -- confirm against the caller). Returns None.
def _create_strategy(partitions: Mapping[Partition, int]) -> None:
# Wall-clock start, used only for the duration metric emitted below.
start_create = time.time()

self.__processing_strategy = (
self.__processor_factory.create_with_partitions(
self.__commit, partitions
)
)

# Emitted as an individual timing datapoint via the buffer's `metrics`
# attribute rather than through `incr_timing`. There is no try/finally
# here, so nothing is recorded if the factory call above raises.
self.__metrics_buffer.metrics.timing(
"arroyo.consumer.run.create_strategy", time.time() - start_create
)

logger.debug(
"Initialized processing strategy: %r", self.__processing_strategy
)

@_rdkafka_callback(metrics=self.__metrics_buffer)
def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
logger.info("New partitions assigned: %r", partitions)
self.__metrics_buffer.metrics.increment(
"arroyo.consumer.partitions_assigned.count", len(partitions)
)

self.__buffered_messages.reset()
if self.__dlq_policy:
self.__dlq_policy.reset_offsets(partitions)
Expand All @@ -230,6 +249,10 @@ def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
def on_partitions_revoked(partitions: Sequence[Partition]) -> None:
logger.info("Partitions to revoke: %r", partitions)

self.__metrics_buffer.metrics.increment(
"arroyo.consumer.partitions_revoked.count", len(partitions)
)

if partitions:
_close_strategy()

Expand Down
20 changes: 19 additions & 1 deletion arroyo/utils/metric_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,27 @@
# Time (unitless) spent in librdkafka callbacks. This metric's timings
# overlap other timings, and might spike at the same time.
"arroyo.consumer.callback.time",
# Time (unitless) spent in shutting down the consumer. This metric's
# timings overlap other timings, and might spike at the same time.
"arroyo.consumer.shutdown.time",
# A regular duration metric where each datapoint is measuring the time it
# took to execute a single callback. This metric is distinct from the
# arroyo.consumer.*.time metrics, which accumulate time spent per second to
# keep monitoring overhead low; this metric records each call individually.
#
# The metric is tagged by the name of the internal callback function being
# executed, as 'callback_name'. Possible values are on_partitions_assigned
# and on_partitions_revoked.
"arroyo.consumer.run.callback",
# Duration metric measuring the time it took to flush in-flight messages
# and shut down the strategies.
"arroyo.consumer.run.close_strategy",
# Duration metric measuring the time it took to create the processing strategy.
"arroyo.consumer.run.create_strategy",
# How many partitions have been revoked just now.
"arroyo.consumer.partitions_revoked.count",
# How many partitions have been assigned just now.
"arroyo.consumer.partitions_assigned.count",
# Time (unitless) spent in shutting down the consumer. This metric's
# Consumer latency in seconds. Recorded by the commit offsets strategy.
"arroyo.consumer.latency",
# Queue size of background queue that librdkafka uses to prefetch messages.
Expand Down
Loading