Skip to content

Commit

Permalink
Version kafka msg (#48)
Browse files Browse the repository at this point in the history
* version in kafka msg

* working

---------

Co-authored-by: extreme4all <>
  • Loading branch information
extreme4all authored Jun 28, 2024
1 parent a511796 commit b84d128
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 6 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ docker-restart: ## restart containers
docker compose up --build -d

docker-test: docker-restart ## restart containers & test
pytest

docker-test-verbose: docker-restart ## restart containers & test
pytest -s

pre-commit-setup: ## Install pre-commit
Expand Down
13 changes: 11 additions & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3'
services:
kafka:
container_name: kafka
Expand Down Expand Up @@ -106,11 +105,21 @@ services:
- POOL_TIMEOUT=30
# env_file:
# - .env
healthcheck:
test: ["CMD-SHELL", "python -c \"import requests; assert requests.get('http://localhost:5000/').status_code == 200\""]
interval: 5s
timeout: 10s
retries: 3
depends_on:
kafka:
condition: service_healthy
mysql:
condition: service_healthy

wait_for_api:
image: alpine:latest
container_name: wait_for_api
depends_on:
public_api:
condition: service_healthy
networks:
botdetector-network:
2 changes: 1 addition & 1 deletion src/api/v2/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ async def post_reports(detections: list[Detection]):
if not data:
raise HTTPException(status.HTTP_400_BAD_REQUEST, detail="invalid data")
logger.debug(f"Working: {len(data)}")
await report.send_to_kafka(data)
await report.send_to_kafka(data=data)
return Ok()
12 changes: 10 additions & 2 deletions src/app/repositories/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import time

from src.app.views.input.report import Detection
from src.app.views.input.report import Detection, KafkaDetectionV1
from src.core.fastapi.dependencies import kafka_engine

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -60,8 +60,16 @@ async def parse_data(self, data: list[Detection]) -> list[Detection] | None:
return None
return data

def detection_to_v1(self, data: list[Detection]) -> list[KafkaDetectionV1]:
logger.debug(f"received: {len(data)}")
_data = [KafkaDetectionV1(**d.model_dump()) for d in data]
logger.debug(f"received: {len(_data)}")
return _data

async def send_to_kafka(self, data: list[Detection]) -> None:
detections = [d.model_dump(mode="json") for d in data]
data_vx = self.detection_to_v1(data=data)
assert len(data) == len(data_vx)
detections = [d.model_dump(mode="json") for d in data_vx]
send_queue = kafka_engine.producer.get_queue()
await asyncio.gather(*[send_queue.put(d) for d in detections])
return
5 changes: 5 additions & 0 deletions src/app/views/input/_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from pydantic import BaseModel


class Metadata(BaseModel):
version: str
20 changes: 19 additions & 1 deletion src/app/views/input/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from pydantic import BaseModel
from pydantic.fields import Field

from src.app.views.input._metadata import Metadata


class Equipment(BaseModel):
equip_head_id: Optional[int] = Field(None, ge=0)
Expand All @@ -17,7 +19,7 @@ class Equipment(BaseModel):
equip_shield_id: Optional[int] = Field(None, ge=0)


class Detection(BaseModel):
class BaseDetection(BaseModel):
reporter: str = Field(..., min_length=1, max_length=13)
reported: str = Field(..., min_length=1, max_length=12)
region_id: int = Field(0, ge=0, le=100_000)
Expand All @@ -31,3 +33,19 @@ class Detection(BaseModel):
world_number: int = Field(0, ge=300, le=1_000)
equipment: Equipment
equip_ge_value: int = Field(0, ge=0)


class Detection(BaseDetection):
reporter: str = Field(..., min_length=1, max_length=13)
reported: str = Field(..., min_length=1, max_length=12)


class ParsedDetection(BaseDetection):
reporter_id: int = Field(..., ge=0)
reported_id: int = Field(..., ge=0)


class KafkaDetectionV1(BaseDetection):
metadata: Metadata = Metadata(version="v1.0.0")
reporter: str = Field(..., min_length=1, max_length=13)
reported: str = Field(..., min_length=1, max_length=12)

0 comments on commit b84d128

Please sign in to comment.