From 33f49820eefe7fb3dc01232c2d84e4f604f6bee5 Mon Sep 17 00:00:00 2001 From: extreme4all <40169115+extreme4all@users.noreply.github.com> Date: Tue, 16 Jul 2024 23:13:03 +0200 Subject: [PATCH] finetuning --- kafka_setup/src/main.py | 4 +++- src/main.py | 12 ++---------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/kafka_setup/src/main.py b/kafka_setup/src/main.py index 3ef023e..ef09809 100644 --- a/kafka_setup/src/main.py +++ b/kafka_setup/src/main.py @@ -69,6 +69,7 @@ def send_data(producer: KafkaProducer): with_metadata = random.choice([0, 1]) if with_metadata == 0: msg.pop("metadata") + print(i, msg.get("metadata"), msg["reporter"], msg["reported"]) elif version == "v2.0.0": v2 = { "metadata": {"version": "v2.0.0"}, @@ -76,9 +77,10 @@ def send_data(producer: KafkaProducer): "reported_id": random.choice(players).replace("player", ""), } msg.update(v2) + print(i, msg.get("metadata"), msg["reporter_id"], msg["reported_id"]) assert "reporter" in msg or "reported_id" in msg producer.send(topic="report", value=msg) - print(i, msg["metadata"], msg["reporting_id"], msg["reported_id"]) + print("Data insertion completed.") diff --git a/src/main.py b/src/main.py index e794431..f17f325 100644 --- a/src/main.py +++ b/src/main.py @@ -92,7 +92,7 @@ async def queue_to_batch(queue: Queue, max_len: int = None) -> list: async def insert_batch(valid_report_queue: Queue): - INSERT_INTERVAL_SEC = 20 + INSERT_INTERVAL_SEC = 60 last_time = time.time() batch = [] while True: @@ -221,7 +221,6 @@ async def process_data(report_queue: Queue, player_cache: SimpleALRUCache): async def main(): report_queue = Queue(maxsize=500) - valid_report_queue = Queue() await producer.start_engine(topic="report") await consumer.start_engine(topics=["report"]) @@ -234,14 +233,7 @@ async def main(): player_cache=player_cache, ) ) - asyncio.create_task( - check_duplicate_report( - report_queue=report_queue, - valid_report_queue=valid_report_queue, - skip=True, - ) - ) - asyncio.create_task(insert_batch(valid_report_queue=valid_report_queue)) + asyncio.create_task(insert_batch(valid_report_queue=report_queue)) while True: await asyncio.sleep(60)