Skip to content

Commit

Permalink
feat: Record latency for the billing metrics consumer
Browse files Browse the repository at this point in the history
Since getsentry/arroyo#308 Arroyo's CommitOffsets strategy
records consumer latency automatically for consumers immediately prior
to committing offsets.
  • Loading branch information
lynnagara committed Nov 30, 2023
1 parent 426a48b commit 7179b9c
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions src/sentry/ingest/billing_metrics_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
from typing import Any, Mapping, Optional, TypedDict, Union, cast

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.processing.strategies import (
CommitOffsets,
ProcessingStrategy,
ProcessingStrategyFactory,
)
from arroyo.types import Commit, Message, Partition
from typing_extensions import NotRequired

Expand All @@ -23,7 +27,7 @@ def create_with_partitions(
commit: Commit,
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
return BillingTxCountMetricConsumerStrategy(commit)
return BillingTxCountMetricConsumerStrategy(CommitOffsets(commit))


class MetricsBucket(TypedDict):
Expand Down Expand Up @@ -52,28 +56,26 @@ class BillingTxCountMetricConsumerStrategy(ProcessingStrategy[KafkaPayload]):
metric_id = TRANSACTION_METRICS_NAMES["c:transactions/usage@none"]
profile_tag_key = str(SHARED_TAG_STRINGS["has_profile"])

def __init__(
self,
commit: Commit,
) -> None:
self.__commit = commit
def __init__(self, next_step: ProcessingStrategy[Any]) -> None:
self.__next_step = next_step
self.__closed = False

def poll(self) -> None:
pass
self.__next_step.poll()

def terminate(self) -> None:
self.close()

def close(self) -> None:
self.__closed = True
self.__next_step.close()

def submit(self, message: Message[KafkaPayload]) -> None:
assert not self.__closed

payload = self._get_payload(message)
self._produce_billing_outcomes(payload)
self.__commit(message.committable)
self.__next_step.submit(message)

def _get_payload(self, message: Message[KafkaPayload]) -> MetricsBucket:
payload = json.loads(message.payload.value.decode("utf-8"), use_rapid_json=True)
Expand Down Expand Up @@ -140,4 +142,4 @@ def _produce_billing_outcome(
)

def join(self, timeout: Optional[float] = None) -> None:
self.__commit({}, force=True)
self.__next_step.join(timeout)

0 comments on commit 7179b9c

Please sign in to comment.