From c1f55b3e00552d10234c1b27a7df8b7bcda5ba74 Mon Sep 17 00:00:00 2001 From: extreme4all <> Date: Thu, 4 Jan 2024 01:07:39 +0100 Subject: [PATCH] working consuming messages --- src/{app/kafka.py => _kafka.py} | 0 src/gracefull_shutdown.py | 20 ++++++++++++++ src/main.py | 46 +++++++++++++++++++++++---------- 3 files changed, 53 insertions(+), 13 deletions(-) rename src/{app/kafka.py => _kafka.py} (100%) create mode 100644 src/gracefull_shutdown.py diff --git a/src/app/kafka.py b/src/_kafka.py similarity index 100% rename from src/app/kafka.py rename to src/_kafka.py diff --git a/src/gracefull_shutdown.py b/src/gracefull_shutdown.py new file mode 100644 index 0000000..2e69ef2 --- /dev/null +++ b/src/gracefull_shutdown.py @@ -0,0 +1,20 @@ +import asyncio +import logging +import signal + +logger = logging.getLogger(__name__) + + +class GracefulShutdown: + def __init__(self, shutdown_event: asyncio.Event, shutdown_sequence): + signal.signal(signal.SIGINT, self.exit_gracefully) + signal.signal(signal.SIGTERM, self.exit_gracefully) + self.shutdown_event = shutdown_event + self.shutdown_sequence = shutdown_sequence + + def exit_gracefully(self, signum, frame): + logger.info("shutdown") + self.shutdown_event.set() + + loop = asyncio.get_event_loop() + loop.create_task(self.shutdown_sequence) diff --git a/src/main.py b/src/main.py index e54e4ac..d492dfa 100644 --- a/src/main.py +++ b/src/main.py @@ -3,17 +3,16 @@ import logging import time import traceback -from asyncio import Queue +from asyncio import Event, Queue from datetime import datetime -from aiokafka import AIOKafkaConsumer, AIOKafkaProducer from sqlalchemy import insert, select, update from sqlalchemy.exc import OperationalError from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.sql.expression import Insert, Select, Update +import _kafka import core.logging # for log formatting -from app import kafka from core.config import settings from database.database import get_session @@ -26,15 +25,19 @@ async def insert_report(session: AsyncSession, data: dict): # Define a function to process data from a queue -async def process_data(receive_queue: Queue, error_queue: Queue): +async def process_data(receive_queue: Queue, error_queue: Queue, shutdown_event: Event): # Initialize counter and start time counter = 0 start_time = time.time() # Run indefinitely - while True: - start_time, counter = kafka.log_speed( - counter=counter, start_time=start_time, receive_queue=receive_queue + while not shutdown_event.is_set(): + start_time, counter = _kafka.log_speed( + counter=counter, + start_time=start_time, + _queue=receive_queue, + topic="report", + interval=60, ) # Check if both queues are empty if receive_queue.empty(): @@ -43,8 +46,11 @@ async def process_data(receive_queue: Queue, error_queue: Queue): # Get a message from the chosen queue message: dict = await receive_queue.get() + + # TEMP print(message) await asyncio.sleep(1) + receive_queue.task_done() continue try: @@ -77,25 +83,39 @@ async def process_data(receive_queue: Queue, error_queue: Queue): async def main(): TOPIC = "report" GROUP = "report-worker" + shutdown_event = Event() + # get kafka engine - consumer = await kafka.kafka_consumer( + consumer = await _kafka.kafka_consumer( topic=TOPIC, group=GROUP, bootstrap_servers=[settings.KAFKA_HOST] ) - producer = await kafka.kafka_producer(bootstrap_servers=[settings.KAFKA_HOST]) + producer = await _kafka.kafka_producer(bootstrap_servers=[settings.KAFKA_HOST]) receive_queue = Queue(maxsize=100) send_queue = Queue(maxsize=100) asyncio.create_task( - kafka.receive_messages( - consumer=consumer, receive_queue=receive_queue, error_queue=send_queue + _kafka.receive_messages( + consumer=consumer, + receive_queue=receive_queue, + shutdown_event=shutdown_event, + batch_size=200, ) ) asyncio.create_task( - kafka.send_messages(topic=TOPIC, producer=producer, send_queue=send_queue) + _kafka.send_messages( + topic=TOPIC, + producer=producer, + send_queue=send_queue, + shutdown_event=shutdown_event, + ) ) asyncio.create_task( - process_data(receive_queue=receive_queue, error_queue=send_queue) + process_data( + receive_queue=receive_queue, + error_queue=send_queue, + shutdown_event=shutdown_event, + ) ) while True: