diff --git a/src/app/controllers/player.py b/src/app/controllers/player.py
index 53a4fc7..577b179 100644
--- a/src/app/controllers/player.py
+++ b/src/app/controllers/player.py
@@ -22,7 +22,7 @@ async def get(self, player_name: str) -> PlayerInDB:
 
     async def insert(self, player: PlayerCreate) -> PlayerInDB:
         player.name = self.sanitize_name(player.name)
-        sql = sqla.insert(DBPlayer).values(player.model_dump())
+        sql = sqla.insert(DBPlayer).values(player.model_dump()).prefix_with("IGNORE")
         await self.session.execute(sql)
         return await self.get(player_name=player.name)
 
diff --git a/src/main.py b/src/main.py
index 8a701bf..0872b74 100644
--- a/src/main.py
+++ b/src/main.py
@@ -15,9 +15,7 @@ logger = logging.getLogger(__name__)
 
 
 
-async def create_insert_batch(report_queue: Queue):
-    last_time = time.time()
-    batch = []
+async def check_duplicate_report(report_queue: Queue, valid_report_queue: Queue):
     while True:
         # Check if both queues are empty
         if report_queue.empty():
@@ -38,20 +36,12 @@ async def create_insert_batch(report_queue: Queue):
                     reporting_id=msg.reportingID,
                     region_id=msg.region_id,
                 )
-                if report:
-                    continue
-                # append to batch
-                batch.append(msg)
-
-                now = time.time()
-                if now - last_time > 10:
-                    logger.debug(f"batch inserting: {len(batch)}")
-                    last_time = time.time()
-                    await report_controller.insert(reports=batch)
-                    logger.debug(
-                        f"inserted: {len(batch)}, duration (sec): {int(time.time()-last_time)}"
-                    )
-                    batch = []
+
+                # skip duplicate reports
+                if report:
+                    continue
+
+                await valid_report_queue.put(msg)
         except OperationalError as e:
             await report_queue.put(msg)
             logger.error({"error": e})
@@ -65,6 +55,64 @@ async def create_insert_batch(report_queue: Queue):
             continue
 
 
+async def queue_to_batch(queue: Queue, max_len: int = None) -> list:
+    """Take up to ``max_len`` messages off ``queue`` and return them as a list.
+
+    When ``max_len`` is falsy (None or 0) the queue's current size is used.
+    Each message is marked done as soon as it is taken off the queue.
+    """
+    output = []
+    max_len = max_len if max_len else queue.qsize()
+    for _ in range(max_len):
+        msg = await queue.get()
+        queue.task_done()
+        output.append(msg)
+    return output
+
+
+async def insert_batch(valid_report_queue: Queue):
+    """Insert queued reports in batches.
+
+    A batch is only attempted once the queue is non-empty and at least 10 seconds
+    have passed since the last successful insert. On any error the dequeued
+    batch is put back on ``valid_report_queue`` and the loop pauses 5 seconds.
+    """
+    last_time = time.time()
+    while True:
+        if valid_report_queue.empty():
+            await asyncio.sleep(1)
+            continue
+
+        if time.time() - last_time < 10:
+            await asyncio.sleep(1)
+            continue
+
+        # Reset on every attempt: if acquiring the session fails before anything
+        # is dequeued, the handlers below must not re-queue the previous
+        # (already inserted) batch, nor hit an unbound name on the first pass.
+        batch = []
+        try:
+            # Acquire an asynchronous database session
+            session: AsyncSession = await get_session()
+            async with session.begin():
+                report_controller = ReportController(session=session)
+
+                batch = await queue_to_batch(queue=valid_report_queue)
+                logger.debug(f"batch inserting: {len(batch)}")
+                await report_controller.insert(reports=batch)
+                last_time = time.time()
+
+        except OperationalError as e:
+            await asyncio.gather(*[valid_report_queue.put(msg) for msg in batch])
+            logger.error({"error": e})
+            await asyncio.sleep(5)
+
+        except Exception as e:
+            await asyncio.gather(*[valid_report_queue.put(msg) for msg in batch])
+            logger.error({"error": e})
+            logger.debug(f"Traceback: \n{traceback.format_exc()}")
+            await asyncio.sleep(5)
+
+
 async def process_data(report_queue: Queue):
     receive_queue = consumer.get_queue()
     error_queue = producer.get_queue()
@@ -122,11 +170,19 @@ async def process_data(report_queue: Queue):
 
 async def main():
     report_queue = Queue(maxsize=500)
+    valid_report_queue = Queue()
 
     await producer.start_engine(topic="report")
     await consumer.start_engine(topics=["report"])
 
-    asyncio.create_task(process_data(report_queue))
-    asyncio.create_task(create_insert_batch(report_queue))
+    for _ in range(5):
+        asyncio.create_task(process_data(report_queue=report_queue))
+        asyncio.create_task(
+            check_duplicate_report(
+                report_queue=report_queue, valid_report_queue=valid_report_queue
+            )
+        )
+    asyncio.create_task(insert_batch(valid_report_queue=valid_report_queue))
+
     while True:
         await asyncio.sleep(60)