diff --git a/arroyo/processing/strategies/commit.py b/arroyo/processing/strategies/commit.py index 93816a8e..e447d2ad 100644 --- a/arroyo/processing/strategies/commit.py +++ b/arroyo/processing/strategies/commit.py @@ -1,12 +1,14 @@ +import time from typing import Any, Optional from arroyo.processing.strategies.abstract import ProcessingStrategy from arroyo.types import Commit, Message +from arroyo.utils.metrics import get_metrics class CommitOffsets(ProcessingStrategy[Any]): """ - Just commits offsets. + Commits offset and records consumer latency metric. This should always be used as the last step in a chain of processing strategies. It commits offsets back to the broker after all prior @@ -15,11 +17,20 @@ class CommitOffsets(ProcessingStrategy[Any]): def __init__(self, commit: Commit) -> None: self.__commit = commit + self.__metrics = get_metrics() + self.__last_record_time: Optional[float] = None def poll(self) -> None: self.__commit({}) def submit(self, message: Message[Any]) -> None: + now = time.time() + if self.__last_record_time is None or now - self.__last_record_time > 1: + if message.timestamp is not None: + self.__metrics.increment( + "arroyo.consumer.latency", now - message.timestamp.timestamp() + ) + self.__last_record_time = now self.__commit(message.committable) def close(self) -> None: diff --git a/arroyo/processing/strategies/produce.py b/arroyo/processing/strategies/produce.py index cc9fef7d..8c0fa583 100644 --- a/arroyo/processing/strategies/produce.py +++ b/arroyo/processing/strategies/produce.py @@ -69,7 +69,11 @@ def poll(self) -> None: break message = Message( - Value(future.result().payload, original_message.committable) + Value( + future.result().payload, + original_message.committable, + original_message.timestamp, + ) ) self.__queue.popleft() @@ -122,7 +126,11 @@ def join(self, timeout: Optional[float] = None) -> None: break message = Message( - Value(future.result().payload, original_message.committable) + Value( + future.result().payload, + original_message.committable, + original_message.timestamp, + ) ) self.__next_step.poll() diff --git a/arroyo/processing/strategies/reduce.py b/arroyo/processing/strategies/reduce.py index 66a952b6..728d18c3 100644 --- a/arroyo/processing/strategies/reduce.py +++ b/arroyo/processing/strategies/reduce.py @@ -1,4 +1,5 @@ import time +from datetime import datetime from typing import Callable, Generic, MutableMapping, Optional, TypeVar, Union, cast from arroyo.processing.strategies import MessageRejected, ProcessingStrategy @@ -28,12 +29,17 @@ def __init__( self.__max_batch_size = max_batch_size self.__accumulator = accumulator self.__accumulated_value = initial_value + + # For latency recording. + # The timestamp of the last message in the batch is one applied to the batch + self.__last_timestamp: Optional[datetime] = None self.__count = 0 self.__offsets: MutableMapping[Partition, int] = {} self.init_time = time.time() def append(self, value: BaseValue[Union[FilteredPayload, TPayload]]) -> None: self.__offsets.update(value.committable) + self.__last_timestamp = value.timestamp if not isinstance(value.payload, FilteredPayload): self.__accumulated_value = self.__accumulator( self.__accumulated_value, cast(BaseValue[TPayload], value) @@ -46,13 +52,22 @@ def build_if_ready(self) -> Optional[Value[TResult]]: self.__count >= self.__max_batch_size or time.time() > self.init_time + self.__max_batch_time ): - - return Value(payload=self.__accumulated_value, committable=self.__offsets) + assert isinstance(self.__last_timestamp, datetime) + return Value( + payload=self.__accumulated_value, + committable=self.__offsets, + timestamp=self.__last_timestamp, + ) else: return None def build(self) -> Value[TResult]: - return Value(payload=self.__accumulated_value, committable=self.__offsets) + assert isinstance(self.__last_timestamp, datetime) + return Value( + payload=self.__accumulated_value, + committable=self.__offsets, + timestamp=self.__last_timestamp, + ) class Reduce( diff --git a/arroyo/processing/strategies/unfold.py b/arroyo/processing/strategies/unfold.py index b511e83b..73a24f68 100644 --- a/arroyo/processing/strategies/unfold.py +++ b/arroyo/processing/strategies/unfold.py @@ -55,9 +55,11 @@ def submit(self, message: Message[Union[FilteredPayload, TInput]]) -> None: for idx, value in enumerate(iterable): # Last message is special because offsets can be committed along with it if idx == num_messages - 1: - next_message = Message(Value(value, message.committable)) + next_message = Message( + Value(value, message.committable, message.timestamp) + ) else: - next_message = Message(Value(value, {})) + next_message = Message(Value(value, {}, message.timestamp)) if store_remaining_messages == False: try: diff --git a/arroyo/types.py b/arroyo/types.py index bcc7bc42..2f1c4dc6 100644 --- a/arroyo/types.py +++ b/arroyo/types.py @@ -2,7 +2,7 @@ from dataclasses import dataclass from datetime import datetime -from typing import Any, Generic, Mapping, Protocol, TypeVar +from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar TReplaced = TypeVar("TReplaced") @@ -89,6 +89,10 @@ def payload_unfiltered(self) -> TMessagePayload: def committable(self) -> Mapping[Partition, int]: return self.value.committable + @property + def timestamp(self) -> Optional[datetime]: + return self.value.timestamp + def replace(self, payload: TReplaced) -> Message[TReplaced]: return Message(self.value.replace(payload)) @@ -102,6 +106,10 @@ def payload(self) -> TMessagePayload: def committable(self) -> Mapping[Partition, int]: raise NotImplementedError() + @property + def timestamp(self) -> Optional[datetime]: + raise NotImplementedError() + def replace(self, value: TReplaced) -> BaseValue[TReplaced]: raise NotImplementedError @@ -116,12 +124,17 @@ class Value(BaseValue[TMessagePayload]): __slots__ = ["__payload", "__committable"] __payload: TMessagePayload __committable: Mapping[Partition, int] + __timestamp: Optional[datetime] def __init__( - self, payload: TMessagePayload, committable: Mapping[Partition, int] + self, + payload: TMessagePayload, + committable: Mapping[Partition, int], + timestamp: Optional[datetime] = None, ) -> None: self.__payload = payload self.__committable = committable + self.__timestamp = timestamp @property def payload(self) -> TMessagePayload: @@ -131,8 +144,12 @@ def payload(self) -> TMessagePayload: def committable(self) -> Mapping[Partition, int]: return self.__committable + @property + def timestamp(self) -> Optional[datetime]: + return self.__timestamp + def replace(self, value: TReplaced) -> BaseValue[TReplaced]: - return Value(value, self.__committable) + return Value(value, self.__committable, self.timestamp) @dataclass(unsafe_hash=True) diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index ac4d587d..eaff217b 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -62,6 +62,8 @@ # Time (unitless) spent in shutting down the consumer. This metric's # timings overlap other timings, and might spike at the same time. "arroyo.consumer.shutdown.time", + # Consumer latency in seconds. Recorded by the commit offsets strategy. + "arroyo.consumer.latency", # Queue size of background queue that librdkafka uses to prefetch messages. "arroyo.consumer.librdkafka.total_queue_size", # Counter metric to measure how often the healthcheck file has been touched. diff --git a/examples/transform_and_produce/batched.py b/examples/transform_and_produce/batched.py index ef818b99..4a178c0d 100644 --- a/examples/transform_and_produce/batched.py +++ b/examples/transform_and_produce/batched.py @@ -41,6 +41,7 @@ def index_data( value=json.dumps(indexed_messages[i]).encode(), ), committable=batch.payload[i].committable, + timestamp=batch.timestamp, ) ) return ret @@ -71,7 +72,6 @@ def create_with_partitions( commit: Commit, partitions: Mapping[Partition, int], ) -> ProcessingStrategy[KafkaPayload]: - unbatch: UnbatchStep[KafkaPayload] = UnbatchStep( next_step=Produce(self.__producer, self.__topic, CommitOffsets(commit)) ) diff --git a/tests/processing/strategies/test_all.py b/tests/processing/strategies/test_all.py index 724e086a..328deb0b 100644 --- a/tests/processing/strategies/test_all.py +++ b/tests/processing/strategies/test_all.py @@ -139,9 +139,9 @@ def test_function(message: Message[bool]) -> bool: return message.payload messages: Sequence[DummyMessage] = [ - Message(Value(True, {Partition(Topic("topic"), 0): 1})), + Message(Value(True, {Partition(Topic("topic"), 0): 1}, NOW)), Message(Value(FILTERED_PAYLOAD, {Partition(Topic("topic"), 0): 2})), - Message(Value(True, {Partition(Topic("topic"), 0): 3})), + Message(Value(True, {Partition(Topic("topic"), 0): 3}, NOW)), ] for message in messages: diff --git a/tests/processing/strategies/test_batching.py b/tests/processing/strategies/test_batching.py index 5f920b18..f6841976 100644 --- a/tests/processing/strategies/test_batching.py +++ b/tests/processing/strategies/test_batching.py @@ -11,18 +11,24 @@ from arroyo.processing.strategies.run_task import RunTask from arroyo.types import BaseValue, BrokerValue, Message, Partition, Topic, Value +NOW = datetime(2022, 1, 1, 0, 0, 1) + def broker_value(partition: int, offset: int, payload: str) -> BrokerValue[str]: return BrokerValue( partition=Partition(topic=Topic("test"), index=partition), offset=offset, payload=payload, - timestamp=datetime(2022, 1, 1, 0, 0, 1), + timestamp=NOW, ) def value(committable: Mapping[Partition, int], payload: str) -> Value[str]: - return Value(payload=payload, committable=committable) + return Value( + payload=payload, + committable=committable, + timestamp=NOW, + ) test_builder = [ @@ -174,6 +180,7 @@ def message(partition: int, offset: int, payload: str) -> Message[str]: broker_value(0, 3, "Message 3"), ], committable={Partition(Topic("test"), 0): 4}, + timestamp=datetime(2022, 1, 1, 0, 0, 1), ), ) ) @@ -200,6 +207,7 @@ def message(partition: int, offset: int, payload: str) -> Message[str]: broker_value(0, 3, "Message 3"), ], committable={Partition(Topic("test"), 0): 4}, + timestamp=NOW, ), ) ), @@ -212,6 +220,7 @@ def message(partition: int, offset: int, payload: str) -> Message[str]: broker_value(1, 3, "Message 3"), ], committable={Partition(Topic("test"), 1): 4}, + timestamp=NOW, ), ) ), @@ -267,6 +276,7 @@ def test_batch_join() -> None: broker_value(0, 2, "Message 2"), ], committable={Partition(Topic("test"), 0): 3}, + timestamp=NOW, ), ) ) @@ -286,6 +296,7 @@ def test_unbatch_step() -> None: ], ), committable={Partition(Topic("test"), 1): 4}, + timestamp=NOW, ), ) @@ -296,9 +307,9 @@ def test_unbatch_step() -> None: unbatch_step.submit(msg) next_step.submit.assert_has_calls( [ - call(Message(Value("Message 1", {}))), - call(Message(Value("Message 2", {}))), - call(Message(Value("Message 3", {partition: 4}))), + call(Message(Value("Message 1", {}, NOW))), + call(Message(Value("Message 2", {}, NOW))), + call(Message(Value("Message 3", {partition: 4}, NOW))), ] ) @@ -317,9 +328,9 @@ def test_unbatch_step() -> None: unbatch_step.poll() next_step.submit.assert_has_calls( [ - call(Message(Value("Message 1", {}))), - call(Message(Value("Message 2", {}))), - call(Message(Value("Message 3", {partition: 4}))), + call(Message(Value("Message 1", {}, NOW))), + call(Message(Value("Message 2", {}, NOW))), + call(Message(Value("Message 3", {partition: 4}, NOW))), ] ) @@ -330,9 +341,9 @@ def test_unbatch_step() -> None: next_step.submit.assert_has_calls( [ - call(Message(Value("Message 1", {}))), - call(Message(Value("Message 2", {}))), - call(Message(Value("Message 3", {partition: 4}))), + call(Message(Value("Message 1", {}, NOW))), + call(Message(Value("Message 2", {}, NOW))), + call(Message(Value("Message 3", {partition: 4}, NOW))), ] ) @@ -368,8 +379,8 @@ def transformer( final_step.submit.assert_has_calls( [ - call(Message(Value("Transformed", {}))), - call(Message(Value("Transformed", {}))), - call(Message(Value("Transformed", {partition: 4}))), + call(Message(Value("Transformed", {}, NOW))), + call(Message(Value("Transformed", {}, NOW))), + call(Message(Value("Transformed", {partition: 4}, NOW))), ] ) diff --git a/tests/processing/strategies/test_commit.py b/tests/processing/strategies/test_commit.py index c857db57..13769317 100644 --- a/tests/processing/strategies/test_commit.py +++ b/tests/processing/strategies/test_commit.py @@ -1,14 +1,18 @@ -from unittest.mock import Mock +from datetime import datetime +from unittest.mock import ANY, Mock from arroyo.processing.strategies.commit import CommitOffsets from arroyo.types import Message, Partition, Topic, Value +from tests.metrics import Increment, TestingMetricsBackend def test_commit() -> None: commit_func = Mock() strategy = CommitOffsets(commit_func) - strategy.submit(Message(Value(b"", {Partition(Topic("topic"), 1): 5}))) + strategy.submit( + Message(Value(b"", {Partition(Topic("topic"), 1): 5}, datetime.now())) + ) assert commit_func.call_count == 1 @@ -23,3 +27,16 @@ def test_commit_poll() -> None: assert commit_func.call_count == 1 + +def test_record_metric() -> None: + commit_func = Mock() + strategy = CommitOffsets(commit_func) + now = datetime.now() + + strategy.submit(Message(Value(b"", {Partition(Topic("topic"), 1): 5}, now))) + strategy.poll() + + metrics = TestingMetricsBackend + assert metrics.calls == [ + Increment(name="arroyo.consumer.latency", value=ANY, tags=None) + ] diff --git a/tests/processing/strategies/test_filter.py b/tests/processing/strategies/test_filter.py index ccbcbf64..3fe21e3f 100644 --- a/tests/processing/strategies/test_filter.py +++ b/tests/processing/strategies/test_filter.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import Union from unittest.mock import Mock, call @@ -23,12 +24,14 @@ def test_function(message: Message[bool]) -> bool: filter_step = FilterStep(test_function, next_step) - fail_message = Message(Value(False, {Partition(Topic("topic"), 0): 1})) + now = datetime.now() + + fail_message = Message(Value(False, {Partition(Topic("topic"), 0): 1}, now)) with assert_does_not_change(lambda: int(next_step.submit.call_count), 0): filter_step.submit(fail_message) - pass_message = Message(Value(True, {Partition(Topic("topic"), 0): 1})) + pass_message = Message(Value(True, {Partition(Topic("topic"), 0): 1}, now)) with assert_changes(lambda: int(next_step.submit.call_count), 0, 1): filter_step.submit(pass_message) @@ -55,18 +58,20 @@ def test_function(message: Message[bool]) -> bool: test_function, next_step, commit_policy=CommitPolicy(None, 3) ) - init_message = Message(Value(False, {Partition(topic, 1): 1})) + now = datetime.now() + + init_message = Message(Value(False, {Partition(topic, 1): 1}, now)) filter_step.submit(init_message) assert next_step.submit.call_count == 0 for i in range(2): - fail_message = Message(Value(False, {Partition(topic, 0): i})) + fail_message = Message(Value(False, {Partition(topic, 0): i}, now)) filter_step.submit(fail_message) assert next_step.submit.call_count == 0 - fail_message = Message(Value(False, {Partition(topic, 0): 2})) + fail_message = Message(Value(False, {Partition(topic, 0): 2}, now)) filter_step.submit(fail_message) # Assert that the filter message kept track of the new offsets across @@ -74,7 +79,10 @@ def test_function(message: Message[bool]) -> bool: # and according to our commit policy we are supposed to commit at this # point, roughly. expected_filter_message: Message[Union[FilteredPayload, bool]] = Message( - Value(FILTERED_PAYLOAD, {Partition(topic, 1): 1, Partition(topic, 0): 2}) + Value( + FILTERED_PAYLOAD, + {Partition(topic, 1): 1, Partition(topic, 0): 2}, + ) ) assert next_step.submit.mock_calls == [call(expected_filter_message)] @@ -84,7 +92,7 @@ def test_function(message: Message[bool]) -> bool: filter_step.join() assert next_step.submit.call_count == 0 - fail_message = Message(Value(False, {Partition(topic, 0): 3})) + fail_message = Message(Value(False, {Partition(topic, 0): 3}, now)) filter_step.submit(fail_message) assert next_step.submit.call_count == 0 assert next_step.join.call_count == 1 @@ -105,6 +113,7 @@ def test_function(message: Message[bool]) -> bool: def test_commit_policy_filtered_messages_alternating() -> None: topic = Topic("topic") next_step = Mock() + now = datetime.now() def test_function(message: Message[bool]) -> bool: return message.payload @@ -113,18 +122,18 @@ def test_function(message: Message[bool]) -> bool: test_function, next_step, commit_policy=CommitPolicy(None, 3) ) - filter_step.submit(Message(Value(True, {Partition(topic, 1): 1}))) - filter_step.submit(Message(Value(False, {Partition(topic, 1): 2}))) - filter_step.submit(Message(Value(True, {Partition(topic, 1): 3}))) - filter_step.submit(Message(Value(False, {Partition(topic, 1): 4}))) - filter_step.submit(Message(Value(True, {Partition(topic, 1): 5}))) - filter_step.submit(Message(Value(False, {Partition(topic, 1): 6}))) + filter_step.submit(Message(Value(True, {Partition(topic, 1): 1}, now))) + filter_step.submit(Message(Value(False, {Partition(topic, 1): 2}, now))) + filter_step.submit(Message(Value(True, {Partition(topic, 1): 3}, now))) + filter_step.submit(Message(Value(False, {Partition(topic, 1): 4}, now))) + filter_step.submit(Message(Value(True, {Partition(topic, 1): 5}, now))) + filter_step.submit(Message(Value(False, {Partition(topic, 1): 6}, now))) assert next_step.submit.mock_calls == [ - call(Message(Value(True, {Partition(topic, 1): 1}))), - call(Message(Value(True, {Partition(topic, 1): 3}))), + call(Message(Value(True, {Partition(topic, 1): 1}, now))), + call(Message(Value(True, {Partition(topic, 1): 3}, now))), call(Message(Value(FILTERED_PAYLOAD, {Partition(topic, 1): 4}))), - call(Message(Value(True, {Partition(topic, 1): 5}))), + call(Message(Value(True, {Partition(topic, 1): 5}, now))), ] @@ -132,20 +141,22 @@ def test_no_commit_policy_does_not_forward_filtered_messages() -> None: topic = Topic("topic") next_step = Mock() + now = datetime.now() + def test_function(message: Message[bool]) -> bool: return message.payload filter_step = FilterStep(test_function, next_step) - filter_step.submit(Message(Value(True, {Partition(topic, 1): 1}))) - filter_step.submit(Message(Value(False, {Partition(topic, 1): 2}))) - filter_step.submit(Message(Value(True, {Partition(topic, 1): 3}))) - filter_step.submit(Message(Value(False, {Partition(topic, 1): 4}))) - filter_step.submit(Message(Value(True, {Partition(topic, 1): 5}))) - filter_step.submit(Message(Value(False, {Partition(topic, 1): 6}))) + filter_step.submit(Message(Value(True, {Partition(topic, 1): 1}, now))) + filter_step.submit(Message(Value(False, {Partition(topic, 1): 2}, now))) + filter_step.submit(Message(Value(True, {Partition(topic, 1): 3}, now))) + filter_step.submit(Message(Value(False, {Partition(topic, 1): 4}, now))) + filter_step.submit(Message(Value(True, {Partition(topic, 1): 5}, now))) + filter_step.submit(Message(Value(False, {Partition(topic, 1): 6}, now))) assert next_step.submit.mock_calls == [ - call(Message(Value(True, {Partition(topic, 1): 1}))), - call(Message(Value(True, {Partition(topic, 1): 3}))), - call(Message(Value(True, {Partition(topic, 1): 5}))), + call(Message(Value(True, {Partition(topic, 1): 1}, now))), + call(Message(Value(True, {Partition(topic, 1): 3}, now))), + call(Message(Value(True, {Partition(topic, 1): 5}, now))), ] diff --git a/tests/processing/strategies/test_guard.py b/tests/processing/strategies/test_guard.py index 28122031..94ea783f 100644 --- a/tests/processing/strategies/test_guard.py +++ b/tests/processing/strategies/test_guard.py @@ -19,7 +19,8 @@ def test_guard() -> None: partition = Partition(Topic("topic"), 1) - message = Message(BrokerValue(b"", partition, 5, datetime.now())) + now = datetime.now() + message = Message(BrokerValue(b"", partition, 5, now)) # Reject all messages that aren't the filtered one def inner_strategy_submit(msg: Message[bytes]) -> None: diff --git a/tests/processing/strategies/test_produce.py b/tests/processing/strategies/test_produce.py index 27959ce9..a6a8bcaa 100644 --- a/tests/processing/strategies/test_produce.py +++ b/tests/processing/strategies/test_produce.py @@ -1,3 +1,4 @@ +from datetime import datetime from unittest import mock import pytest @@ -26,8 +27,9 @@ def test_produce() -> None: value = b'{"something": "something"}' data = KafkaPayload(None, value, []) + now = datetime.now() - message = Message(Value(data, {Partition(orig_topic, 0): 1})) + message = Message(Value(data, {Partition(orig_topic, 0): 1}, now)) strategy.submit(message) diff --git a/tests/processing/strategies/test_reduce.py b/tests/processing/strategies/test_reduce.py index daa8d974..00d35637 100644 --- a/tests/processing/strategies/test_reduce.py +++ b/tests/processing/strategies/test_reduce.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import Callable, Set from unittest.mock import Mock, call @@ -6,6 +7,8 @@ def test_reduce() -> None: + now = datetime.now() + def accumulator(result: Set[int], value: BaseValue[int]) -> Set[int]: result.add(value.payload) return result @@ -26,6 +29,7 @@ def accumulator(result: Set[int], value: BaseValue[int]) -> Set[int]: { partition: i + 1, }, + now, ) ) ) @@ -33,7 +37,7 @@ def accumulator(result: Set[int], value: BaseValue[int]) -> Set[int]: next_step.submit.assert_has_calls( [ - call(Message(Value({0, 1, 2}, {partition: 3}))), - call(Message(Value({3, 4, 5}, {partition: 6}))), + call(Message(Value({0, 1, 2}, {partition: 3}, now))), + call(Message(Value({3, 4, 5}, {partition: 6}, now))), ] ) diff --git a/tests/processing/strategies/test_run_task.py b/tests/processing/strategies/test_run_task.py index 572f75f1..5ad3e76f 100644 --- a/tests/processing/strategies/test_run_task.py +++ b/tests/processing/strategies/test_run_task.py @@ -10,13 +10,14 @@ def test_run_task() -> None: mock_func = Mock() next_step = Mock() + now = datetime.now() strategy = RunTask(mock_func, next_step) partition = Partition(Topic("topic"), 0) - strategy.submit(Message(Value(b"hello", {partition: 1}))) + strategy.submit(Message(Value(b"hello", {partition: 1}, now))) strategy.poll() - strategy.submit(Message(Value(b"world", {partition: 2}))) + strategy.submit(Message(Value(b"world", {partition: 2}, now))) strategy.poll() # Wait for async functions to finish @@ -43,13 +44,14 @@ def test_run_task() -> None: def test_transform() -> None: next_step = Mock() + now = datetime.now() def transform_function(value: Message[int]) -> int: return value.payload * 2 transform_step = RunTask(transform_function, next_step) - original_message = Message(Value(1, {Partition(Topic("topic"), 0): 1})) + original_message = Message(Value(1, {Partition(Topic("topic"), 0): 1}, now)) with assert_changes(lambda: int(next_step.submit.call_count), 0, 1): transform_step.submit(original_message) @@ -59,6 +61,7 @@ def transform_function(value: Message[int]) -> int: Value( transform_function(original_message), original_message.committable, + original_message.timestamp, ) ) ) diff --git a/tests/processing/strategies/test_run_task_with_multiprocessing.py b/tests/processing/strategies/test_run_task_with_multiprocessing.py index 180d673f..1aa8bd4f 100644 --- a/tests/processing/strategies/test_run_task_with_multiprocessing.py +++ b/tests/processing/strategies/test_run_task_with_multiprocessing.py @@ -1,5 +1,6 @@ import multiprocessing import time +from datetime import datetime from multiprocessing.managers import SharedMemoryManager from typing import Any from unittest.mock import Mock, call @@ -33,10 +34,7 @@ def test_message_batch() -> None: assert block.size == 16384 message = Message( - Value( - KafkaPayload(None, b"\x00" * 16000, []), - {partition: 1}, - ) + Value(KafkaPayload(None, b"\x00" * 16000, []), {partition: 1}, datetime.now()) ) batch: MessageBatch[Message[KafkaPayload]] = MessageBatch(block) @@ -66,6 +64,7 @@ def test_parallel_run_task_worker_apply() -> None: Value( KafkaPayload(None, b"\x00" * size, []), {Partition(Topic("test"), 0): i + 1}, + datetime.now(), ) ) for i, size in enumerate([4000, 4000, 8000, 12000]) @@ -132,6 +131,7 @@ def test_parallel_transform_step() -> None: Value( KafkaPayload(None, b"\x00" * size, []), {Partition(Topic("test"), 0): i + 1}, + datetime.now(), ) ) for i, size in enumerate([4000, 4000, 8000, 2000]) @@ -330,6 +330,8 @@ def test_message_rejected_multiple() -> None: next_step = Mock() next_step.submit.side_effect = MessageRejected() + now = datetime.now() + strategy = RunTaskWithMultiprocessing( count_calls, next_step, @@ -340,8 +342,8 @@ def test_message_rejected_multiple() -> None: output_block_size=4096, ) - strategy.submit(Message(Value(1, {}))) - strategy.submit(Message(Value(-100, {}))) + strategy.submit(Message(Value(1, {}, now))) + strategy.submit(Message(Value(-100, {}, now))) start_time = time.time() @@ -355,11 +357,11 @@ def test_message_rejected_multiple() -> None: # The strategy keeps trying to submit the same message # since it's continually rejected assert next_step.submit.call_args_list == [ - call(Message(Value(2, {}))), - call(Message(Value(2, {}))), - call(Message(Value(2, {}))), - call(Message(Value(2, {}))), - call(Message(Value(2, {}))), + call(Message(Value(2, {}, now))), + call(Message(Value(2, {}, now))), + call(Message(Value(2, {}, now))), + call(Message(Value(2, {}, now))), + call(Message(Value(2, {}, now))), ] # clear the side effect, let the message through now @@ -375,15 +377,15 @@ def test_message_rejected_multiple() -> None: # The messages should have been submitted successfully now assert next_step.submit.call_args_list == [ - call(Message(Value(2, {}))), - call(Message(Value(-98, {}))), + call(Message(Value(2, {}, now))), + call(Message(Value(-98, {}, now))), ] strategy.close() strategy.join() assert next_step.submit.call_args_list == [ - call(Message(Value(2, {}))), - call(Message(Value(-98, {}))), + call(Message(Value(2, {}, now))), + call(Message(Value(-98, {}, now))), ] assert TestingMetricsBackend.calls == [ @@ -467,7 +469,7 @@ def test_regression_join_timeout_one_message() -> None: ) strategy.poll() - strategy.submit(Message(Value(10, {}))) + strategy.submit(Message(Value(10, {}, datetime.now()))) start = time.time() @@ -496,7 +498,7 @@ def test_regression_join_timeout_many_messages() -> None: ) for _ in range(10): - strategy.submit(Message(Value(0.1, {}))) + strategy.submit(Message(Value(0.1, {}, datetime.now()))) start = time.time() @@ -612,6 +614,8 @@ def test_output_block_resizing_without_limits() -> None: NUM_MESSAGES = INPUT_SIZE // MSG_SIZE next_step = Mock() + now = datetime.now() + strategy = RunTaskWithMultiprocessing( run_multiply_times_two, next_step, @@ -623,7 +627,9 @@ def test_output_block_resizing_without_limits() -> None: ) for _ in range(NUM_MESSAGES): - strategy.submit(Message(Value(KafkaPayload(None, b"x" * MSG_SIZE, []), {}))) + strategy.submit( + Message(Value(KafkaPayload(None, b"x" * MSG_SIZE, []), {}, now)) + ) strategy.close() strategy.join(timeout=3) @@ -631,7 +637,7 @@ def test_output_block_resizing_without_limits() -> None: assert ( next_step.submit.call_args_list == [ - call(Message(Value(KafkaPayload(None, b"x" * 2 * MSG_SIZE, []), {}))), + call(Message(Value(KafkaPayload(None, b"x" * 2 * MSG_SIZE, []), {}, now))), ] * NUM_MESSAGES ) @@ -647,7 +653,10 @@ def test_output_block_resizing_without_limits() -> None: def message_processor_raising_invalid_message(x: Message[KafkaPayload]) -> KafkaPayload: - raise InvalidMessage(Partition(topic=Topic("test_topic"), index=0), offset=1000) + raise InvalidMessage( + Partition(topic=Topic("test_topic"), index=0), + offset=1000, + ) def test_multiprocessing_with_invalid_message() -> None: @@ -661,7 +670,9 @@ def test_multiprocessing_with_invalid_message() -> None: max_batch_time=60, ) - strategy.submit(Message(Value(KafkaPayload(None, b"x" * 10, []), {}))) + strategy.submit( + Message(Value(KafkaPayload(None, b"x" * 10, []), {}, datetime.now())) + ) strategy.poll() strategy.close() @@ -673,6 +684,7 @@ def test_reraise_invalid_message() -> None: next_step = Mock() partition = Partition(Topic("test"), 0) offset = 5 + now = datetime.now() next_step.poll.side_effect = InvalidMessage(partition, offset) strategy = RunTaskWithMultiprocessing( @@ -683,7 +695,7 @@ def test_reraise_invalid_message() -> None: max_batch_time=60, ) - strategy.submit(Message(Value(KafkaPayload(None, b"x" * 10, []), {}))) + strategy.submit(Message(Value(KafkaPayload(None, b"x" * 10, []), {}, now))) with pytest.raises(InvalidMessage): strategy.poll() diff --git a/tests/processing/strategies/test_unfold.py b/tests/processing/strategies/test_unfold.py index 76c89bcd..63dcfea8 100644 --- a/tests/processing/strategies/test_unfold.py +++ b/tests/processing/strategies/test_unfold.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import Sequence from unittest.mock import Mock, call @@ -12,16 +13,17 @@ def generator(num: int) -> Sequence[int]: def test_unfold() -> None: partition = Partition(Topic("topic"), 0) + now = datetime.now() - message = Message(Value(2, {partition: 1})) + message = Message(Value(2, {partition: 1}, now)) next_step = Mock() strategy = Unfold(generator, next_step) strategy.submit(message) assert next_step.submit.call_args_list == [ - call(Message(Value(0, committable={}))), - call(Message(Value(1, committable={partition: 1}))), + call(Message(Value(0, committable={}, timestamp=now))), + call(Message(Value(1, committable={partition: 1}, timestamp=now))), ] strategy.close() @@ -30,20 +32,20 @@ def test_unfold() -> None: def test_message_rejected() -> None: partition = Partition(Topic("topic"), 0) - + now = datetime.now() next_step = Mock() next_step.submit.side_effect = MessageRejected() strategy = Unfold(generator, next_step) - message = Message(Value(2, {partition: 1})) + message = Message(Value(2, {partition: 1}, now)) strategy.submit(message) assert next_step.submit.call_count == 1 # Message doesn't actually go through since it was rejected assert next_step.submit.call_args_list == [ - call(Message(Value(0, committable={}))), + call(Message(Value(0, committable={}, timestamp=now))), ] # clear the side effect, both messages should be submitted now @@ -52,8 +54,8 @@ def test_message_rejected() -> None: strategy.poll() assert next_step.submit.call_args_list == [ - call(Message(Value(0, committable={}))), - call(Message(Value(1, committable={partition: 1}))), + call(Message(Value(0, committable={}, timestamp=now))), + call(Message(Value(1, committable={partition: 1}, timestamp=now))), ] strategy.close() diff --git a/tests/processing/test_processor.py b/tests/processing/test_processor.py index 76698362..d9a7a853 100644 --- a/tests/processing/test_processor.py +++ b/tests/processing/test_processor.py @@ -184,7 +184,10 @@ def test_stream_processor_invalid_message_from_poll() -> None: consumer.poll.side_effect = [BrokerValue(0, partition, offset, now)] strategy = mock.Mock() - strategy.poll.side_effect = [InvalidMessage(partition, 0, needs_commit=False), None] + strategy.poll.side_effect = [ + InvalidMessage(partition, 0, needs_commit=False), + None, + ] factory = mock.Mock() factory.create_with_partitions.return_value = strategy @@ -562,7 +565,8 @@ def test_healthcheck(tmpdir: py.path.local) -> None: topic = Topic("topic") partition = Partition(topic, 0) consumer = mock.Mock() - consumer.poll.return_value = BrokerValue(0, partition, 1, datetime.now()) + now = datetime.now() + consumer.poll.return_value = BrokerValue(0, partition, 1, now) strategy = mock.Mock() strategy.submit.side_effect = InvalidMessage(partition, 1) factory = mock.Mock() diff --git a/tests/test_types.py b/tests/test_types.py index 966387be..0b9f75f4 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -19,7 +19,7 @@ def test_message() -> None: assert pickle.loads(pickle.dumps(broker_message)) == broker_message # Generic payload - message = Message(Value(b"", {partition: 1})) + message = Message(Value(b"", {partition: 1}, datetime.now())) assert pickle.loads(pickle.dumps(message)) == message # Replace payload