diff --git a/Makefile b/Makefile
index ca9401d0..07522fcf 100644
--- a/Makefile
+++ b/Makefile
@@ -56,6 +56,7 @@ test-setup: ## installs pytest singular package for local testing
 
 requirements: ## installs all requirements
 	python3 -m pip install -r requirements.txt
+	python3 -m pip install ruff
 
 create-env: ## create .env file
 	echo "ENV=DEV" > .env
diff --git a/README.md b/README.md
index b84daebe..ce3a7055 100644
--- a/README.md
+++ b/README.md
@@ -155,3 +155,14 @@ call pip install -r requirements.txt --upgrade
 call pip freeze > requirements.txt
 powershell "(Get-Content requirements.txt) | ForEach-Object { $_ -replace '>=', '==' } | Set-Content requirements.txt"
 ```
+Upgrading on Linux:
+```sh
+sed -i 's/==/>=/g' requirements.txt
+pip install -r requirements.txt --upgrade
+pip freeze > requirements.txt
+```
+If you are running the cluster, port-forward the Kafka and MySQL services:
+```sh
+kubectl port-forward -n kafka svc/bd-prd-kafka-service 9094:9094
+kubectl port-forward -n database svc/mysql 3306:3306
+```
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 88fd5e3c..1cab4a0d 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -16,8 +16,12 @@ services:
       - 9092:9092
     healthcheck:
       test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server", "localhost:9092"]
+      # failures during this period are not counted
+      start_period: 30s
+      # time between checks
       interval: 30s
-      timeout: 10s
+      # time allowed for each check
+      timeout: 5s
       retries: 5
     networks:
       - botdetector-network
@@ -68,9 +72,12 @@ services:
       - botdetector-network
     healthcheck:
       test: "mysqladmin ping -h localhost -u root -proot_bot_buster"
+      # failures during this period are not counted
+      start_period: 30s
+      # time between checks
       interval: 30s
-      timeout: 10s
-      retries: 5
+      # time allowed for each check
+      timeout: 5s
 
   public_api:
     container_name: public_api
diff --git a/notes.md b/notes.md
deleted file mode 100644
index 4fa2d2d4..00000000
--- a/notes.md
+++ /dev/null
@@ -1,17 +0,0 @@
-kubectl
-```sh
-kubectl port-forward -n kafka svc/bd-prd-kafka-service 9094:9094
-kubectl port-forward -n database svc/mysql 3306:3306
-```
-
-```sh
-python -m venv .venv
-.venv\Scripts\activate
-python -m pip install --upgrade pip
-pip install -r requirements.txt
-```
-
-```sh
-.venv\Scripts\activate
-pip freeze > requirements.txt
-```
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 0bf26d29..3ace0c9f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,57 +1,75 @@
 aiokafka==0.10.0
+AioKafkaEngine==0.0.4
 aiomysql==0.2.0
-annotated-types==0.6.0
+annotated-types==0.7.0
 anyio==4.3.0
 async-timeout==4.0.3
 asyncmy==0.2.9
-attrs==23.1.0
-black==24.3.0
-certifi==2023.7.22
+attrs==23.2.0
+black==24.4.2
+certifi==2024.2.2
 cffi==1.16.0
 cfgv==3.4.0
 charset-normalizer==3.3.2
 click==8.1.7
 colorama==0.4.6
-cryptography==42.0.5
+cryptography==42.0.7
 databases==0.9.0
 distlib==0.3.8
-exceptiongroup==1.2.0
-fastapi==0.110.0
-filelock==3.13.1
+dnspython==2.6.1
+email_validator==2.1.1
+exceptiongroup==1.2.1
+fastapi==0.111.0
+fastapi-cli==0.0.4
+filelock==3.14.0
 greenlet==3.0.3
 h11==0.14.0
-httpcore==1.0.4
+httpcore==1.0.5
 httptools==0.6.1
 httpx==0.27.0
-hypothesis==6.88.3
-identify==2.5.35
-idna==3.6
+hypothesis==6.102.6
+identify==2.5.36
+idna==3.7
 iniconfig==2.0.0
+Jinja2==3.1.4
 kafka-python==2.0.2
+markdown-it-py==3.0.0
+MarkupSafe==2.1.5
+mdurl==0.1.2
 mypy-extensions==1.0.0
 nodeenv==1.8.0
+orjson==3.10.3
 packaging==24.0
 pathspec==0.12.1
-platformdirs==4.2.0
-pluggy==1.3.0
-pre-commit==3.6.2
-pycparser==2.21
-pydantic==2.6.4
+platformdirs==4.2.2
+pluggy==1.5.0
+pre-commit==3.7.1
+pycparser==2.22
+pydantic==2.7.1
 pydantic-settings==2.2.1
-pydantic_core==2.16.3
-PyMySQL==1.1.0
-pytest==7.4.3
+pydantic_core==2.18.2
+Pygments==2.18.0
+PyMySQL==1.1.1
+pytest==8.2.1
+pytest-asyncio==0.23.7
 python-dotenv==1.0.1
+python-multipart==0.0.9
 PyYAML==6.0.1
-requests==2.31.0
+requests==2.32.2
+rich==13.7.1
+ruff==0.4.5
+shellingham==1.5.4
 sniffio==1.3.1
 sortedcontainers==2.4.0
-SQLAlchemy==2.0.28
-starlette==0.36.3
+SQLAlchemy==2.0.30
+starlette==0.37.2
 tomli==2.0.1
-typing_extensions==4.10.0
-urllib3==2.0.7
-uvicorn==0.28.0
-virtualenv==20.25.1
+typer==0.12.3
+typing_extensions==4.12.0
+ujson==5.10.0
+urllib3==2.2.1
+uvicorn==0.29.0
+uvloop==0.19.0
+virtualenv==20.26.2
 watchfiles==0.21.0
 websockets==12.0
diff --git a/src/app/repositories/report.py b/src/app/repositories/report.py
index 1b2a2f3d..99402063 100644
--- a/src/app/repositories/report.py
+++ b/src/app/repositories/report.py
@@ -3,7 +3,7 @@
 import time
 
 from src.app.views.input.report import Detection
-from src.core import config
+from src.core.fastapi.dependencies import kafka_engine
 
 logger = logging.getLogger(__name__)
 
@@ -62,7 +62,6 @@ async def parse_data(self, data: list[Detection]) -> list[Detection] | None:
 
     async def send_to_kafka(self, data: list[Detection]) -> None:
         detections = [d.model_dump(mode="json") for d in data]
-        await asyncio.gather(
-            *[config.send_queue.put(detection) for detection in detections]
-        )
+        send_queue = kafka_engine.producer.get_queue()
+        await asyncio.gather(*[send_queue.put(d) for d in detections])
         return
diff --git a/src/app/views/input/report.py b/src/app/views/input/report.py
index 3ba2afd6..3f1b3282 100644
--- a/src/app/views/input/report.py
+++ b/src/app/views/input/report.py
@@ -1,3 +1,4 @@
+import time
 from typing import Optional
 
 from pydantic import BaseModel
@@ -23,7 +24,7 @@ class Detection(BaseModel):
     x_coord: int = Field(0, ge=0)
     y_coord: int = Field(0, ge=0)
     z_coord: int = Field(0, ge=0)
-    ts: int = Field(0, ge=0)
+    ts: int = Field(default_factory=lambda: int(time.time()), ge=0)
     manual_detect: int = Field(0, ge=0, le=1)
     on_members_world: int = Field(0, ge=0, le=1)
     on_pvp_world: int = Field(0, ge=0, le=1)
diff --git a/src/core/config.py b/src/core/config.py
index d2e5df3e..3cf55757 100644
--- a/src/core/config.py
+++ b/src/core/config.py
@@ -1,7 +1,8 @@
-import asyncio
-
+from dotenv import find_dotenv, load_dotenv
 from pydantic_settings import BaseSettings
 
+load_dotenv(find_dotenv())
+
 
 class Settings(BaseSettings):
     ENV: str
@@ -12,6 +13,3 @@ class Settings(BaseSettings):
 
 
 settings = Settings()
-producer = None
-send_queue = None
-sd_event = asyncio.Event()
diff --git a/src/core/fastapi/dependencies/_kafka.py b/src/core/fastapi/dependencies/_kafka.py
deleted file mode 100644
index b9bbecae..00000000
--- a/src/core/fastapi/dependencies/_kafka.py
+++ /dev/null
@@ -1,156 +0,0 @@
-import asyncio
-import functools
-import json
-import logging
-import time
-import traceback
-from asyncio import Event, Queue
-
-from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
-
-from src.core import config
-
-logger = logging.getLogger(__name__)
-
-
-def print_traceback(_, error):
-    error_type = type(error)
-    logger.error(
-        {
-            "error_type": error_type.__name__,
-            "error": error,
-        }
-    )
-    tb_str = traceback.format_exc()
-    logger.error(f"{error}, \n{tb_str}")
-
-
-def retry(max_retries=3, retry_delay=5, on_retry=None, on_failure=None):
-    def wrapper(func):
-        @functools.wraps(func)
-        async def wrapped(*args, **kwargs):
-            retry_count = 0
-
-            while retry_count < max_retries:
-                try:
-                    return await func(*args, **kwargs)
-                except Exception as e:
-                    if on_retry:
-                        on_retry(retry_count, e)
-
-                    retry_count += 1
-                    logger.error(f"Error: {e}")
-                    logger.info(f"Retrying ({retry_count}/{max_retries})...")
-                    await asyncio.sleep(retry_delay)  # Add a delay before retrying
-
-            if on_failure:
-                on_failure(retry_count)
-
-            raise RuntimeError(f"Failed after {max_retries} retries")
-
-        return wrapped
-
-    return wrapper
-
-
-def log_speed(
-    counter: int, start_time: float, _queue: Queue, topic: str, interval: int = 15
-) -> tuple[float, int]:
-    # Calculate the time elapsed since the function started
-    delta_time = time.time() - start_time
-
-    # Check if the specified interval has not elapsed yet
-    if delta_time < interval:
-        # Return the original start time and the current counter value
-        return start_time, counter
-
-    # Calculate the processing speed (messages per second)
-    speed = counter / delta_time
-
-    # Log the processing speed and relevant information
-    log_message = (
-        f"{topic=}, qsize={_queue.qsize()}, "
-        f"processed {counter} in {delta_time:.2f} seconds, {speed:.2f} msg/sec"
-    )
-    logger.info(log_message)
-
-    # Return the current time and reset the counter to zero
-    return time.time(), 0
-
-
-@retry(max_retries=3, retry_delay=5, on_failure=print_traceback)
-async def kafka_consumer(topic: str, group: str):
-    logger.info(f"Starting consumer, {topic=}, {group=}, {config.settings.KAFKA_HOST=}")
-
-    consumer = AIOKafkaConsumer(
-        topic,
-        bootstrap_servers=[config.settings.KAFKA_HOST],
-        group_id=group,
-        value_deserializer=lambda x: json.loads(x.decode("utf-8")),
-        auto_offset_reset="earliest",
-    )
-    await consumer.start()
-    logger.info("Started")
-    return consumer
-
-
-@retry(max_retries=3, retry_delay=5, on_failure=print_traceback)
-async def kafka_producer():
-    logger.info("Starting producer")
-
-    producer = AIOKafkaProducer(
-        bootstrap_servers=[config.settings.KAFKA_HOST],
-        value_serializer=lambda v: json.dumps(v).encode(),
-        acks="all",
-    )
-    await producer.start()
-    logger.info("Started")
-    return producer
-
-
-@retry(max_retries=3, retry_delay=5, on_failure=print_traceback)
-async def receive_messages(
-    consumer: AIOKafkaConsumer,
-    receive_queue: Queue,
-    shutdown_event: Event,
-    batch_size: int = 200,
-):
-    while not shutdown_event.is_set():
-        batch = await consumer.getmany(timeout_ms=1000, max_records=batch_size)
-        for tp, messages in batch.items():
-            logger.info(f"Partition {tp}: {len(messages)} messages")
-            await asyncio.gather(*[receive_queue.put(m.value) for m in messages])
-            logger.info("done")
-        await consumer.commit()
-
-    logger.info("shutdown")
-
-
-@retry(max_retries=3, retry_delay=5, on_failure=print_traceback)
-async def send_messages(
-    topic: str,
-    producer: AIOKafkaProducer,
-    send_queue: Queue,
-    shutdown_event: Event,
-):
-    start_time = time.time()
-    messages_sent = 0
-
-    while not shutdown_event.is_set():
-        start_time, messages_sent = log_speed(
-            counter=messages_sent,
-            start_time=start_time,
-            _queue=send_queue,
-            topic=topic,
-        )
-        if send_queue.empty():
-            await asyncio.sleep(1)
-            continue
-
-        message = await send_queue.get()
-        await producer.send(topic, value=message)
-        send_queue.task_done()
-
-        messages_sent += 1
-
-    logger.info("shutdown")
diff --git a/src/core/fastapi/dependencies/kafka_engine.py b/src/core/fastapi/dependencies/kafka_engine.py
new file mode 100644
index 00000000..e11f7cb8
--- /dev/null
+++ b/src/core/fastapi/dependencies/kafka_engine.py
@@ -0,0 +1,9 @@
+from AioKafkaEngine import ProducerEngine
+
+from src.core.config import settings
+
+producer = ProducerEngine(
+    bootstrap_servers=[settings.KAFKA_HOST],
+    report_interval=60,
+    queue_size=500,
+)
diff --git a/src/core/server.py b/src/core/server.py
index 4fa5ecf0..9c3d6956 100644
--- a/src/core/server.py
+++ b/src/core/server.py
@@ -1,4 +1,3 @@
-import asyncio
 import logging
 from contextlib import asynccontextmanager
 
@@ -7,8 +6,7 @@
 from fastapi.middleware.cors import CORSMiddleware
 
 from src import api
-from src.core import config
-from src.core.fastapi.dependencies import _kafka
+from src.core.fastapi.dependencies import kafka_engine
 from src.core.fastapi.middleware.logging import LoggingMiddleware
 
 logger = logging.getLogger(__name__)
@@ -40,19 +38,9 @@ def make_middleware() -> list[Middleware]:
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     logger.info("startup initiated")
-    config.producer = await _kafka.kafka_producer()
-    config.send_queue = asyncio.Queue(maxsize=500)
-    asyncio.create_task(
-        _kafka.send_messages(
-            topic="report",
-            producer=config.producer,
-            send_queue=config.send_queue,
-            shutdown_event=config.sd_event,
-        )
-    )
+    await kafka_engine.producer.start_engine(topic="report")
     yield
-    config.sd_event.set()
-    await config.producer.stop()
+    await kafka_engine.producer.stop_engine()
 
 
 def create_app() -> FastAPI:
diff --git a/tests/test_report.py b/tests/test_report.py
index 55286e55..ca86096f 100644
--- a/tests/test_report.py
+++ b/tests/test_report.py
@@ -1,12 +1,9 @@
 import os
 import sys
 import time
-from typing import Optional
 
 import pytest
 from httpx import AsyncClient
-from pydantic import BaseModel
-from pydantic.fields import Field
 
 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
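
Note on the producer refactor: the hand-rolled queue and send loop deleted from `_kafka.py` are replaced by `AioKafkaEngine`'s `ProducerEngine`, so handlers now enqueue payloads and the engine's background task drains them to Kafka. The sketch below strings together only the calls visible in this diff (`ProducerEngine(...)`, `get_queue()`, `start_engine(topic=...)`, `stop_engine()`); treat the exact signatures and the sample payload as assumptions, not the library's documented API.

```python
# Sketch of the new produce path, assuming the ProducerEngine interface as it
# is used in this diff (AioKafkaEngine==0.0.4); not a documented API reference.
import asyncio

from AioKafkaEngine import ProducerEngine

producer = ProducerEngine(
    bootstrap_servers=["localhost:9094"],  # settings.KAFKA_HOST in the app
    report_interval=60,  # assumed: seconds between throughput reports
    queue_size=500,  # same bound as the old asyncio.Queue(maxsize=500)
)


async def main():
    # replaces kafka_producer() plus the send_messages() background task
    await producer.start_engine(topic="report")

    # handlers put plain dicts on the queue, as send_to_kafka() now does with
    # Detection.model_dump(mode="json"); this payload is a trimmed example
    send_queue = producer.get_queue()
    await send_queue.put({"x_coord": 0, "y_coord": 0, "z_coord": 0, "ts": 0})

    # mirrors the lifespan teardown: flush pending messages and stop cleanly
    await producer.stop_engine()


asyncio.run(main())
```

Keeping the engine instance in `src/core/fastapi/dependencies/kafka_engine.py` lets the FastAPI lifespan and the report repository share one process-wide queue, instead of the mutable globals that used to live in `src/core/config.py`.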