From 9a8f65ad1256e389c5a29f9ccc102be0615059a9 Mon Sep 17 00:00:00 2001 From: Reid Meyer Date: Thu, 10 Aug 2023 08:55:21 +0200 Subject: [PATCH] FIX: remove stop from remove stream --- kstreams/__init__.py | 5 - kstreams/create.py | 5 - kstreams/engine.py | 23 +- kstreams/prometheus/monitor.py | 239 ----------------- kstreams/rebalance_listener.py | 8 +- kstreams/test_utils/__init__.py | 2 - kstreams/test_utils/structs.py | 8 - kstreams/test_utils/test_clients.py | 231 ----------------- kstreams/test_utils/test_utils.py | 124 --------- kstreams/test_utils/topics.py | 139 ---------- tests/test_client.py | 387 ---------------------------- tests/test_consumer.py | 162 ------------ tests/test_monitor.py | 227 ---------------- 13 files changed, 14 insertions(+), 1546 deletions(-) delete mode 100644 kstreams/test_utils/__init__.py delete mode 100644 kstreams/test_utils/structs.py delete mode 100644 kstreams/test_utils/test_clients.py delete mode 100644 kstreams/test_utils/test_utils.py delete mode 100644 kstreams/test_utils/topics.py diff --git a/kstreams/__init__.py b/kstreams/__init__.py index 6715cd4..45386c1 100644 --- a/kstreams/__init__.py +++ b/kstreams/__init__.py @@ -2,7 +2,6 @@ from .clients import Consumer, ConsumerType, Producer, ProducerType from .create import StreamEngine, create_engine -from .prometheus.monitor import PrometheusMonitor, PrometheusMonitorType from .rebalance_listener import ( ManualCommitRebalanceListener, MetricsRebalanceListener, @@ -10,7 +9,6 @@ ) from .streams import Stream, stream from .structs import TopicPartitionOffset -from .test_utils import TestStreamClient __all__ = [ "Consumer", @@ -19,15 +17,12 @@ "ProducerType", "StreamEngine", "create_engine", - "PrometheusMonitor", - "PrometheusMonitorType", "MetricsRebalanceListener", "ManualCommitRebalanceListener", "RebalanceListener", "Stream", "stream", "ConsumerRecord", - "TestStreamClient", "TopicPartition", "TopicPartitionOffset", ] diff --git a/kstreams/create.py b/kstreams/create.py index f88eeaa..d43d883 100644 --- a/kstreams/create.py +++ b/kstreams/create.py @@ -3,7 +3,6 @@ from .backends.kafka import Kafka from .clients import Consumer, ConsumerType, Producer, ProducerType from .engine import StreamEngine -from .prometheus.monitor import PrometheusMonitor from .serializers import Deserializer, Serializer @@ -14,11 +13,8 @@ def create_engine( producer_class: Type[ProducerType] = Producer, serializer: Optional[Serializer] = None, deserializer: Optional[Deserializer] = None, - monitor: Optional[PrometheusMonitor] = None, ) -> StreamEngine: - if monitor is None: - monitor = PrometheusMonitor() if backend is None: backend = Kafka() @@ -30,5 +26,4 @@ def create_engine( producer_class=producer_class, serializer=serializer, deserializer=deserializer, - monitor=monitor, ) diff --git a/kstreams/engine.py b/kstreams/engine.py index 1b44333..8c71938 100644 --- a/kstreams/engine.py +++ b/kstreams/engine.py @@ -9,7 +9,6 @@ from .backends.kafka import Kafka from .clients import ConsumerType, ProducerType from .exceptions import DuplicateStreamException, EngineNotStartedException -from .prometheus.monitor import PrometheusMonitor from .rebalance_listener import MetricsRebalanceListener, RebalanceListener from .serializers import Deserializer, Serializer from .streams import Stream, StreamFunc, stream @@ -61,7 +60,6 @@ def __init__( backend: Kafka, consumer_class: Type[ConsumerType], producer_class: Type[ProducerType], - monitor: PrometheusMonitor, title: Optional[str] = None, deserializer: Optional[Deserializer] = None, serializer: Optional[Serializer] = None, @@ -72,7 +70,6 @@ def __init__( self.producer_class = producer_class self.deserializer = deserializer self.serializer = serializer - self.monitor = monitor self._producer: Optional[Type[ProducerType]] = None self._streams: List[Stream] = [] @@ -123,9 +120,9 @@ async def send( headers=encoded_headers, ) metadata: RecordMetadata = await fut - self.monitor.add_topic_partition_offset( - topic, metadata.partition, metadata.offset - ) + # self.monitor.add_topic_partition_offset( + # topic, metadata.partition, metadata.offset + # ) return metadata @@ -133,13 +130,13 @@ async def start(self) -> None: await self.start_producer() await self.start_streams() - # add the producer and streams to the Monitor - self.monitor.add_producer(self._producer) - self.monitor.add_streams(self._streams) - self.monitor.start() + # # add the producer and streams to the Monitor + # self.monitor.add_producer(self._producer) + # self.monitor.add_streams(self._streams) + # self.monitor.start() async def stop(self) -> None: - await self.monitor.stop() + # await self.monitor.stop() await self.stop_producer() await self.stop_streams() @@ -203,8 +200,8 @@ def add_stream(self, stream: Stream) -> None: async def remove_stream(self, stream: Stream) -> None: self._streams.remove(stream) - await stream.stop() - self.monitor.clean_stream_consumer_metrics(stream) + del stream + # self.monitor.clean_stream_consumer_metrics(stream) def stream( self, diff --git a/kstreams/prometheus/monitor.py b/kstreams/prometheus/monitor.py index 0ac1582..e69de29 100644 --- a/kstreams/prometheus/monitor.py +++ b/kstreams/prometheus/monitor.py @@ -1,239 +0,0 @@ -import asyncio -import logging -from typing import DefaultDict, Dict, List, Optional, TypeVar - -from prometheus_client import Gauge - -from kstreams import TopicPartition -from kstreams.clients import ConsumerType -from kstreams.streams import Stream - -logger = logging.getLogger(__name__) - -PrometheusMonitorType = TypeVar("PrometheusMonitorType", bound="PrometheusMonitor") -MetricsType = Dict[TopicPartition, Dict[str, Optional[int]]] - - -class PrometheusMonitor: - """ - Metrics monitor to keep track of Producers and Consumers. - - Attributes: - metrics_scrape_time float: Amount of seconds that the monitor - will wait until next scrape iteration - """ - - # Producer metrics - MET_OFFSETS = Gauge( - "topic_partition_offsets", "help producer offsets", ["topic", "partition"] - ) - - # Consumer metrics - MET_COMMITTED = Gauge( - "consumer_committed", - "help consumer committed", - ["topic", "partition", "consumer_group"], - ) - MET_POSITION = Gauge( - "consumer_position", - "help consumer position", - ["topic", "partition", "consumer_group"], - ) - MET_HIGHWATER = Gauge( - "consumer_highwater", - "help consumer highwater", - ["topic", "partition", "consumer_group"], - ) - MET_LAG = Gauge( - "consumer_lag", - "help consumer lag calculated using the last commited offset", - ["topic", "partition", "consumer_group"], - ) - MET_POSITION_LAG = Gauge( - "position_lag", - "help consumer position lag calculated using the consumer position", - ["topic", "partition", "consumer_group"], - ) - - def __init__(self, metrics_scrape_time: float = 3): - self.metrics_scrape_time = metrics_scrape_time - self.running = False - self._producer = None - self._streams: List[Stream] = [] - self._task: Optional[asyncio.Task] = None - - def start(self) -> None: - logger.info("Starting Prometheus metrics...") - self.running = True - self._task = asyncio.create_task(self._metrics_task()) - - async def stop(self) -> None: - logger.info("Stoping Prometheus metrics...") - self.running = False - - if self._task is not None: - # we need to make sure that the task is `done` - # to clean up properly - while not self._task.done(): - await asyncio.sleep(0.1) - - self._clean_consumer_metrics() - - def add_topic_partition_offset( - self, topic: str, partition: int, offset: int - ) -> None: - self.MET_OFFSETS.labels(topic=topic, partition=partition).set(offset) - - def _add_consumer_metrics(self, metrics_dict: MetricsType): - for topic_partition, partitions_metadata in metrics_dict.items(): - group_id = partitions_metadata["group_id"] - position = partitions_metadata["position"] - committed = partitions_metadata["committed"] - highwater = partitions_metadata["highwater"] - lag = partitions_metadata["lag"] - position_lag = partitions_metadata["position_lag"] - - self.MET_COMMITTED.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=group_id, - ).set(committed or 0) - self.MET_POSITION.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=group_id, - ).set(position or -1) - self.MET_HIGHWATER.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=group_id, - ).set(highwater or 0) - self.MET_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=group_id, - ).set(lag or 0) - self.MET_POSITION_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=group_id, - ).set(position_lag or 0) - - def _clean_consumer_metrics(self) -> None: - """ - This method should be called when a rebalance takes place - to clean all consumers metrics. When the rebalance finishes - new metrics will be generated per consumer based on the - consumer assigments - """ - self.MET_LAG.clear() - self.MET_POSITION_LAG.clear() - self.MET_COMMITTED.clear() - self.MET_POSITION.clear() - self.MET_HIGHWATER.clear() - - def clean_stream_consumer_metrics(self, stream: Stream) -> None: - if stream.consumer is not None: - topic_partitions = stream.consumer.assignment() - group_id = stream.consumer._group_id - for topic_partition in topic_partitions: - topic = topic_partition.topic - partition = topic_partition.partition - - metrics_found = False - for sample in self.MET_LAG.collect()[0].samples: - if { - "topic": topic, - "partition": str(partition), - "consumer_group": group_id, - } == sample.labels: - metrics_found = True - - if metrics_found: - self.MET_LAG.remove(topic, partition, group_id) - self.MET_POSITION_LAG.remove(topic, partition, group_id) - self.MET_COMMITTED.remove(topic, partition, group_id) - self.MET_POSITION.remove(topic, partition, group_id) - self.MET_HIGHWATER.remove(topic, partition, group_id) - else: - logger.debug(f"Metrics for stream: {stream.name} not found") - - def add_producer(self, producer): - self._producer = producer - - def add_streams(self, streams): - self._streams = streams - - async def generate_consumer_metrics(self, consumer: ConsumerType): - """ - Generate Consumer Metrics for Prometheus - - Format: - { - "topic-1": { - "1": ( - [topic-1, partition-number, 'group-id-1'], - committed, position, highwater, lag, position_lag - ) - "2": ( - [topic-1, partition-number, 'group-id-1'], - committed, position, highwater, lag, position_lag - ) - }, - ... - "topic-n": { - "1": ( - [topic-n, partition-number, 'group-id-n'], - committed, position, highwater, lag, position_lag - ) - "2": ( - [topic-n, partition-number, 'group-id-n'], - committed, position, highwater, lag, position_lag - ) - } - } - """ - metrics: MetricsType = DefaultDict(dict) - - topic_partitions = consumer.assignment() - - for topic_partition in topic_partitions: - committed = await consumer.committed(topic_partition) or 0 - position = await consumer.position(topic_partition) - highwater = consumer.highwater(topic_partition) - - lag = position_lag = None - if highwater: - lag = highwater - committed - position_lag = highwater - position - - metrics[topic_partition] = { - "group_id": consumer._group_id, - "committed": committed, - "position": position, - "highwater": highwater, - "lag": lag, - "position_lag": position_lag, - } - - self._add_consumer_metrics(metrics) - - async def _metrics_task(self) -> None: - """ - Asyncio Task that runs in `backgroud` to generate - consumer metrics. - - When self.running is False the task will finish and it - will be safe to stop consumers and producers. - """ - while self.running: - await asyncio.sleep(self.metrics_scrape_time) - for stream in self._streams: - if stream.consumer is not None: - try: - await self.generate_consumer_metrics(stream.consumer) - except RuntimeError: - logger.debug( - f"Metrics for stream {stream.name} can not be generated " - "probably because it has been removed" - ) diff --git a/kstreams/rebalance_listener.py b/kstreams/rebalance_listener.py index d358a81..578c5d0 100644 --- a/kstreams/rebalance_listener.py +++ b/kstreams/rebalance_listener.py @@ -110,9 +110,9 @@ async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None: to the consumer on the last rebalance """ # lock all asyncio Tasks so no new metrics will be added to the Monitor - if revoked and self.engine is not None: - async with asyncio.Lock(): - await self.engine.monitor.stop() + # if revoked and self.engine is not None: + # async with asyncio.Lock(): + # await self.engine.monitor.stop() async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None: """ @@ -128,7 +128,7 @@ async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None: # lock all asyncio Tasks so no new metrics will be added to the Monitor if assigned and self.engine is not None: async with asyncio.Lock(): - self.engine.monitor.start() + # self.engine.monitor.start() stream = self.stream if stream is not None: diff --git a/kstreams/test_utils/__init__.py b/kstreams/test_utils/__init__.py deleted file mode 100644 index 2eea3f0..0000000 --- a/kstreams/test_utils/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .test_clients import TestConsumer, TestProducer # noqa: F401 -from .test_utils import TestStreamClient, TopicManager # noqa: F401 diff --git a/kstreams/test_utils/structs.py b/kstreams/test_utils/structs.py deleted file mode 100644 index d37e3b9..0000000 --- a/kstreams/test_utils/structs.py +++ /dev/null @@ -1,8 +0,0 @@ -from typing import NamedTuple - - -class RecordMetadata(NamedTuple): - offset: int - partition: int - topic: str - timestamp: int diff --git a/kstreams/test_utils/test_clients.py b/kstreams/test_utils/test_clients.py deleted file mode 100644 index 4d89d6a..0000000 --- a/kstreams/test_utils/test_clients.py +++ /dev/null @@ -1,231 +0,0 @@ -from datetime import datetime -from typing import Any, Coroutine, Dict, List, Optional, Set, Tuple - -from kstreams import ConsumerRecord, RebalanceListener, TopicPartition -from kstreams.clients import Consumer, Producer -from kstreams.serializers import Serializer -from kstreams.types import Headers - -from .structs import RecordMetadata -from .topics import Topic, TopicManager - - -class Base: - async def start(self): - ... - - -class TestProducer(Base, Producer): - __test__ = False - - async def send( - self, - topic_name: str, - value: Any = None, - key: Any = None, - partition: int = 0, - timestamp_ms: Optional[float] = None, - headers: Optional[Headers] = None, - serializer: Optional[Serializer] = None, - serializer_kwargs: Optional[Dict] = None, - ) -> Coroutine: - topic, _ = TopicManager.get_or_create(topic_name) - timestamp_ms = timestamp_ms or datetime.now().timestamp() - total_partition_events = topic.offset(partition=partition) - partition = partition or 0 - - consumer_record = ConsumerRecord( - topic=topic_name, - value=value, - key=key, - headers=headers, - partition=partition, - timestamp=timestamp_ms, - offset=total_partition_events + 1, - timestamp_type=None, - checksum=None, - serialized_key_size=None, - serialized_value_size=None, - ) - - await topic.put(consumer_record) - - async def fut(): - return RecordMetadata( - topic=topic_name, - partition=partition, - timestamp=timestamp_ms, - offset=total_partition_events + 1, - ) - - return fut() - - -class TestConsumer(Base, Consumer): - __test__ = False - - def __init__(self, group_id: Optional[str] = None, **kwargs) -> None: - # copy the aiokafka behavior - self.topics: Optional[Tuple[str]] = None - self._group_id: Optional[str] = group_id - self._assignment: List[TopicPartition] = [] - self._previous_topic: Optional[Topic] = None - self.partitions_committed: Dict[TopicPartition, int] = {} - - # Called to make sure that has all the kafka attributes like _coordinator - # so it will behave like an real Kafka Consumer - super().__init__() - - def subscribe( - self, - *, - topics: Tuple[str], - listener: RebalanceListener, - **kwargs, - ) -> None: - self.topics = topics - - for topic_name in topics: - topic, created = TopicManager.get_or_create(topic_name, consumer=self) - - if not created: - # It means that the topic already exist, so we are in - # the situation where the topic hs events and the Stream - # was added on runtime - topic.consumer = self - - for partition_number in range(0, 3): - self._assignment.append( - TopicPartition(topic=topic_name, partition=partition_number) - ) - - if listener.stream is not None: - listener.stream.seek_to_initial_offsets() - - def assignment(self) -> List[TopicPartition]: - return self._assignment - - def _check_partition_assignments(self, consumer_record: ConsumerRecord) -> None: - """ - When an event is consumed the partition can be any positive int number - because there is not limit in the producer side (only during testing of course). - In case that the partition is not in the `_assignment` we need to register it. - - This is only during testing as in real use cases the assignments happens - at the moment of kafka bootstrapping - """ - topic_partition = TopicPartition( - topic=consumer_record.topic, - partition=consumer_record.partition, - ) - - if topic_partition not in self._assignment: - self._assignment.append(topic_partition) - - def last_stable_offset(self, topic_partition: TopicPartition) -> int: - topic = TopicManager.get(topic_partition.topic) - - if topic is not None: - return topic.offset(partition=topic_partition.partition) - return -1 - - async def position(self, topic_partition: TopicPartition) -> int: - """ - Get the offset of the *next record* that will be fetched, - so it returns offset(topic_partition) + 1 - """ - return self.last_stable_offset(topic_partition) + 1 - - def highwater(self, topic_partition: TopicPartition) -> int: - """ - A highwater offset is the offset that will be assigned to - the *next message* that is produced, so it returns - offset(topic_partition) + 1 - """ - return self.last_stable_offset(topic_partition) + 1 - - async def commit(self, offsets: Optional[Dict[TopicPartition, int]] = None) -> None: - if offsets is not None: - for topic_partition, offset in offsets.items(): - self.partitions_committed[topic_partition] = offset - return None - - async def committed(self, topic_partition: TopicPartition) -> Optional[int]: - return self.partitions_committed.get(topic_partition, 0) - - async def end_offsets( - self, partitions: List[TopicPartition] - ) -> Dict[TopicPartition, int]: - topic = TopicManager.get(partitions[0].topic) - end_offsets = { - topic_partition: topic.offset(partition=topic_partition.partition) + 1 - for topic_partition in partitions - } - return end_offsets - - def partitions_for_topic(self, topic: str) -> Set: - """ - Return the partitions of all assigned topics. The `topic` argument is not used - because in a testing enviroment the only topics are the ones declared by the end - user. - - The AIOKafkaConsumer returns a Set, so we do the same. - """ - partitions = [topic_partition.partition for topic_partition in self._assignment] - return set(partitions) - - async def getone( - self, - ) -> Optional[ConsumerRecord]: # The return type must be fixed later on - if self._previous_topic: - # Assumes previous record retrieved through getone was completed - self._previous_topic.task_done() - self._previous_topic = None - - topic = None - for topic_partition in self._assignment: - topic = TopicManager.get(topic_partition.topic) - - if not topic.consumed: - break - - if topic is not None: - consumer_record = await topic.get() - self._check_partition_assignments(consumer_record) - self._previous_topic = topic - return consumer_record - - return None - - def seek(self, *, partition: TopicPartition, offset: int) -> None: - # This method intends to have the same signature as aiokafka but with kwargs - # rather than positional arguments - topics = self.topics or () - - if partition.topic in topics: - topic = TopicManager.get(name=partition.topic) - partition_offset = topic.offset(partition=partition.partition) - - # only consume if the offset to seek if <= the parition total events - if offset <= partition_offset: - consumed_events = 0 - - # keep consuming if the events to consume <= offset to seek - while consumed_events < offset: - event = topic.get_nowait() - topic.task_done() - - if event.partition == partition.partition: - # only decrease if the event.partition matches - # the partition that the user wants to seek - consumed_events += 1 - else: - # ideally each partition should be a Queue - # for now just add the same event to the queue - topic.put_nowait(event=event) - - # it means that this consumer can consume - # from the TopicPartition so we can add it - # to the _assignment - if partition not in self._assignment: - self._assignment.append(partition) diff --git a/kstreams/test_utils/test_utils.py b/kstreams/test_utils/test_utils.py deleted file mode 100644 index 248a6de..0000000 --- a/kstreams/test_utils/test_utils.py +++ /dev/null @@ -1,124 +0,0 @@ -from types import TracebackType -from typing import Any, Dict, List, Optional, Type - -from kstreams import ConsumerRecord -from kstreams.engine import StreamEngine -from kstreams.prometheus.monitor import PrometheusMonitor -from kstreams.serializers import Serializer -from kstreams.streams import Stream -from kstreams.types import Headers - -from .structs import RecordMetadata -from .test_clients import TestConsumer, TestProducer -from .topics import Topic, TopicManager - - -class TestMonitor(PrometheusMonitor): - __test__ = False - - def start(self, *args, **kwargs) -> None: - print("herte....") - # ... - - async def stop(self, *args, **kwargs) -> None: - ... - - def add_topic_partition_offset(self, *args, **kwargs) -> None: - ... - - def clean_stream_consumer_metrics(self, *args, **kwargs) -> None: - ... - - def add_producer(self, *args, **kwargs): - ... - - def add_streams(self, *args, **kwargs): - ... - - -class TestStreamClient: - __test__ = False - - def __init__( - self, stream_engine: StreamEngine, monitoring_enabled: bool = True - ) -> None: - self.stream_engine = stream_engine - - # store the user clients to restore them later - self.monitor = stream_engine.monitor - self.producer_class = self.stream_engine.producer_class - self.consumer_class = self.stream_engine.consumer_class - - self.stream_engine.producer_class = TestProducer - self.stream_engine.consumer_class = TestConsumer - - if not monitoring_enabled: - self.stream_engine.monitor = TestMonitor() - - def mock_streams(self) -> None: - streams: List[Stream] = self.stream_engine._streams - for stream in streams: - stream.consumer_class = TestConsumer - - def setup_mocks(self) -> None: - self.mock_streams() - - async def start(self) -> None: - self.setup_mocks() - await self.stream_engine.start() - - async def stop(self) -> None: - # If there are streams, we must wait until all the messages are consumed - if self.stream_engine._streams: - await TopicManager.join() - await self.stream_engine.stop() - - # restore original config - self.stream_engine.producer_class = self.producer_class - self.stream_engine.consumer_class = self.consumer_class - self.stream_engine.monitor = self.monitor - - # clean the topics after finishing the test to make sure that - # no data is left tover - TopicManager.clean() - - async def __aenter__(self) -> "TestStreamClient": - await self.start() - return self - - async def __aexit__( - self, - exc_t: Optional[Type[BaseException]], - exc_v: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - await self.stop() - - async def send( - self, - topic: str, - value: Any = None, - key: Optional[Any] = None, - partition: int = 0, - timestamp_ms: Optional[int] = None, - headers: Optional[Headers] = None, - serializer: Optional[Serializer] = None, - serializer_kwargs: Optional[Dict] = None, - ) -> RecordMetadata: - return await self.stream_engine.send( - topic, - value=value, - key=key, - partition=partition, - timestamp_ms=timestamp_ms, - headers=headers, - serializer=serializer, - serializer_kwargs=serializer_kwargs, - ) - - def get_topic(self, *, topic_name: str) -> Topic: - return TopicManager.get(topic_name) - - async def get_event(self, *, topic_name: str) -> ConsumerRecord: - topic = TopicManager.get(topic_name) - return await topic.get() diff --git a/kstreams/test_utils/topics.py b/kstreams/test_utils/topics.py deleted file mode 100644 index 09c6b81..0000000 --- a/kstreams/test_utils/topics.py +++ /dev/null @@ -1,139 +0,0 @@ -import asyncio -from collections import defaultdict -from dataclasses import dataclass, field -from typing import ClassVar, DefaultDict, Dict, Optional, Tuple - -from kstreams import ConsumerRecord - -from . import test_clients - - -@dataclass -class Topic: - name: str - queue: asyncio.Queue - total_partition_events: DefaultDict[int, int] = field( - default_factory=lambda: defaultdict(lambda: -1) - ) - total_events: int = 0 - # for now we assumed that 1 streams is connected to 1 topic - consumer: Optional["test_clients.Consumer"] = None - - async def put(self, event: ConsumerRecord) -> None: - await self.queue.put(event) - - # keep track of the amount of events per topic partition - self.total_partition_events[event.partition] += 1 - self.total_events += 1 - - async def get(self) -> ConsumerRecord: - return await self.queue.get() - - def get_nowait(self) -> ConsumerRecord: - return self.queue.get_nowait() - - def put_nowait(self, *, event: ConsumerRecord) -> None: - return self.queue.put_nowait(event) - - def task_done(self) -> None: - self.queue.task_done() - - async def join(self) -> None: - await self.queue.join() - - def is_empty(self) -> bool: - return self.queue.empty() - - def size(self) -> int: - return self.queue.qsize() - - def offset(self, *, partition: int) -> int: - return self.total_partition_events[partition] - - @property - def consumed(self) -> bool: - """ - We need to check if the Topic has a Consumer and if there are messages in it - """ - return self.consumer is None or self.is_empty() - - def __str__(self) -> str: - return self.name - - def __repr__(self) -> str: - return ( - f"Topic {self.name} with Consumer: {self.consumer}. " - f"Messages in buffer: {self.queue.qsize()}" - ) - - -@dataclass -class TopicManager: - # The queues will represent the kafka topics during testing - # where the name is the topic name - topics: ClassVar[Dict[str, Topic]] = {} - - @classmethod - def get(cls, name: str) -> Topic: - topic = cls.topics.get(name) - - if topic is not None: - return topic - raise ValueError( - f"You might be trying to get the topic {name} outside the " - "`client async context` or trying to get an event from an empty " - f"topic {name}. Make sure that the code is inside the async context" - "and the topic has events." - ) - - @classmethod - def create( - cls, name: str, consumer: Optional["test_clients.Consumer"] = None - ) -> Topic: - topic = Topic(name=name, queue=asyncio.Queue(), consumer=consumer) - cls.topics[name] = topic - return topic - - @classmethod - def get_or_create( - cls, name: str, consumer: Optional["test_clients.Consumer"] = None - ) -> Tuple[Topic, bool]: - """ - A convenience method for looking up Topic by name. - If the topic does not exist a new one is created. - - Returns a tuple of (Topic, created), where Topic is the - retrieved or created object and created is a boolean - specifying whether a new Topic was created. - """ - try: - topic = cls.get(name) - return topic, False - except ValueError: - topic = cls.create(name, consumer=consumer) - return topic, True - - @classmethod - def all_messages_consumed(cls) -> bool: - """ - Check if all the messages has been consumed for ALL the topics - """ - for topic in cls.topics.values(): - # if there is at least 1 topic with events we need to keep waiting - if not topic.consumed: - return False - return True - - @classmethod - async def join(cls) -> None: - """ - Wait for all topic messages to be processed. - Only topics that have a consumer assigned should be awaited. - """ - await asyncio.gather( - *[topic.join() for topic in cls.topics.values() if not topic.consumed] - ) - - @classmethod - def clean(cls) -> None: - cls.topics = {} diff --git a/tests/test_client.py b/tests/test_client.py index 74e69b8..e69de29 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,387 +0,0 @@ -from unittest.mock import Mock, call, patch - -import pytest - -from kstreams import StreamEngine, TopicPartition, TopicPartitionOffset -from kstreams.streams import Stream -from kstreams.test_utils import ( - TestConsumer, - TestProducer, - TestStreamClient, - TopicManager, -) - -topic = "local--kstreams-consumer" -tp0 = TopicPartition(topic=topic, partition=0) -tp1 = TopicPartition(topic=topic, partition=1) -tp2 = TopicPartition(topic=topic, partition=2) - - -@pytest.mark.asyncio -async def test_engine_clients(stream_engine: StreamEngine): - client = TestStreamClient(stream_engine) - - async with client: - assert stream_engine.consumer_class is TestConsumer - assert stream_engine.producer_class is TestProducer - - # after leaving the context, everything should go to normal - assert client.stream_engine.consumer_class is client.consumer_class - assert client.stream_engine.producer_class is client.producer_class - - -@pytest.mark.asyncio -@pytest.mark.parametrize( - "monitoring_enabled", - ( - True, - False, - ), -) -async def test_send_event_with_test_client( - stream_engine: StreamEngine, monitoring_enabled: bool -): - topic = "local--kstreams" - client = TestStreamClient(stream_engine, monitoring_enabled=monitoring_enabled) - - async with client: - metadata = await client.send( - topic, value=b'{"message": "Hello world!"}', key="1" - ) - - assert metadata.topic == topic - assert metadata.partition == 0 - assert metadata.offset == 0 - - # send another event and check that the offset was incremented - metadata = await client.send( - topic, value=b'{"message": "Hello world!"}', key="1" - ) - assert metadata.offset == 1 - - # send en event to a different partition - metadata = await client.send( - topic, value=b'{"message": "Hello world!"}', key="1", partition=2 - ) - - # because it is a different partition the offset should be 1 - assert metadata.offset == 0 - - -@pytest.mark.asyncio -async def test_streams_consume_events(stream_engine: StreamEngine): - client = TestStreamClient(stream_engine) - event = b'{"message": "Hello world!"}' - save_to_db = Mock() - - @stream_engine.stream(topic, name="my-stream") - async def consume(stream): - async for cr in stream: - save_to_db(cr.value) - - async with client: - await client.send(topic, value=event, key="1") - stream = stream_engine.get_stream("my-stream") - assert stream.consumer.assignment() == [tp0, tp1, tp2] - assert stream.consumer.last_stable_offset(tp0) == 0 - assert stream.consumer.highwater(tp0) == 1 - assert await stream.consumer.position(tp0) == 1 - - # check that the event was consumed - save_to_db.assert_called_once_with(event) - - -@pytest.mark.asyncio -async def test_only_consume_topics_with_streams(stream_engine: StreamEngine): - """ - The test creates a stream but no events are send to it, - it means that the `TestStreamClient` should not wait for the topic to be consumed - even thought the topic is exist. - """ - client = TestStreamClient(stream_engine) - topic = "local--kstreams" - - @stream_engine.stream("a-different-topic", name="my-stream") - async def consume(stream): - async for cr in stream: - ... - - async with client: - metadata = await client.send( - topic, value=b'{"message": "Hello world!"}', key="1" - ) - - assert metadata.topic == topic - assert metadata.partition == 0 - assert metadata.offset == 0 - - -@pytest.mark.asyncio -async def test_topic_created(stream_engine: StreamEngine): - topic_name = "local--kstreams" - value = b'{"message": "Hello world!"}' - key = "1" - client = TestStreamClient(stream_engine) - async with client: - await client.send(topic_name, value=value, key=key) - - # check that the event was sent to a Topic - consumer_record = await client.get_event(topic_name=topic_name) - - assert consumer_record.value == value - assert consumer_record.key == key - - -@pytest.mark.asyncio -async def test_get_event_outside_context(stream_engine: StreamEngine): - topic_name = "local--kstreams" - value = b'{"message": "Hello world!"}' - key = "1" - client = TestStreamClient(stream_engine) - async with client: - # produce to events and consume only one in the client context - await client.send(topic_name, value=value, key=key) - await client.send(topic_name, value=value, key=key) - - # check that the event was sent to a Topic - consumer_record = await client.get_event(topic_name=topic_name) - assert consumer_record.value == value - assert consumer_record.key == key - - with pytest.raises(ValueError) as exc: - await client.get_event(topic_name=topic_name) - - assert ( - f"You might be trying to get the topic {topic_name} outside the " - "`client async context` or trying to get an event from an empty " - f"topic {topic_name}. Make sure that the code is inside the async context" - "and the topic has events." - ) == str(exc.value) - - -@pytest.mark.asyncio -async def test_clean_up_events(stream_engine: StreamEngine): - topic_name = "local--kstreams-clean-up" - value = b'{"message": "Hello world!"}' - key = "1" - client = TestStreamClient(stream_engine) - - async with client: - # produce to events and consume only one in the client context - await client.send(topic_name, value=value, key=key) - await client.send(topic_name, value=value, key=key) - - # check that the event was sent to a Topic - consumer_record = await client.get_event(topic_name=topic_name) - assert consumer_record.value == value - assert consumer_record.key == key - - # even though there is still one event in the topic - # after leaving the context the topic should be empty - assert not TopicManager.topics - - -@pytest.mark.asyncio -async def test_partitions_for_topic(stream_engine: StreamEngine): - topic_name = "local--kstreams" - value = b'{"message": "Hello world!"}' - key = "1" - client = TestStreamClient(stream_engine) - - @stream_engine.stream(topic_name, name="my-stream") - async def consume(stream): - async for cr in stream: - ... - - async with client: - # produce to events and consume only one in the client context - await client.send(topic_name, value=value, key=key, partition=0) - await client.send(topic_name, value=value, key=key, partition=2) - await client.send(topic_name, value=value, key=key, partition=10) - - stream = stream_engine.get_stream("my-stream") - assert stream.consumer.partitions_for_topic(topic_name) == set([0, 1, 2, 10]) - - -@pytest.mark.asyncio -async def test_end_offsets(stream_engine: StreamEngine): - topic_name = "local--kstreams" - value = b'{"message": "Hello world!"}' - key = "1" - client = TestStreamClient(stream_engine) - - @stream_engine.stream(topic_name, name="my-stream") - async def consume(stream): - async for cr in stream: - ... - - async with client: - # produce to events and consume only one in the client context - await client.send(topic_name, value=value, key=key, partition=0) - await client.send(topic_name, value=value, key=key, partition=0) - await client.send(topic_name, value=value, key=key, partition=2) - await client.send(topic_name, value=value, key=key, partition=10) - - topic_partitions = [ - TopicPartition(topic_name, 0), - TopicPartition(topic_name, 2), - TopicPartition(topic_name, 10), - ] - - stream = stream_engine.get_stream("my-stream") - assert (await stream.consumer.end_offsets(topic_partitions)) == { - TopicPartition(topic="local--kstreams", partition=0): 2, - TopicPartition(topic="local--kstreams", partition=2): 1, - TopicPartition(topic="local--kstreams", partition=10): 1, - } - - -@pytest.mark.asyncio -async def test_consumer_commit(stream_engine: StreamEngine): - topic_name = "local--kstreams-consumer-commit" - value = b'{"message": "Hello world!"}' - name = "my-stream" - key = "1" - partition = 2 - tp = TopicPartition( - topic=topic_name, - partition=partition, - ) - total_events = 10 - - @stream_engine.stream(topic_name, name=name) - async def my_stream(stream: Stream): - async for cr in stream: - await stream.commit({tp: cr.offset}) - - client = TestStreamClient(stream_engine) - async with client: - for _ in range(0, total_events): - record_metadata = await client.send( - topic_name, partition=partition, value=value, key=key - ) - assert record_metadata.partition == partition - - # check that everything was commited - stream = stream_engine.get_stream(name) - assert (await stream.consumer.committed(tp)) == total_events - 1 - - -@pytest.mark.asyncio -async def test_e2e_example(): - """ - Test that events are produce by the engine and consumed by the streams - """ - from examples.simple import event_store, produce, stream_engine - - client = TestStreamClient(stream_engine) - - async with client: - metadata = await produce() - - assert event_store.total == 5 - assert metadata.partition == 0 - - # check that all events has been consumed - assert TopicManager.all_messages_consumed() - - -@pytest.mark.asyncio -@pytest.mark.parametrize( - "monitoring_enabled", - ( - True, - False, - ), -) -async def test_e2e_consume_multiple_topics(monitoring_enabled): - from examples.consume_multiple_topics import produce, stream_engine, topics - - total_events = 2 - client = TestStreamClient(stream_engine, monitoring_enabled=monitoring_enabled) - - async with client: - await produce(total_events) - - topic_1 = TopicManager.get(topics[0]) - topic_2 = TopicManager.get(topics[1]) - - assert topic_1.total_events == total_events - assert topic_2.total_events == total_events - - assert TopicManager.all_messages_consumed() - - -@pytest.mark.asyncio -async def test_streams_consume_events_with_initial_offsets(stream_engine: StreamEngine): - client = TestStreamClient(stream_engine) - event1 = b'{"message": "Hello world1!"}' - event2 = b'{"message": "Hello world2!"}' - process = Mock() - - tp0 = TopicPartition(topic=topic, partition=0) - tp1 = TopicPartition(topic=topic, partition=1) - tp2 = TopicPartition(topic=topic, partition=2) - - from typing import Set - - assignments: Set[TopicPartition] = set() - assignments.update( - tp0, - tp1, - tp2, - ) - - with patch("kstreams.test_utils.test_clients.TestConsumer.seek") as client_seek: - async with client: - await client.send(topic, value=event1, partition=0) - await client.send(topic, value=event1, partition=0) - await client.send(topic, value=event1, partition=0) - await client.send(topic, value=event2, partition=1) - - async def func_stream(consumer: Stream): - async for cr in consumer: - process(cr.value) - - stream: Stream = Stream( - topics=topic, - consumer_class=TestConsumer, - name="my-stream", - func=func_stream, - initial_offsets=[ - # initial topic offset is -1 - TopicPartitionOffset(topic=topic, partition=0, offset=1), - TopicPartitionOffset(topic=topic, partition=1, offset=0), - TopicPartitionOffset(topic=topic, partition=2, offset=10), - ], - ) - stream_engine.add_stream(stream) - await stream.start() - - # simulate partitions assigned on rebalance - await stream.rebalance_listener.on_partitions_assigned(assigned=assignments) - - assert stream.consumer.assignment() == [tp0, tp1, tp2] - - assert stream.consumer.last_stable_offset(tp0) == 2 - assert stream.consumer.highwater(tp0) == 3 - assert await stream.consumer.position(tp0) == 3 - - assert stream.consumer.last_stable_offset(tp1) == 0 - assert stream.consumer.highwater(tp1) == 1 - assert await stream.consumer.position(tp1) == 1 - - # the position will be 0 as the offset 10 does not exist - assert stream.consumer.last_stable_offset(tp2) == -1 - assert stream.consumer.highwater(tp2) == 0 - assert await stream.consumer.position(tp2) == 0 - - client_seek.assert_has_calls( - [ - call(partition=tp0, offset=1), - call(partition=tp1, offset=0), - call(partition=tp2, offset=10), - ], - any_order=True, - ) - process.assert_has_calls([call(event1), call(event1), call(event2)], any_order=True) diff --git a/tests/test_consumer.py b/tests/test_consumer.py index f50a22a..e69de29 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -1,162 +0,0 @@ -from typing import Set -from unittest import mock - -import pytest - -from kstreams import ( - ManualCommitRebalanceListener, - MetricsRebalanceListener, - RebalanceListener, - TopicPartition, - create_engine, -) -from kstreams.backends.kafka import Kafka -from kstreams.clients import Consumer -from kstreams.engine import Stream, StreamEngine - - -@pytest.mark.asyncio -async def test_consumer(): - with mock.patch( - "kstreams.clients.aiokafka.AIOKafkaConsumer.start" - ) as mock_start_super: - consumer = Consumer() - - await consumer.start() - mock_start_super.assert_called() - - -@pytest.mark.asyncio -async def test_consumer_with_ssl(ssl_context): - backend = Kafka(security_protocol="SSL", ssl_context=ssl_context) - consumer = Consumer(**backend.dict()) - assert consumer._client._ssl_context - - -@pytest.mark.asyncio -async def test_init_consumer_with_multiple_topics(): - topics = ["my-topic", "my-topic-2"] - consumer = Consumer(*topics) - - assert consumer._client._topics == set(topics) - - -@pytest.mark.asyncio -async def test_consumer_custom_kafka_config(): - kafka_config = { - "bootstrap_servers": ["localhost:9093", "localhost:9094"], - "group_id": "my-group-consumer", - } - - consumer = Consumer("my-topic", **kafka_config) - - # ugly checking of private attributes - assert consumer._client._bootstrap_servers == kafka_config["bootstrap_servers"] - assert consumer._group_id == kafka_config["group_id"] - - -@pytest.mark.asyncio -async def test_add_stream_with_rebalance_listener(stream_engine: StreamEngine): - topic = "local--hello-kpn" - - class MyRebalanceListener(RebalanceListener): - async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None: - ... - - async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None: - ... - - rebalance_listener = MyRebalanceListener() - - with mock.patch("kstreams.clients.aiokafka.AIOKafkaConsumer.start"), mock.patch( - "kstreams.clients.aiokafka.AIOKafkaProducer.start" - ): - - @stream_engine.stream(topic, rebalance_listener=rebalance_listener) - async def my_stream(stream: Stream): - async for _ in stream: - ... - - await stream_engine.start() - await stream_engine.stop() - - assert my_stream.rebalance_listener == rebalance_listener - assert rebalance_listener.stream == my_stream - - # checking that the subscription has also the rebalance_listener - assert my_stream.consumer._subscription._listener == rebalance_listener - - -@pytest.mark.asyncio -async def test_stream_with_default_rebalance_listener(): - topic = "local--hello-kpn" - topic_partitions = set(TopicPartition(topic=topic, partition=0)) - - with mock.patch("kstreams.clients.aiokafka.AIOKafkaConsumer.start"), mock.patch( - "kstreams.clients.aiokafka.AIOKafkaProducer.start" - ), mock.patch("kstreams.PrometheusMonitor.start") as monitor_start, mock.patch( - "kstreams.PrometheusMonitor.stop" - ) as monitor_stop: - # use this function so we can mock PrometheusMonitor - stream_engine = create_engine() - - @stream_engine.stream(topic) - async def my_stream(stream: Stream): - async for _ in stream: - ... - - await stream_engine.start() - rebalance_listener = my_stream.rebalance_listener - - assert isinstance(rebalance_listener, MetricsRebalanceListener) - # checking that the subscription has also the rebalance_listener - assert isinstance( - my_stream.consumer._subscription._listener, MetricsRebalanceListener - ) - assert rebalance_listener.engine == stream_engine - - await rebalance_listener.on_partitions_revoked(revoked=topic_partitions) - await rebalance_listener.on_partitions_assigned(assigned=topic_partitions) - - monitor_stop.assert_called_once() - - # called twice: When the engine starts and on_partitions_assigned - monitor_start.assert_has_calls([mock.call(), mock.call()]) - - await stream_engine.stop() - - -@pytest.mark.asyncio -async def test_stream_manual_commit_rebalance_listener(stream_engine: StreamEngine): - topic = "local--hello-kpn" - topic_partitions = set(TopicPartition(topic=topic, partition=0)) - - with mock.patch("kstreams.clients.aiokafka.AIOKafkaConsumer.start"), mock.patch( - "kstreams.clients.aiokafka.AIOKafkaConsumer.commit" - ) as commit_mock, mock.patch("kstreams.clients.aiokafka.AIOKafkaProducer.start"): - - @stream_engine.stream( - topic, - group_id="example-group", - enable_auto_commit=False, - rebalance_listener=ManualCommitRebalanceListener(), - ) - async def hello_stream(stream: Stream): - async for _ in stream: - ... - - await stream_engine.start() - await stream_engine.stop() - - rebalance_listener = hello_stream.rebalance_listener - - assert isinstance(rebalance_listener, ManualCommitRebalanceListener) - # checking that the subscription has also the rebalance_listener - assert isinstance( - hello_stream.consumer._subscription._listener, ManualCommitRebalanceListener - ) - - await rebalance_listener.on_partitions_revoked(revoked=topic_partitions) - commit_mock.assert_awaited_once() - - await stream_engine.clean_streams() diff --git a/tests/test_monitor.py b/tests/test_monitor.py index f286225..e69de29 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -1,227 +0,0 @@ -import pytest -from prometheus_client import Counter - -from kstreams import PrometheusMonitor, Stream, StreamEngine -from kstreams.backends.kafka import Kafka - - -@pytest.mark.asyncio -async def test_consumer_metrics(mock_consumer_class, stream_engine: StreamEngine): - async def my_coroutine(_): - pass - - backend = Kafka() - stream = Stream( - "local--hello-kpn", - backend=backend, - consumer_class=mock_consumer_class, - func=my_coroutine, - ) - stream_engine.add_stream(stream=stream) - await stream.start() - - await stream_engine.monitor.generate_consumer_metrics(stream.consumer) - consumer = stream.consumer - - for topic_partition in consumer.assignment(): - - # super ugly notation but for now is the only way to get the metrics - met_committed = ( - stream_engine.monitor.MET_COMMITTED.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] - .samples[0] - .value - ) - - met_position = ( - stream_engine.monitor.MET_POSITION.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] - .samples[0] - .value - ) - - met_highwater = ( - stream_engine.monitor.MET_HIGHWATER.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] - .samples[0] - .value - ) - - met_lag = ( - stream_engine.monitor.MET_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] - .samples[0] - .value - ) - - met_position_lag = ( - stream_engine.monitor.MET_POSITION_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] - .samples[0] - .value - ) - - consumer_position = await consumer.position(topic_partition) - commited_position = await consumer.committed(topic_partition) - - assert met_committed == commited_position - assert met_position == consumer_position - assert met_highwater == consumer.highwater(topic_partition) - assert met_lag == consumer.highwater(topic_partition) - commited_position - assert ( - met_position_lag == consumer.highwater(topic_partition) - consumer_position - ) - - -@pytest.mark.asyncio -async def test_shared_default_metrics_between_monitors(): - class MyMonitor(PrometheusMonitor): - MY_COUNTER = Counter("my_failures", "Description of counter") - - default_monitor = PrometheusMonitor() - my_monitor = MyMonitor() - - # no more Singlenton - assert default_monitor != my_monitor - - assert default_monitor.MET_OFFSETS == my_monitor.MET_OFFSETS - assert default_monitor.MET_COMMITTED == my_monitor.MET_COMMITTED - assert default_monitor.MET_POSITION == my_monitor.MET_POSITION - assert default_monitor.MET_HIGHWATER == my_monitor.MET_HIGHWATER - assert default_monitor.MET_LAG == my_monitor.MET_LAG - - -@pytest.mark.asyncio -async def test_clean_stream_consumer_metrics( - mock_consumer_class, stream_engine: StreamEngine -): - async def my_coroutine(_): - pass - - backend = Kafka() - stream = Stream( - "local--hello-kpn", - backend=backend, - consumer_class=mock_consumer_class, - func=my_coroutine, - ) - stream_engine.add_stream(stream=stream) - await stream.start() - - await stream_engine.monitor.generate_consumer_metrics(stream.consumer) - consumer = stream.consumer - - for topic_partition in consumer.assignment(): - # super ugly notation but for now is the only way to get the metrics - met_committed = ( - stream_engine.monitor.MET_COMMITTED.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] - .samples[0] - .value - ) - - met_position = ( - stream_engine.monitor.MET_POSITION.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] - .samples[0] - .value - ) - - met_highwater = ( - stream_engine.monitor.MET_HIGHWATER.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] - .samples[0] - .value - ) - - met_lag = ( - stream_engine.monitor.MET_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] - .samples[0] - .value - ) - - met_position_lag = ( - stream_engine.monitor.MET_POSITION_LAG.labels( - topic=topic_partition.topic, - partition=topic_partition.partition, - consumer_group=consumer._group_id, - ) - .collect()[0] - .samples[0] - .value - ) - - consumer_position = await consumer.position(topic_partition) - commited_position = await consumer.committed(topic_partition) - - assert met_committed == commited_position - assert met_position == consumer_position - assert met_highwater == consumer.highwater(topic_partition) - assert met_lag == consumer.highwater(topic_partition) - commited_position - assert ( - met_position_lag == consumer.highwater(topic_partition) - consumer_position - ) - - assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 2 - await stream_engine.remove_stream(stream) - assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 0 - - -@pytest.mark.asyncio -async def test_skip_clean_stream_consumer_metrics( - mock_consumer_class, stream_engine: StreamEngine, caplog -): - async def my_coroutine(_): - pass - - backend = Kafka() - stream = Stream( - "local--hello-kpn", - name="my-stream-name", - backend=backend, - consumer_class=mock_consumer_class, - func=my_coroutine, - ) - stream_engine.add_stream(stream=stream) - await stream.start() - - assert len(stream_engine.monitor.MET_POSITION_LAG.collect()[0].samples) == 0 - await stream_engine.remove_stream(stream) - assert "Metrics for stream: my-stream-name not found" in caplog.text