Skip to content

Commit

Permalink
working consuming messages
Browse files Browse the repository at this point in the history
  • Loading branch information
extreme4all committed Jan 4, 2024
1 parent 8630888 commit c1f55b3
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 13 deletions.
File renamed without changes.
20 changes: 20 additions & 0 deletions src/gracefull_shutdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import asyncio
import logging
import signal

logger = logging.getLogger(__name__)


class GracefulShutdown:
    """Install SIGINT/SIGTERM handlers that trigger a cooperative shutdown.

    On the first signal the handler sets ``shutdown_event`` (which worker
    loops poll to exit cleanly) and schedules ``shutdown_sequence`` — a
    coroutine object — on the event loop. Subsequent signals are ignored:
    a coroutine object can only be awaited once, so scheduling it a second
    time (e.g. on a second Ctrl+C) would raise
    ``RuntimeError: cannot reuse already awaited coroutine``.
    """

    def __init__(self, shutdown_event: asyncio.Event, shutdown_sequence) -> None:
        # NOTE: signal.signal() must be called from the main thread; this
        # class is therefore expected to be constructed there.
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)
        self.shutdown_event = shutdown_event
        # A coroutine object to run once at shutdown (not a coroutine
        # function) — see the class docstring for why it runs only once.
        self.shutdown_sequence = shutdown_sequence

    def exit_gracefully(self, signum, frame) -> None:
        """Signal handler: request shutdown exactly once.

        Args:
            signum: the signal number delivered (SIGINT or SIGTERM).
            frame: the interrupted stack frame (unused).
        """
        if self.shutdown_event.is_set():
            # Shutdown already in progress; re-scheduling the same
            # coroutine would raise RuntimeError and duplicate cleanup.
            logger.info("shutdown already in progress, ignoring signal")
            return

        logger.info("shutdown")
        self.shutdown_event.set()

        # The handler interrupts the main thread, where the loop runs, so
        # get_event_loop() returns that loop here; the shutdown coroutine
        # is scheduled onto it rather than awaited inline.
        loop = asyncio.get_event_loop()
        loop.create_task(self.shutdown_sequence)
46 changes: 33 additions & 13 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@
import logging
import time
import traceback
from asyncio import Queue
from asyncio import Event, Queue
from datetime import datetime

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from sqlalchemy import insert, select, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql.expression import Insert, Select, Update

import _kafka
import core.logging # for log formatting
from app import kafka
from core.config import settings
from database.database import get_session

Expand All @@ -26,15 +25,19 @@ async def insert_report(session: AsyncSession, data: dict):


# Define a function to process data from a queue
async def process_data(receive_queue: Queue, error_queue: Queue):
async def process_data(receive_queue: Queue, error_queue: Queue, shutdown_event: Event):
# Initialize counter and start time
counter = 0
start_time = time.time()

# Run indefinitely
while True:
start_time, counter = kafka.log_speed(
counter=counter, start_time=start_time, receive_queue=receive_queue
while not shutdown_event.is_set():
start_time, counter = _kafka.log_speed(
counter=counter,
start_time=start_time,
_queue=receive_queue,
topic="report",
interval=60,
)
# Check if both queues are empty
if receive_queue.empty():
Expand All @@ -43,8 +46,11 @@ async def process_data(receive_queue: Queue, error_queue: Queue):

# Get a message from the chosen queue
message: dict = await receive_queue.get()

# TEMP
print(message)
await asyncio.sleep(1)
receive_queue.task_done()
continue

try:
Expand Down Expand Up @@ -77,25 +83,39 @@ async def process_data(receive_queue: Queue, error_queue: Queue):
async def main():
TOPIC = "report"
GROUP = "report-worker"
shutdown_event = Event()

# get kafka engine
consumer = await kafka.kafka_consumer(
consumer = await _kafka.kafka_consumer(
topic=TOPIC, group=GROUP, bootstrap_servers=[settings.KAFKA_HOST]
)
producer = await kafka.kafka_producer(bootstrap_servers=[settings.KAFKA_HOST])
producer = await _kafka.kafka_producer(bootstrap_servers=[settings.KAFKA_HOST])

receive_queue = Queue(maxsize=100)
send_queue = Queue(maxsize=100)

asyncio.create_task(
kafka.receive_messages(
consumer=consumer, receive_queue=receive_queue, error_queue=send_queue
_kafka.receive_messages(
consumer=consumer,
receive_queue=receive_queue,
shutdown_event=shutdown_event,
batch_size=200,
)
)
asyncio.create_task(
kafka.send_messages(topic=TOPIC, producer=producer, send_queue=send_queue)
_kafka.send_messages(
topic=TOPIC,
producer=producer,
send_queue=send_queue,
shutdown_event=shutdown_event,
)
)
asyncio.create_task(
process_data(receive_queue=receive_queue, error_queue=send_queue)
process_data(
receive_queue=receive_queue,
error_queue=send_queue,
shutdown_event=shutdown_event,
)
)

while True:
Expand Down

0 comments on commit c1f55b3

Please sign in to comment.