Skip to content

Commit

Permalink
fix: Prometheus scrape metrics task fixed in order to have a proper s…
Browse files Browse the repository at this point in the history
…hutdown (#124)

The monitor shutdown will wait until the scraping Task is done rather than canceling it.
Parameter metrics_scrape_time added.
  • Loading branch information
marcosschroh authored Jul 26, 2023
1 parent 17be63a commit 8d8b99d
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 17 deletions.
4 changes: 2 additions & 2 deletions kstreams/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ async def start(self) -> None:
self.monitor.start()

async def stop(self) -> None:
await self.stop_streams()
await self.stop_producer()
await self.monitor.stop()
await self.stop_producer()
await self.stop_streams()

async def stop_producer(self):
logger.info("Waiting Producer to STOP....")
Expand Down
30 changes: 20 additions & 10 deletions kstreams/prometheus/monitor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import logging
from typing import DefaultDict, Dict, Optional, TypeVar
from typing import DefaultDict, Dict, List, Optional, TypeVar

from prometheus_client import Gauge

Expand All @@ -17,6 +17,10 @@
class PrometheusMonitor:
"""
Metrics monitor to keep track of Producers and Consumers.
Attributes:
metrics_scrape_time float: Number of seconds that the monitor
will wait until the next scrape iteration
"""

# Producer metrics
Expand Down Expand Up @@ -51,23 +55,26 @@ class PrometheusMonitor:
["topic", "partition", "consumer_group"],
)

def __init__(self):
def __init__(self, metrics_scrape_time: float = 3):
self.metrics_scrape_time = metrics_scrape_time
self.running = False
self._producer = None
self._streams = []
self._streams: List[Stream] = []
self._task: Optional[asyncio.Task] = None

def start(self) -> None:
logger.info("Starting Prometheus metrics...")
self.running = True
self._task = asyncio.create_task(self._metrics_task())

async def stop(self) -> None:
logger.info("Stoping Prometheus metrics...")
if self._task is not None:
self._task.cancel()
self.running = False

# we need to make sure that the task is cancelled
if self._task is not None:
# we need to make sure that the task is `done`
# to clean up properly
while not self._task.cancelled():
while not self._task.done():
await asyncio.sleep(0.1)

self._clean_consumer_metrics()
Expand Down Expand Up @@ -214,10 +221,13 @@ async def generate_consumer_metrics(self, consumer: ConsumerType):
async def _metrics_task(self) -> None:
"""
Asyncio Task that runs in `background` to generate
consumer metrics
consumer metrics.
When self.running is False the task will finish and it
will be safe to stop consumers and producers.
"""
while True:
await asyncio.sleep(3)
while self.running:
await asyncio.sleep(self.metrics_scrape_time)
for stream in self._streams:
if stream.consumer is not None:
try:
Expand Down
14 changes: 9 additions & 5 deletions kstreams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __init__(
self.func = func
self.backend = backend
self.consumer_class = consumer_class
self.consumer: Optional[Type[ConsumerType]] = None
self.consumer: Optional[ConsumerType] = None
self.config = config or {}
self._consumer_task: Optional[asyncio.Task] = None
self.name = name or str(uuid.uuid4())
Expand All @@ -115,7 +115,7 @@ def __init__(
# so we always create a list and then we expand it with *topics
self.topics = [topics] if isinstance(topics, str) else topics

def _create_consumer(self) -> Type[ConsumerType]:
def _create_consumer(self) -> ConsumerType:
if self.backend is None:
raise BackendNotSet("A backend has not been set for this stream")
config = {**self.backend.dict(), **self.config}
Expand All @@ -135,10 +135,14 @@ async def stop(self) -> None:
async def _subscribe(self) -> None:
# Always create a consumer on stream.start
self.consumer = self._create_consumer()
await self.consumer.start()
self.running = True

self.consumer.subscribe(topics=self.topics, listener=self.rebalance_listener)
# add the check to avoid `mypy` complaints
if self.consumer is not None:
await self.consumer.start()
self.consumer.subscribe(
topics=self.topics, listener=self.rebalance_listener
)
self.running = True

async def commit(self, offsets: Optional[Dict[TopicPartition, int]] = None):
await self.consumer.commit(offsets=offsets) # type: ignore
Expand Down

0 comments on commit 8d8b99d

Please sign in to comment.