From b84d128873fa8fa54a3b818a9e18a4b58c889705 Mon Sep 17 00:00:00 2001 From: extreme4all <40169115+extreme4all@users.noreply.github.com> Date: Fri, 28 Jun 2024 23:06:13 +0200 Subject: [PATCH] Version kafka msg (#48) * version in kafka msg * working --------- Co-authored-by: extreme4all <> --- Makefile | 3 +++ docker-compose.yaml | 13 +++++++++++-- src/api/v2/report.py | 2 +- src/app/repositories/report.py | 12 ++++++++++-- src/app/views/input/_metadata.py | 5 +++++ src/app/views/input/report.py | 20 +++++++++++++++++++- 6 files changed, 49 insertions(+), 6 deletions(-) create mode 100644 src/app/views/input/_metadata.py diff --git a/Makefile b/Makefile index 07522fc..5c79e01 100644 --- a/Makefile +++ b/Makefile @@ -42,6 +42,9 @@ docker-restart: ## restart containers docker compose up --build -d docker-test: docker-restart ## restart containers & test + pytest + +docker-test-verbose: docker-restart ## restart containers & test pytest -s pre-commit-setup: ## Install pre-commit diff --git a/docker-compose.yaml b/docker-compose.yaml index 1cab4a0..b44c027 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,4 +1,3 @@ -version: '3' services: kafka: container_name: kafka @@ -106,11 +105,21 @@ services: - POOL_TIMEOUT=30 # env_file: # - .env + healthcheck: + test: ["CMD-SHELL", "python -c \"import requests; assert requests.get('http://localhost:5000/').status_code == 200\""] + interval: 5s + timeout: 10s + retries: 3 depends_on: kafka: condition: service_healthy mysql: condition: service_healthy - + wait_for_api: + image: alpine:latest + container_name: wait_for_api + depends_on: + public_api: + condition: service_healthy networks: botdetector-network: diff --git a/src/api/v2/report.py b/src/api/v2/report.py index 55dd9b8..ffe4e84 100644 --- a/src/api/v2/report.py +++ b/src/api/v2/report.py @@ -18,5 +18,5 @@ async def post_reports(detections: list[Detection]): if not data: raise HTTPException(status.HTTP_400_BAD_REQUEST, detail="invalid data") logger.debug(f"Working: {len(data)}") - await report.send_to_kafka(data) + await report.send_to_kafka(data=data) return Ok() diff --git a/src/app/repositories/report.py b/src/app/repositories/report.py index 9940206..fc0c413 100644 --- a/src/app/repositories/report.py +++ b/src/app/repositories/report.py @@ -2,7 +2,7 @@ import logging import time -from src.app.views.input.report import Detection +from src.app.views.input.report import Detection, KafkaDetectionV1 from src.core.fastapi.dependencies import kafka_engine logger = logging.getLogger(__name__) @@ -60,8 +60,16 @@ async def parse_data(self, data: list[Detection]) -> list[Detection] | None: return None return data + def detection_to_v1(self, data: list[Detection]) -> list[KafkaDetectionV1]: + logger.debug(f"received: {len(data)}") + _data = [KafkaDetectionV1(**d.model_dump()) for d in data] + logger.debug(f"received: {len(_data)}") + return _data + async def send_to_kafka(self, data: list[Detection]) -> None: - detections = [d.model_dump(mode="json") for d in data] + data_vx = self.detection_to_v1(data=data) + assert len(data) == len(data_vx) + detections = [d.model_dump(mode="json") for d in data_vx] send_queue = kafka_engine.producer.get_queue() await asyncio.gather(*[send_queue.put(d) for d in detections]) return diff --git a/src/app/views/input/_metadata.py b/src/app/views/input/_metadata.py new file mode 100644 index 0000000..bd87da8 --- /dev/null +++ b/src/app/views/input/_metadata.py @@ -0,0 +1,5 @@ +from pydantic import BaseModel + + +class Metadata(BaseModel): + version: str diff --git a/src/app/views/input/report.py b/src/app/views/input/report.py index 3f1b328..68d9a84 100644 --- a/src/app/views/input/report.py +++ b/src/app/views/input/report.py @@ -4,6 +4,8 @@ from pydantic import BaseModel from pydantic.fields import Field +from src.app.views.input._metadata import Metadata + class Equipment(BaseModel): equip_head_id: Optional[int] = Field(None, ge=0) @@ -17,7 +19,7 @@ class Equipment(BaseModel): equip_shield_id: Optional[int] = Field(None, ge=0) -class Detection(BaseModel): +class BaseDetection(BaseModel): reporter: str = Field(..., min_length=1, max_length=13) reported: str = Field(..., min_length=1, max_length=12) region_id: int = Field(0, ge=0, le=100_000) @@ -31,3 +33,19 @@ class Detection(BaseModel): world_number: int = Field(0, ge=300, le=1_000) equipment: Equipment equip_ge_value: int = Field(0, ge=0) + + +class Detection(BaseDetection): + reporter: str = Field(..., min_length=1, max_length=13) + reported: str = Field(..., min_length=1, max_length=12) + + +class ParsedDetection(BaseDetection): + reporter_id: int = Field(..., ge=0) + reported_id: int = Field(..., ge=0) + + +class KafkaDetectionV1(BaseDetection): + metadata: Metadata = Metadata(version="v1.0.0") + reporter: str = Field(..., min_length=1, max_length=13) + reported: str = Field(..., min_length=1, max_length=12)