
Commit 0898133

minor changes

extreme4all committed Apr 22, 2024
1 parent dce796e commit 0898133

Showing 4 changed files with 107 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .vscode/settings.json
@@ -5,7 +5,7 @@
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter",
"editor.defaultFormatter": "charliermarsh.ruff",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit"
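The editor's default Python formatter switches from Black (ms-python.black-formatter) to the Ruff extension (charliermarsh.ruff), which also serves the existing organize-imports action on save. Outside the editor, the same formatting can be scripted; a minimal sketch, assuming ruff is installed and using "src" as an illustrative target path (not taken from this commit):

import subprocess

# Format the tree and apply safe lint fixes, mirroring what the Ruff
# extension now does on save. "src" is an assumed path.
subprocess.run(["ruff", "format", "src"], check=True)
subprocess.run(["ruff", "check", "--fix", "src"], check=True)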
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -63,7 +63,7 @@ services:
      - ./mysql/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
      # - ./mysql/mount:/var/lib/mysql # creates persistence
    ports:
-      - 3306:3306
+      - 3307:3306
    networks:
      - botdetector-network
    healthcheck:
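The MySQL container's host-side port mapping moves from 3306 to 3307 (inside the container it still listens on 3306), which avoids clashing with a MySQL instance already bound to 3306 on the host. A minimal connection sketch from the host, assuming pymysql; the user, password, and database below are placeholders, not taken from this commit:

import pymysql

# Reach the containerized MySQL through the remapped host port 3307;
# credentials and database name are assumed placeholders.
conn = pymysql.connect(
    host="127.0.0.1",
    port=3307,
    user="example_user",
    password="example_password",
    database="example_db",
)
with conn.cursor() as cur:
    cur.execute("SELECT 1")
    print(cur.fetchone())
conn.close()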
90 changes: 90 additions & 0 deletions src/_kafka.py
@@ -100,3 +100,93 @@ async def send_messages(
        messages_sent += 1

    logger.info("shutdown")


class AioKafkaEngine:
    def __init__(
        self,
        receive_queue: Queue,
        send_queue: Queue,
        producer: AIOKafkaProducer,
        consumer: AIOKafkaConsumer,
    ) -> None:
        self.receive_queue = receive_queue
        self.send_queue = send_queue
        self.producer = producer
        self.consumer = consumer

    def _log_speed(
        self,
        counter: int,
        start_time: float,
        _queue: Queue,
        topic: str,
        interval: int = 60,
    ) -> tuple[float, int]:
        # Calculate the time elapsed since the last reset
        delta_time = time.time() - start_time

        # Check if the specified interval has not elapsed yet
        if delta_time < interval:
            # Return the original start time and the current counter value
            return start_time, counter

        # Calculate the processing speed (messages per second)
        speed = counter / delta_time

        # Log the processing speed and relevant information
        log_message = (
            f"{topic=}, qsize={_queue.qsize()}, "
            f"processed {counter} in {delta_time:.2f} seconds, {speed:.2f} msg/sec"
        )
        logger.info(log_message)

        # Return the current time and reset the counter to zero
        return time.time(), 0

    async def produce_messages(self, shutdown_event: Event, topic: str):
        start_time = time.time()
        messages_sent = 0

        while not shutdown_event.is_set():
            start_time, messages_sent = self._log_speed(
                counter=messages_sent,
                start_time=start_time,
                _queue=self.send_queue,
                topic=topic,
            )
            if self.send_queue.empty():
                await asyncio.sleep(1)
                continue

            message = await self.send_queue.get()
            await self.producer.send(topic, value=message)
            self.send_queue.task_done()

            messages_sent += 1

        logger.info("shutdown")

    async def consume_messages(self, shutdown_event: Event, batch_size: int = 200):
        while not shutdown_event.is_set():
            batch = await self.consumer.getmany(timeout_ms=1000, max_records=batch_size)
            for tp, messages in batch.items():
                await asyncio.gather(
                    *[self.receive_queue.put(m.value) for m in messages]
                )
                await self.consumer.commit()
                logger.info(f"Partition {tp}: {len(messages)} messages")
        logger.info("shutdown")

    async def start(
        self,
        producer_topic: str,
        producer_shutdown_event: Event,
        consumer_shutdown_event: Event,
        consumer_batch_size: int,
    ):
        # Keep references to the tasks so they are not garbage-collected
        # while running in the background.
        self._consumer_task = asyncio.create_task(
            self.consume_messages(
                shutdown_event=consumer_shutdown_event, batch_size=consumer_batch_size
            )
        )
        self._producer_task = asyncio.create_task(
            self.produce_messages(
                shutdown_event=producer_shutdown_event, topic=producer_topic
            )
        )
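For reference, a minimal end-to-end sketch of driving AioKafkaEngine outside this repository. The broker address, topic names, group id, and JSON (de)serializers are assumptions chosen for illustration; the producer and consumer must be started before the engine's loops run, and auto-commit is disabled because consume_messages commits manually:

import asyncio
import json
from asyncio import Event, Queue

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


async def demo():
    consumer = AIOKafkaConsumer(
        "input-topic",  # topic name is an assumption
        bootstrap_servers="localhost:9092",
        group_id="demo-group",
        enable_auto_commit=False,
        value_deserializer=lambda v: json.loads(v.decode()),
    )
    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda v: json.dumps(v).encode(),
    )
    await consumer.start()
    await producer.start()

    shutdown_event = Event()
    engine = AioKafkaEngine(
        receive_queue=Queue(maxsize=100),
        send_queue=Queue(maxsize=100),
        producer=producer,
        consumer=consumer,
    )
    await engine.start(
        producer_topic="output-topic",  # assumption
        producer_shutdown_event=shutdown_event,
        consumer_shutdown_event=shutdown_event,
        consumer_batch_size=200,
    )

    await asyncio.sleep(10)  # let the loops run briefly for the demo
    shutdown_event.set()
    await producer.stop()
    await consumer.stop()


asyncio.run(demo())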
34 changes: 15 additions & 19 deletions src/main.py
@@ -4,18 +4,17 @@
import traceback
from asyncio import Event, Queue

-from sqlalchemy import insert, select
-from sqlalchemy.exc import OperationalError
-from sqlalchemy.ext.asyncio import AsyncResult, AsyncSession
-from sqlalchemy.sql.expression import Insert, Select

import _kafka
from app.views.player import PlayerInDB
from app.views.report import ReportInQueue, StgReportCreate, convert_report_q_to_db
from core.config import settings
from database.database import get_session, model_to_dict
from database.models.player import Player
from database.models.report import StgReport
+from sqlalchemy import insert, select
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.ext.asyncio import AsyncResult, AsyncSession
+from sqlalchemy.sql.expression import Insert, Select

logger = logging.getLogger(__name__)

@@ -124,22 +124,19 @@ async def main():
    receive_queue = Queue(maxsize=100)
    send_queue = Queue(maxsize=100)

-    asyncio.create_task(
-        _kafka.receive_messages(
-            consumer=consumer,
-            receive_queue=receive_queue,
-            shutdown_event=shutdown_event,
-            batch_size=200,
-        )
+    engine = _kafka.AioKafkaEngine(
+        receive_queue=receive_queue,
+        send_queue=send_queue,
+        producer=producer,
+        consumer=consumer,
    )
-    asyncio.create_task(
-        _kafka.send_messages(
-            topic=TOPIC,
-            producer=producer,
-            send_queue=send_queue,
-            shutdown_event=shutdown_event,
-        )
+    await engine.start(
+        consumer_batch_size=200,
+        producer_shutdown_event=shutdown_event,
+        consumer_shutdown_event=shutdown_event,
+        producer_topic=TOPIC,
    )

    asyncio.create_task(
        process_data(
            receive_queue=receive_queue,
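Both engine loops exit by polling the same shutdown_event, so a single Event can fan out to every background task. A sketch of wiring OS signals to that Event, assuming a Unix event loop (the surrounding setup in main() is not shown in this diff):

import asyncio
import signal
from asyncio import Event


def install_signal_handlers(shutdown_event: Event) -> None:
    # Set the shared Event on SIGINT/SIGTERM so the produce/consume
    # loops observe it on their next iteration and wind down cleanly.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown_event.set)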
