From 3adaafd689569b62376f2cc80f58846a445c4a5f Mon Sep 17 00:00:00 2001 From: extreme4all <40169115+extreme4all@users.noreply.github.com> Date: Mon, 5 Aug 2024 23:24:58 +0200 Subject: [PATCH] put error'ed msg in error queue --- src/app/views/report.py | 4 ++++ src/main.py | 9 ++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/app/views/report.py b/src/app/views/report.py index be33003..31151ff 100644 --- a/src/app/views/report.py +++ b/src/app/views/report.py @@ -45,6 +45,10 @@ class ReportInQV2(BaseReport): reported_id: int +class KafkaReport(ReportInQV2): + metadata: Metadata = Metadata(version="v2.0.0") + + class ReportInQueue(BaseModel): reporter: str reported: str diff --git a/src/main.py b/src/main.py index 09a2c29..b47e1e6 100644 --- a/src/main.py +++ b/src/main.py @@ -9,6 +9,7 @@ from app.controllers.player import PlayerController from app.controllers.report import ReportController from app.views.report import ( + KafkaReport, ReportInQV1, ReportInQV2, StgReportCreate, @@ -52,7 +53,7 @@ async def create_batch(batch_size: int, batch_queue: Queue, report_queue: Queue) _time = time.time() -async def insert_batch(batch_queue: Queue): +async def insert_batch(batch_queue: Queue, error_queue: Queue): while True: if batch_queue.empty(): await asyncio.sleep(1) @@ -69,15 +70,13 @@ async def insert_batch(batch_queue: Queue): await session.commit() logger.debug("inserted") except OperationalError as e: - # await asyncio.gather(*[valid_report_queue.put(msg) for msg in batch]) - # TODO: convert logger.error({"error": e}) + await asyncio.gather(*[error_queue.put(KafkaReport(**m)) for m in batch]) await asyncio.sleep(5) - except Exception as e: - # await asyncio.gather(*[valid_report_queue.put(msg) for msg in batch]) logger.error({"error": e}) logger.debug(f"Traceback: \n{traceback.format_exc()}") + await asyncio.gather(*[error_queue.put(KafkaReport(**m)) for m in batch]) await asyncio.sleep(5)