Skip to content

Commit

Permalink
finetuning
Browse files Browse the repository at this point in the history
  • Loading branch information
extreme4all committed Jul 16, 2024
1 parent 3c29b69 commit 33f4982
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 11 deletions.
4 changes: 3 additions & 1 deletion kafka_setup/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,18 @@ def send_data(producer: KafkaProducer):
with_metadata = random.choice([0, 1])
if with_metadata == 0:
msg.pop("metadata")
print(i, msg.get("metadata"), msg["reporter"], msg["reported"])
elif version == "v2.0.0":
v2 = {
"metadata": {"version": "v2.0.0"},
"reporter_id": random.choice(players).replace("player", ""),
"reported_id": random.choice(players).replace("player", ""),
}
msg.update(v2)
print(i, msg.get("metadata"), msg["reporter_id"], msg["reported_id"])
assert "reporter" in msg or "reported_id" in msg
producer.send(topic="report", value=msg)
print(i, msg["metadata"], msg["reporting_id"], msg["reported_id"])

print("Data insertion completed.")


Expand Down
12 changes: 2 additions & 10 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async def queue_to_batch(queue: Queue, max_len: int = None) -> list:


async def insert_batch(valid_report_queue: Queue):
INSERT_INTERVAL_SEC = 20
INSERT_INTERVAL_SEC = 60
last_time = time.time()
batch = []
while True:
Expand Down Expand Up @@ -221,7 +221,6 @@ async def process_data(report_queue: Queue, player_cache: SimpleALRUCache):

async def main():
report_queue = Queue(maxsize=500)
valid_report_queue = Queue()
await producer.start_engine(topic="report")
await consumer.start_engine(topics=["report"])

Expand All @@ -234,14 +233,7 @@ async def main():
player_cache=player_cache,
)
)
asyncio.create_task(
check_duplicate_report(
report_queue=report_queue,
valid_report_queue=valid_report_queue,
skip=True,
)
)
asyncio.create_task(insert_batch(valid_report_queue=valid_report_queue))
asyncio.create_task(insert_batch(valid_report_queue=report_queue))

while True:
await asyncio.sleep(60)
Expand Down

0 comments on commit 33f4982

Please sign in to comment.