From 7179b9cf649b35f33cfa71e0d02fa1432d81d0f0 Mon Sep 17 00:00:00 2001 From: Lyn Date: Thu, 30 Nov 2023 12:10:06 -0800 Subject: [PATCH] feat: Record latency for the billing metrics consumer Since https://github.com/getsentry/arroyo/pull/308 Arroyo's CommitOffsets strategy records consumer latency automatically for consumers immediately prior to committing offsets. --- src/sentry/ingest/billing_metrics_consumer.py | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/sentry/ingest/billing_metrics_consumer.py b/src/sentry/ingest/billing_metrics_consumer.py index f436849d37c70..1def7d66bf54a 100644 --- a/src/sentry/ingest/billing_metrics_consumer.py +++ b/src/sentry/ingest/billing_metrics_consumer.py @@ -3,7 +3,11 @@ from typing import Any, Mapping, Optional, TypedDict, Union, cast from arroyo.backends.kafka import KafkaPayload -from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory +from arroyo.processing.strategies import ( + CommitOffsets, + ProcessingStrategy, + ProcessingStrategyFactory, +) from arroyo.types import Commit, Message, Partition from typing_extensions import NotRequired @@ -23,7 +27,7 @@ def create_with_partitions( commit: Commit, partitions: Mapping[Partition, int], ) -> ProcessingStrategy[KafkaPayload]: - return BillingTxCountMetricConsumerStrategy(commit) + return BillingTxCountMetricConsumerStrategy(CommitOffsets(commit)) class MetricsBucket(TypedDict): @@ -52,28 +56,26 @@ class BillingTxCountMetricConsumerStrategy(ProcessingStrategy[KafkaPayload]): metric_id = TRANSACTION_METRICS_NAMES["c:transactions/usage@none"] profile_tag_key = str(SHARED_TAG_STRINGS["has_profile"]) - def __init__( - self, - commit: Commit, - ) -> None: - self.__commit = commit + def __init__(self, next_step: ProcessingStrategy[Any]) -> None: + self.__next_step = next_step self.__closed = False def poll(self) -> None: - pass + self.__next_step.poll() def terminate(self) -> None: self.close() def close(self) -> None: self.__closed = True + self.__next_step.close() def submit(self, message: Message[KafkaPayload]) -> None: assert not self.__closed payload = self._get_payload(message) self._produce_billing_outcomes(payload) - self.__commit(message.committable) + self.__next_step.submit(message) def _get_payload(self, message: Message[KafkaPayload]) -> MetricsBucket: payload = json.loads(message.payload.value.decode("utf-8"), use_rapid_json=True) @@ -140,4 +142,4 @@ def _produce_billing_outcome( ) def join(self, timeout: Optional[float] = None) -> None: - self.__commit({}, force=True) + self.__next_step.join(timeout)