feat: Enable automatic latency recording for consumers (#308)
The goal of this change is to record a common latency
metric for all consumers that use Arroyo.

The new metric, `arroyo.consumer.latency`, represents the time in
seconds from when the message was initially produced (the broker
timestamp) to when the consumer successfully commits the offset for
that message.
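Concretely, the recorded value is the wall-clock time at commit minus the
message's broker timestamp. A minimal sketch of that arithmetic (the helper
name here is ours, not part of the library):

import time
from datetime import datetime


def consumer_latency_seconds(broker_timestamp: datetime) -> float:
    # Seconds between when the broker stamped the message and "now";
    # this is the value CommitOffsets reports as arroyo.consumer.latency.
    return time.time() - broker_timestamp.timestamp()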

Note that if a consumer does its own batching (not using Arroyo's
built-in Reduce/Batch) or commits offsets manually (not using
Arroyo's CommitOffsets strategy), the metric will not be recorded
automatically.
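Consumers in that situation can still emit an equivalent metric by hand. A
rough sketch, assuming the consumer already holds the Commit callable;
commit_with_latency is a hypothetical helper, while get_metrics,
Message.timestamp, and the metric name all come from this change:

import time
from typing import Any

from arroyo.types import Commit, Message
from arroyo.utils.metrics import get_metrics


def commit_with_latency(message: Message[Any], commit: Commit) -> None:
    # Record the same latency metric CommitOffsets emits, then commit
    # the message's offsets manually.
    if message.timestamp is not None:
        get_metrics().increment(
            "arroyo.consumer.latency",
            time.time() - message.timestamp.timestamp(),
        )
    commit(message.committable)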

Closes #307
lynnagara authored Nov 22, 2023
1 parent 719f244 commit 33d0b75
Showing 19 changed files with 217 additions and 95 deletions.
13 changes: 12 additions & 1 deletion arroyo/processing/strategies/commit.py
@@ -1,12 +1,14 @@
import time
from typing import Any, Optional

from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.types import Commit, Message
from arroyo.utils.metrics import get_metrics


class CommitOffsets(ProcessingStrategy[Any]):
"""
Just commits offsets.
Commits offsets and records the consumer latency metric.
This should always be used as the last step in a chain of processing
strategies. It commits offsets back to the broker after all prior
@@ -15,11 +17,20 @@ class CommitOffsets(ProcessingStrategy[Any]):

def __init__(self, commit: Commit) -> None:
self.__commit = commit
self.__metrics = get_metrics()
self.__last_record_time: Optional[float] = None

def poll(self) -> None:
self.__commit({})

def submit(self, message: Message[Any]) -> None:
now = time.time()
if self.__last_record_time is None or now - self.__last_record_time > 1:
if message.timestamp is not None:
self.__metrics.increment(
"arroyo.consumer.latency", now - message.timestamp.timestamp()
)
self.__last_record_time = now
self.__commit(message.committable)

def close(self) -> None:
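As the submit() diff above shows, the metric is sampled: it is emitted at most
roughly once per second per strategy instance. Nothing changes for callers as
long as CommitOffsets remains the terminal step; a small sketch of that wiring
(build_strategy is our own name and RunTask is used purely for illustration):

from typing import Any

from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import Commit, Message


def build_strategy(commit: Commit) -> ProcessingStrategy[Any]:
    # CommitOffsets sits at the end of the chain, so offsets are committed
    # and arroyo.consumer.latency is recorded after the prior step runs.
    def passthrough(message: Message[Any]) -> Any:
        return message.payload

    return RunTask(passthrough, CommitOffsets(commit))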
12 changes: 10 additions & 2 deletions arroyo/processing/strategies/produce.py
@@ -69,7 +69,11 @@ def poll(self) -> None:
break

message = Message(
Value(future.result().payload, original_message.committable)
Value(
future.result().payload,
original_message.committable,
original_message.timestamp,
)
)

self.__queue.popleft()
@@ -122,7 +126,11 @@ def join(self, timeout: Optional[float] = None) -> None:
break

message = Message(
Value(future.result().payload, original_message.committable)
Value(
future.result().payload,
original_message.committable,
original_message.timestamp,
)
)

self.__next_step.poll()
21 changes: 18 additions & 3 deletions arroyo/processing/strategies/reduce.py
@@ -1,4 +1,5 @@
import time
from datetime import datetime
from typing import Callable, Generic, MutableMapping, Optional, TypeVar, Union, cast

from arroyo.processing.strategies import MessageRejected, ProcessingStrategy
@@ -28,12 +29,17 @@ def __init__(
self.__max_batch_size = max_batch_size
self.__accumulator = accumulator
self.__accumulated_value = initial_value

# For latency recording.
# The timestamp of the last message in the batch is the one applied to the batch.
self.__last_timestamp: Optional[datetime] = None
self.__count = 0
self.__offsets: MutableMapping[Partition, int] = {}
self.init_time = time.time()

def append(self, value: BaseValue[Union[FilteredPayload, TPayload]]) -> None:
self.__offsets.update(value.committable)
self.__last_timestamp = value.timestamp
if not isinstance(value.payload, FilteredPayload):
self.__accumulated_value = self.__accumulator(
self.__accumulated_value, cast(BaseValue[TPayload], value)
@@ -46,13 +52,22 @@ def build_if_ready(self) -> Optional[Value[TResult]]:
self.__count >= self.__max_batch_size
or time.time() > self.init_time + self.__max_batch_time
):

return Value(payload=self.__accumulated_value, committable=self.__offsets)
assert isinstance(self.__last_timestamp, datetime)
return Value(
payload=self.__accumulated_value,
committable=self.__offsets,
timestamp=self.__last_timestamp,
)
else:
return None

def build(self) -> Value[TResult]:
return Value(payload=self.__accumulated_value, committable=self.__offsets)
assert isinstance(self.__last_timestamp, datetime)
return Value(
payload=self.__accumulated_value,
committable=self.__offsets,
timestamp=self.__last_timestamp,
)


class Reduce(
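With this change a reduced batch inherits the timestamp of the last value
appended to it, so a downstream CommitOffsets measures latency against the
newest message in the batch rather than the oldest. A tiny illustration of
that rule using plain values (the helper and the datetimes are illustrative
only):

from datetime import datetime
from typing import Optional, Sequence

from arroyo.types import Value


def batch_timestamp(values: Sequence[Value[str]]) -> Optional[datetime]:
    # Mirrors BatchBuilder.append(): the last value's timestamp is the
    # one that ends up on the batch.
    last: Optional[datetime] = None
    for value in values:
        last = value.timestamp
    return last


older = Value("a", {}, datetime(2022, 1, 1, 0, 0, 0))
newer = Value("b", {}, datetime(2022, 1, 1, 0, 0, 5))
assert batch_timestamp([older, newer]) == newer.timestamp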
6 changes: 4 additions & 2 deletions arroyo/processing/strategies/unfold.py
@@ -55,9 +55,11 @@ def submit(self, message: Message[Union[FilteredPayload, TInput]]) -> None:
for idx, value in enumerate(iterable):
# Last message is special because offsets can be committed along with it
if idx == num_messages - 1:
next_message = Message(Value(value, message.committable))
next_message = Message(
Value(value, message.committable, message.timestamp)
)
else:
next_message = Message(Value(value, {}))
next_message = Message(Value(value, {}, message.timestamp))

if store_remaining_messages == False:
try:
23 changes: 20 additions & 3 deletions arroyo/types.py
@@ -2,7 +2,7 @@

from dataclasses import dataclass
from datetime import datetime
from typing import Any, Generic, Mapping, Protocol, TypeVar
from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar

TReplaced = TypeVar("TReplaced")

@@ -89,6 +89,10 @@ def payload_unfiltered(self) -> TMessagePayload:
def committable(self) -> Mapping[Partition, int]:
return self.value.committable

@property
def timestamp(self) -> Optional[datetime]:
return self.value.timestamp

def replace(self, payload: TReplaced) -> Message[TReplaced]:
return Message(self.value.replace(payload))

@@ -102,6 +106,10 @@ def payload(self) -> TMessagePayload:
def committable(self) -> Mapping[Partition, int]:
raise NotImplementedError()

@property
def timestamp(self) -> Optional[datetime]:
raise NotImplementedError()

def replace(self, value: TReplaced) -> BaseValue[TReplaced]:
raise NotImplementedError

@@ -116,12 +124,17 @@ class Value(BaseValue[TMessagePayload]):
__slots__ = ["__payload", "__committable"]
__payload: TMessagePayload
__committable: Mapping[Partition, int]
__timestamp: Optional[datetime]

def __init__(
self, payload: TMessagePayload, committable: Mapping[Partition, int]
self,
payload: TMessagePayload,
committable: Mapping[Partition, int],
timestamp: Optional[datetime] = None,
) -> None:
self.__payload = payload
self.__committable = committable
self.__timestamp = timestamp

@property
def payload(self) -> TMessagePayload:
@@ -131,8 +144,12 @@ def payload(self) -> TMessagePayload:
def committable(self) -> Mapping[Partition, int]:
return self.__committable

@property
def timestamp(self) -> Optional[datetime]:
return self.__timestamp

def replace(self, value: TReplaced) -> BaseValue[TReplaced]:
return Value(value, self.__committable)
return Value(value, self.__committable, self.timestamp)


@dataclass(unsafe_hash=True)
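The net effect on the public types: Value grows an optional timestamp
(defaulting to None for backwards compatibility) and Message exposes it as a
read-only property. A quick illustration using only names from the diff above:

from datetime import datetime

from arroyo.types import Message, Partition, Topic, Value

now = datetime.now()
msg = Message(Value(b"payload", {Partition(Topic("topic"), 0): 1}, now))
assert msg.timestamp == now

# Older call sites that omit the timestamp keep working; it stays None.
assert Message(Value(b"payload", {})).timestamp is None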
2 changes: 2 additions & 0 deletions arroyo/utils/metric_defs.py
@@ -62,6 +62,8 @@
# Time (unitless) spent in shutting down the consumer. This metric's
# timings overlap other timings, and might spike at the same time.
"arroyo.consumer.shutdown.time",
# Consumer latency in seconds. Recorded by the commit offsets strategy.
"arroyo.consumer.latency",
# Queue size of background queue that librdkafka uses to prefetch messages.
"arroyo.consumer.librdkafka.total_queue_size",
# Counter metric to measure how often the healthcheck file has been touched.
2 changes: 1 addition & 1 deletion examples/transform_and_produce/batched.py
@@ -41,6 +41,7 @@ def index_data(
value=json.dumps(indexed_messages[i]).encode(),
),
committable=batch.payload[i].committable,
timestamp=batch.timestamp,
)
)
return ret
@@ -71,7 +72,6 @@ def create_with_partitions(
commit: Commit,
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:

unbatch: UnbatchStep[KafkaPayload] = UnbatchStep(
next_step=Produce(self.__producer, self.__topic, CommitOffsets(commit))
)
4 changes: 2 additions & 2 deletions tests/processing/strategies/test_all.py
@@ -139,9 +139,9 @@ def test_function(message: Message[bool]) -> bool:
return message.payload

messages: Sequence[DummyMessage] = [
Message(Value(True, {Partition(Topic("topic"), 0): 1})),
Message(Value(True, {Partition(Topic("topic"), 0): 1}, NOW)),
Message(Value(FILTERED_PAYLOAD, {Partition(Topic("topic"), 0): 2})),
Message(Value(True, {Partition(Topic("topic"), 0): 3})),
Message(Value(True, {Partition(Topic("topic"), 0): 3}, NOW)),
]

for message in messages:
39 changes: 25 additions & 14 deletions tests/processing/strategies/test_batching.py
@@ -11,18 +11,24 @@
from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import BaseValue, BrokerValue, Message, Partition, Topic, Value

NOW = datetime(2022, 1, 1, 0, 0, 1)


def broker_value(partition: int, offset: int, payload: str) -> BrokerValue[str]:
return BrokerValue(
partition=Partition(topic=Topic("test"), index=partition),
offset=offset,
payload=payload,
timestamp=datetime(2022, 1, 1, 0, 0, 1),
timestamp=NOW,
)


def value(committable: Mapping[Partition, int], payload: str) -> Value[str]:
return Value(payload=payload, committable=committable)
return Value(
payload=payload,
committable=committable,
timestamp=NOW,
)


test_builder = [
@@ -174,6 +180,7 @@ def message(partition: int, offset: int, payload: str) -> Message[str]:
broker_value(0, 3, "Message 3"),
],
committable={Partition(Topic("test"), 0): 4},
timestamp=NOW,
),
)
)
@@ -200,6 +207,7 @@ def message(partition: int, offset: int, payload: str) -> Message[str]:
broker_value(0, 3, "Message 3"),
],
committable={Partition(Topic("test"), 0): 4},
timestamp=NOW,
),
)
),
@@ -212,6 +220,7 @@ def message(partition: int, offset: int, payload: str) -> Message[str]:
broker_value(1, 3, "Message 3"),
],
committable={Partition(Topic("test"), 1): 4},
timestamp=NOW,
),
)
),
@@ -267,6 +276,7 @@ def test_batch_join() -> None:
broker_value(0, 2, "Message 2"),
],
committable={Partition(Topic("test"), 0): 3},
timestamp=NOW,
),
)
)
@@ -286,6 +296,7 @@ def test_unbatch_step() -> None:
],
),
committable={Partition(Topic("test"), 1): 4},
timestamp=NOW,
),
)

@@ -296,9 +307,9 @@ def test_unbatch_step() -> None:
unbatch_step.submit(msg)
next_step.submit.assert_has_calls(
[
call(Message(Value("Message 1", {}))),
call(Message(Value("Message 2", {}))),
call(Message(Value("Message 3", {partition: 4}))),
call(Message(Value("Message 1", {}, NOW))),
call(Message(Value("Message 2", {}, NOW))),
call(Message(Value("Message 3", {partition: 4}, NOW))),
]
)

@@ -317,9 +328,9 @@ def test_unbatch_step() -> None:
unbatch_step.poll()
next_step.submit.assert_has_calls(
[
call(Message(Value("Message 1", {}))),
call(Message(Value("Message 2", {}))),
call(Message(Value("Message 3", {partition: 4}))),
call(Message(Value("Message 1", {}, NOW))),
call(Message(Value("Message 2", {}, NOW))),
call(Message(Value("Message 3", {partition: 4}, NOW))),
]
)

@@ -330,9 +341,9 @@ def test_unbatch_step() -> None:

next_step.submit.assert_has_calls(
[
call(Message(Value("Message 1", {}))),
call(Message(Value("Message 2", {}))),
call(Message(Value("Message 3", {partition: 4}))),
call(Message(Value("Message 1", {}, NOW))),
call(Message(Value("Message 2", {}, NOW))),
call(Message(Value("Message 3", {partition: 4}, NOW))),
]
)

@@ -368,8 +379,8 @@ def transformer(

final_step.submit.assert_has_calls(
[
call(Message(Value("Transformed", {}))),
call(Message(Value("Transformed", {}))),
call(Message(Value("Transformed", {partition: 4}))),
call(Message(Value("Transformed", {}, NOW))),
call(Message(Value("Transformed", {}, NOW))),
call(Message(Value("Transformed", {partition: 4}, NOW))),
]
)
21 changes: 19 additions & 2 deletions tests/processing/strategies/test_commit.py
@@ -1,14 +1,18 @@
from unittest.mock import Mock
from datetime import datetime
from unittest.mock import ANY, Mock

from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.types import Message, Partition, Topic, Value
from tests.metrics import Increment, TestingMetricsBackend


def test_commit() -> None:
commit_func = Mock()
strategy = CommitOffsets(commit_func)

strategy.submit(Message(Value(b"", {Partition(Topic("topic"), 1): 5})))
strategy.submit(
Message(Value(b"", {Partition(Topic("topic"), 1): 5}, datetime.now()))
)

assert commit_func.call_count == 1

@@ -23,3 +27,16 @@ def test_commit_poll() -> None:

assert commit_func.call_count == 1


def test_record_metric() -> None:
commit_func = Mock()
strategy = CommitOffsets(commit_func)
now = datetime.now()

strategy.submit(Message(Value(b"", {Partition(Topic("topic"), 1): 5}, now)))
strategy.poll()

metrics = TestingMetricsBackend
assert metrics.calls == [
Increment(name="arroyo.consumer.latency", value=ANY, tags=None)
]