Skip to content

Commit

Permalink
initial setup
Browse files Browse the repository at this point in the history
  • Loading branch information
extreme4all committed Jan 2, 2024
1 parent 404b252 commit 54421f8
Show file tree
Hide file tree
Showing 10 changed files with 1,102 additions and 241 deletions.
26 changes: 12 additions & 14 deletions kafka_setup/generate_example_data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import json
import uuid
from asyncio import Queue

from aiokafka import AIOKafkaConsumer
Expand Down Expand Up @@ -39,26 +40,22 @@ def save(data, file_name):
# Asynchronous function to generate example data from the queue and save it as a JSON file
async def generate_example_data(queue: Queue):
MAX_LEN = 1000
count = 0
count, empty_count = 0, 0
data = []
while True:
if queue.empty():
await asyncio.sleep(1)
empty_count += 1
if empty_count > 10:
break
continue

item: dict = await queue.get()
print(item)

if item is None or count >= MAX_LEN:
break

player = item.get("player")
highscore = item.get("hiscores")

if player:
# Add an 'id' to the player dictionary
item["player"]["id"] = count
item["player"]["name"] = f"Player{count}"

if highscore:
# Add 'Player_id' to the hiscores dictionary
item["hiscores"]["Player_id"] = count

data.append(item)

if count % 100 == 0:
Expand All @@ -73,7 +70,8 @@ async def generate_example_data(queue: Queue):
# Asynchronous main function coordinating the whole process
async def main():
receive_queue = Queue(maxsize=100)
consumer = await kafka_consumer(topic="scraper", group="test")
print("group", f"test-{uuid.uuid4()}")
consumer = await kafka_consumer(topic="report", group=f"test-{uuid.uuid4()}")

asyncio.create_task(
receive_messages(consumer=consumer, receive_queue=receive_queue)
Expand Down
Binary file removed kafka_setup/kafka_data/kafka_data.zip
Binary file not shown.
Loading

0 comments on commit 54421f8

Please sign in to comment.