diff --git a/kafka_setup/generate_example_data.py b/kafka_setup/generate_example_data.py index 1977ff7..539aa50 100644 --- a/kafka_setup/generate_example_data.py +++ b/kafka_setup/generate_example_data.py @@ -1,5 +1,6 @@ import asyncio import json +import uuid from asyncio import Queue from aiokafka import AIOKafkaConsumer @@ -39,26 +40,22 @@ def save(data, file_name): # Asynchronous function to generate example data from the queue and save it as a JSON file async def generate_example_data(queue: Queue): MAX_LEN = 1000 - count = 0 + count, empty_count = 0, 0 data = [] while True: + if queue.empty(): + await asyncio.sleep(1) + empty_count += 1 + if empty_count > 10: + break + continue + item: dict = await queue.get() + print(item) if item is None or count >= MAX_LEN: break - player = item.get("player") - highscore = item.get("hiscores") - - if player: - # Add an 'id' to the player dictionary - item["player"]["id"] = count - item["player"]["name"] = f"Player{count}" - - if highscore: - # Add 'Player_id' to the hiscores dictionary - item["hiscores"]["Player_id"] = count - data.append(item) if count % 100 == 0: @@ -73,7 +70,8 @@ async def generate_example_data(queue: Queue): # Asynchronous main function coordinating the whole process async def main(): receive_queue = Queue(maxsize=100) - consumer = await kafka_consumer(topic="scraper", group="test") + print("group", f"test-{uuid.uuid4()}") + consumer = await kafka_consumer(topic="report", group=f"test-{uuid.uuid4()}") asyncio.create_task( receive_messages(consumer=consumer, receive_queue=receive_queue) diff --git a/kafka_setup/kafka_data/kafka_data.zip b/kafka_setup/kafka_data/kafka_data.zip deleted file mode 100644 index 7f9dac7..0000000 Binary files a/kafka_setup/kafka_data/kafka_data.zip and /dev/null differ diff --git a/kafka_setup/kafka_data/kafka_data/kafka_data.json b/kafka_setup/kafka_data/kafka_data/kafka_data.json new file mode 100644 index 0000000..c9d763b --- /dev/null +++ b/kafka_setup/kafka_data/kafka_data/kafka_data.json @@ -0,0 +1,877 @@ +[ + { + "reporter": "Cyborger1", + "reported": "Shpinki", + "region_id": 14652, + "x_coord": 3682, + "y_coord": 3851, + "z_coord": 0, + "ts": 1704223737, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 13592, + "equip_amulet_id": null, + "equip_torso_id": 13596, + "equip_legs_id": 13598, + "equip_boots_id": 13602, + "equip_cape_id": 13594, + "equip_hands_id": 13600, + "equip_weapon_id": 1381, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Loveskippy", + "region_id": 14652, + "x_coord": 3685, + "y_coord": 3870, + "z_coord": 0, + "ts": 1704223743, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 27446, + "equip_amulet_id": 10132, + "equip_torso_id": 27452, + "equip_legs_id": 27455, + "equip_boots_id": 27461, + "equip_cape_id": 27449, + "equip_hands_id": 27458, + "equip_weapon_id": 7409, + "equip_shield_id": 13660 + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Loveskippy", + "region_id": 14652, + "x_coord": 3666, + "y_coord": 3866, + "z_coord": 0, + "ts": 1704223777, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 27446, + "equip_amulet_id": 10132, + "equip_torso_id": 27452, + "equip_legs_id": 27455, + "equip_boots_id": 27461, + "equip_cape_id": 27449, + "equip_hands_id": 27458, + "equip_weapon_id": 7409, + "equip_shield_id": 13660 + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Spooned Tuna", + "region_id": 4922, + "x_coord": 1253, + "y_coord": 3741, + "z_coord": 0, + "ts": 1704224138, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": null, + "equip_amulet_id": null, + "equip_torso_id": null, + "equip_legs_id": null, + "equip_boots_id": null, + "equip_cape_id": null, + "equip_hands_id": null, + "equip_weapon_id": null, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "IM Hale", + "region_id": 4922, + "x_coord": 1249, + "y_coord": 3758, + "z_coord": 0, + "ts": 1704224088, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": null, + "equip_amulet_id": null, + "equip_torso_id": null, + "equip_legs_id": null, + "equip_boots_id": null, + "equip_cape_id": 9810, + "equip_hands_id": null, + "equip_weapon_id": null, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "NewdogsNtown", + "region_id": 12850, + "x_coord": 3232, + "y_coord": 3211, + "z_coord": 0, + "ts": 1704224293, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": null, + "equip_amulet_id": 1710, + "equip_torso_id": null, + "equip_legs_id": null, + "equip_boots_id": null, + "equip_cape_id": null, + "equip_hands_id": null, + "equip_weapon_id": null, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Smooth Biggs", + "region_id": 12851, + "x_coord": 3214, + "y_coord": 3274, + "z_coord": 0, + "ts": 1704224470, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 1167, + "equip_amulet_id": 1725, + "equip_torso_id": 1135, + "equip_legs_id": 1099, + "equip_boots_id": 4097, + "equip_cape_id": 10498, + "equip_hands_id": 1065, + "equip_weapon_id": 8880, + "equip_shield_id": 4156 + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "SimpinShrimp", + "region_id": 12595, + "x_coord": 3180, + "y_coord": 3288, + "z_coord": 0, + "ts": 1704225257, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 1147, + "equip_amulet_id": 552, + "equip_torso_id": 1135, + "equip_legs_id": 1099, + "equip_boots_id": 1061, + "equip_cape_id": 10499, + "equip_hands_id": 1065, + "equip_weapon_id": 853, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Shpinki", + "region_id": 14651, + "x_coord": 3682, + "y_coord": 3837, + "z_coord": 0, + "ts": 1704223741, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 13592, + "equip_amulet_id": null, + "equip_torso_id": 13596, + "equip_legs_id": 13598, + "equip_boots_id": 13602, + "equip_cape_id": 13594, + "equip_hands_id": 13600, + "equip_weapon_id": 1381, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Enkanooda", + "region_id": 14652, + "x_coord": 3687, + "y_coord": 3861, + "z_coord": 0, + "ts": 1704223741, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 25071, + "equip_amulet_id": null, + "equip_torso_id": 25077, + "equip_legs_id": 25080, + "equip_boots_id": 25086, + "equip_cape_id": 13123, + "equip_hands_id": 25083, + "equip_weapon_id": 7409, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "GIM Merkthon", + "region_id": 4922, + "x_coord": 1253, + "y_coord": 3741, + "z_coord": 0, + "ts": 1704223832, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 13592, + "equip_amulet_id": 1704, + "equip_torso_id": 13596, + "equip_legs_id": 13598, + "equip_boots_id": 13602, + "equip_cape_id": 13594, + "equip_hands_id": 13600, + "equip_weapon_id": 772, + "equip_shield_id": 12954 + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Spooned Tuna", + "region_id": 4922, + "x_coord": 1249, + "y_coord": 3739, + "z_coord": 0, + "ts": 1704224027, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": null, + "equip_amulet_id": null, + "equip_torso_id": null, + "equip_legs_id": null, + "equip_boots_id": null, + "equip_cape_id": null, + "equip_hands_id": null, + "equip_weapon_id": null, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "GIM Ribet", + "region_id": 4922, + "x_coord": 1248, + "y_coord": 3728, + "z_coord": 0, + "ts": 1704224027, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 11851, + "equip_amulet_id": null, + "equip_torso_id": 11855, + "equip_legs_id": 11857, + "equip_boots_id": 11861, + "equip_cape_id": 9810, + "equip_hands_id": 11859, + "equip_weapon_id": null, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Shpinki", + "region_id": 4922, + "x_coord": 1247, + "y_coord": 3727, + "z_coord": 0, + "ts": 1704224027, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 13592, + "equip_amulet_id": null, + "equip_torso_id": 13596, + "equip_legs_id": 13598, + "equip_boots_id": 13602, + "equip_cape_id": 13594, + "equip_hands_id": 13600, + "equip_weapon_id": 1381, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "TittyJugs", + "region_id": 4922, + "x_coord": 1253, + "y_coord": 3741, + "z_coord": 0, + "ts": 1704224064, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 11851, + "equip_amulet_id": 11968, + "equip_torso_id": 11855, + "equip_legs_id": 11857, + "equip_boots_id": 11861, + "equip_cape_id": 11853, + "equip_hands_id": 11859, + "equip_weapon_id": 7409, + "equip_shield_id": 13660 + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Enkanooda", + "region_id": 4922, + "x_coord": 1253, + "y_coord": 3741, + "z_coord": 0, + "ts": 1704224138, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 25071, + "equip_amulet_id": null, + "equip_torso_id": 25077, + "equip_legs_id": 25080, + "equip_boots_id": 25086, + "equip_cape_id": 13123, + "equip_hands_id": 25083, + "equip_weapon_id": 7409, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "V1BER", + "region_id": 4922, + "x_coord": 1249, + "y_coord": 3758, + "z_coord": 0, + "ts": 1704224088, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 11865, + "equip_amulet_id": 19547, + "equip_torso_id": 27238, + "equip_legs_id": 27241, + "equip_boots_id": 13237, + "equip_cape_id": 22109, + "equip_hands_id": 7462, + "equip_weapon_id": 20997, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "TheTavar", + "region_id": 4922, + "x_coord": 1253, + "y_coord": 3741, + "z_coord": 0, + "ts": 1704224138, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 13580, + "equip_amulet_id": null, + "equip_torso_id": 13584, + "equip_legs_id": 13586, + "equip_boots_id": 13590, + "equip_cape_id": 13582, + "equip_hands_id": 13588, + "equip_weapon_id": 20730, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "ToxicDigiRat", + "region_id": 12850, + "x_coord": 3225, + "y_coord": 3215, + "z_coord": 0, + "ts": 1704224293, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": null, + "equip_amulet_id": null, + "equip_torso_id": null, + "equip_legs_id": null, + "equip_boots_id": null, + "equip_cape_id": null, + "equip_hands_id": null, + "equip_weapon_id": null, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "NineBoots", + "region_id": 12850, + "x_coord": 3222, + "y_coord": 3219, + "z_coord": 0, + "ts": 1704224293, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 1017, + "equip_amulet_id": 1478, + "equip_torso_id": 581, + "equip_legs_id": 428, + "equip_boots_id": 89, + "equip_cape_id": 11853, + "equip_hands_id": 1059, + "equip_weapon_id": 1387, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Kendra Thoma", + "region_id": 12850, + "x_coord": 3220, + "y_coord": 3262, + "z_coord": 0, + "ts": 1704224310, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": null, + "equip_amulet_id": null, + "equip_torso_id": null, + "equip_legs_id": null, + "equip_boots_id": null, + "equip_cape_id": null, + "equip_hands_id": null, + "equip_weapon_id": null, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Theboss12237", + "region_id": 12595, + "x_coord": 3180, + "y_coord": 3296, + "z_coord": 0, + "ts": 1704224432, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": null, + "equip_amulet_id": 1706, + "equip_torso_id": null, + "equip_legs_id": null, + "equip_boots_id": null, + "equip_cape_id": null, + "equip_hands_id": 11972, + "equip_weapon_id": 6317, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Proper Bot", + "region_id": 4922, + "x_coord": 1253, + "y_coord": 3741, + "z_coord": 0, + "ts": 1704223832, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 13647, + "equip_amulet_id": 11111, + "equip_torso_id": 13643, + "equip_legs_id": 13641, + "equip_boots_id": 13645, + "equip_cape_id": 9799, + "equip_hands_id": 11859, + "equip_weapon_id": 9084, + "equip_shield_id": 13660 + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "DonaldDegen", + "region_id": 4922, + "x_coord": 1253, + "y_coord": 3741, + "z_coord": 0, + "ts": 1704224027, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 10828, + "equip_amulet_id": 1725, + "equip_torso_id": 1127, + "equip_legs_id": 1079, + "equip_boots_id": 3105, + "equip_cape_id": 13121, + "equip_hands_id": 7462, + "equip_weapon_id": 4587, + "equip_shield_id": 12954 + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "GIM Waifu", + "region_id": 4922, + "x_coord": 1253, + "y_coord": 3741, + "z_coord": 0, + "ts": 1704224138, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 10828, + "equip_amulet_id": 1704, + "equip_torso_id": 2503, + "equip_legs_id": 2497, + "equip_boots_id": 3105, + "equip_cape_id": 6570, + "equip_hands_id": 7462, + "equip_weapon_id": 4587, + "equip_shield_id": 12954 + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "GIM Ribet", + "region_id": 4922, + "x_coord": 1247, + "y_coord": 3737, + "z_coord": 0, + "ts": 1704224146, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 11851, + "equip_amulet_id": null, + "equip_torso_id": 11855, + "equip_legs_id": 11857, + "equip_boots_id": 11861, + "equip_cape_id": 9810, + "equip_hands_id": 11859, + "equip_weapon_id": null, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Goated12722", + "region_id": 12595, + "x_coord": 3183, + "y_coord": 3296, + "z_coord": 0, + "ts": 1704224519, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": null, + "equip_amulet_id": 1706, + "equip_torso_id": null, + "equip_legs_id": null, + "equip_boots_id": null, + "equip_cape_id": null, + "equip_hands_id": 11972, + "equip_weapon_id": 6317, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Theboss12237", + "region_id": 12595, + "x_coord": 3183, + "y_coord": 3292, + "z_coord": 0, + "ts": 1704224746, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": null, + "equip_amulet_id": 1706, + "equip_torso_id": null, + "equip_legs_id": null, + "equip_boots_id": null, + "equip_cape_id": null, + "equip_hands_id": 11972, + "equip_weapon_id": 6317, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "SnowGrind", + "region_id": 12851, + "x_coord": 3209, + "y_coord": 3278, + "z_coord": 0, + "ts": 1704224770, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 11865, + "equip_amulet_id": 1725, + "equip_torso_id": 544, + "equip_legs_id": 542, + "equip_boots_id": 11840, + "equip_cape_id": 22114, + "equip_hands_id": 22981, + "equip_weapon_id": 27660, + "equip_shield_id": 12954 + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "GIM Waifu", + "region_id": 4922, + "x_coord": 1253, + "y_coord": 3741, + "z_coord": 0, + "ts": 1704224027, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 10828, + "equip_amulet_id": 1704, + "equip_torso_id": 2503, + "equip_legs_id": 2497, + "equip_boots_id": 3105, + "equip_cape_id": 6570, + "equip_hands_id": 7462, + "equip_weapon_id": 4587, + "equip_shield_id": 12954 + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "GIM Quark", + "region_id": 11571, + "x_coord": 2936, + "y_coord": 3281, + "z_coord": 0, + "ts": 1704224003, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": null, + "equip_amulet_id": null, + "equip_torso_id": null, + "equip_legs_id": null, + "equip_boots_id": null, + "equip_cape_id": null, + "equip_hands_id": null, + "equip_weapon_id": null, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "bearpoof", + "region_id": 11571, + "x_coord": 2931, + "y_coord": 3286, + "z_coord": 0, + "ts": 1704224024, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 24872, + "equip_amulet_id": 13393, + "equip_torso_id": 24874, + "equip_legs_id": 24876, + "equip_boots_id": 24878, + "equip_cape_id": 9781, + "equip_hands_id": 13676, + "equip_weapon_id": 24880, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "bigd4cilly", + "region_id": 12595, + "x_coord": 3190, + "y_coord": 3274, + "z_coord": 0, + "ts": 1704224319, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 1155, + "equip_amulet_id": null, + "equip_torso_id": 1103, + "equip_legs_id": null, + "equip_boots_id": 1061, + "equip_cape_id": null, + "equip_hands_id": null, + "equip_weapon_id": 1265, + "equip_shield_id": 1189 + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Goated12722", + "region_id": 12595, + "x_coord": 3183, + "y_coord": 3300, + "z_coord": 0, + "ts": 1704224751, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": null, + "equip_amulet_id": 1706, + "equip_torso_id": null, + "equip_legs_id": null, + "equip_boots_id": null, + "equip_cape_id": null, + "equip_hands_id": 11972, + "equip_weapon_id": 6317, + "equip_shield_id": null + }, + "equip_ge_value": 0 + }, + { + "reporter": "Cyborger1", + "reported": "Theboss12237", + "region_id": 12595, + "x_coord": 3174, + "y_coord": 3293, + "z_coord": 0, + "ts": 1704225178, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": null, + "equip_amulet_id": 1706, + "equip_torso_id": null, + "equip_legs_id": null, + "equip_boots_id": null, + "equip_cape_id": null, + "equip_hands_id": 11972, + "equip_weapon_id": 6317, + "equip_shield_id": null + }, + "equip_ge_value": 0 + } +] \ No newline at end of file diff --git a/kafka_setup/kafka_data/kafka_data/kafka_data.zip b/kafka_setup/kafka_data/kafka_data/kafka_data.zip deleted file mode 100644 index 5907418..0000000 Binary files a/kafka_setup/kafka_data/kafka_data/kafka_data.zip and /dev/null differ diff --git a/kafka_setup/setup_kafka.py b/kafka_setup/setup_kafka.py index 72fa57f..8f0faf6 100644 --- a/kafka_setup/setup_kafka.py +++ b/kafka_setup/setup_kafka.py @@ -35,7 +35,7 @@ def create_topics(): replication_factor=1, ), NewTopic( - name="reports", + name="report", num_partitions=4, replication_factor=1, ), @@ -121,7 +121,7 @@ def insert_data(): print("get_messages_from_json") get_messages_from_json(extract_to, send_queue=send_queue) print("send_messages") - send_messages(producer=producer, send_queue=send_queue) + send_messages(producer=producer, send_queue=send_queue, topic="report") def main(): diff --git a/src/app/kafka.py b/src/app/kafka.py new file mode 100644 index 0000000..d9c25c8 --- /dev/null +++ b/src/app/kafka.py @@ -0,0 +1,102 @@ +import asyncio +import json +import logging +import time +from asyncio import Event, Queue + +from aiokafka import AIOKafkaConsumer, AIOKafkaProducer + +logger = logging.getLogger(__name__) + + +def log_speed( + counter: int, start_time: float, _queue: Queue, topic: str, interval: int = 60 +) -> tuple[float, int]: + # Calculate the time elapsed since the function started + delta_time = time.time() - start_time + + # Check if the specified interval has not elapsed yet + if delta_time < interval: + # Return the original start time and the current counter value + return start_time, counter + + # Calculate the processing speed (messages per second) + speed = counter / delta_time + + # Log the processing speed and relevant information + log_message = ( + f"{topic=}, qsize={_queue.qsize()}, " + f"processed {counter} in {delta_time:.2f} seconds, {speed:.2f} msg/sec" + ) + logger.info(log_message) + + # Return the current time and reset the counter to zero + return time.time(), 0 + + +async def kafka_consumer(topic: str, group: str, bootstrap_servers: list[str]): + consumer = AIOKafkaConsumer( + topic, + bootstrap_servers=bootstrap_servers, + group_id=group, + value_deserializer=lambda x: json.loads(x.decode("utf-8")), + auto_offset_reset="earliest", + ) + await consumer.start() + return consumer + + +async def kafka_producer(bootstrap_servers: list[str]): + producer = AIOKafkaProducer( + bootstrap_servers=bootstrap_servers, + value_serializer=lambda v: json.dumps(v).encode(), + acks="all", + ) + await producer.start() + return producer + + +async def receive_messages( + consumer: AIOKafkaConsumer, + receive_queue: Queue, + shutdown_event: Event, + batch_size: int = 200, +): + while not shutdown_event.is_set(): + batch = await consumer.getmany(timeout_ms=1000, max_records=batch_size) + for tp, messages in batch.items(): + logger.info(f"Partition {tp}: {len(messages)} messages") + await asyncio.gather(*[receive_queue.put(m.value) for m in messages]) + logger.info("done") + await consumer.commit() + + logger.info("shutdown") + + +async def send_messages( + topic: str, + producer: AIOKafkaProducer, + send_queue: Queue, + shutdown_event: Event, +): + start_time = time.time() + messages_sent = 0 + + while not shutdown_event.is_set(): + start_time, messages_sent = log_speed( + counter=messages_sent, + start_time=start_time, + _queue=send_queue, + topic=topic, + ) + if send_queue.empty(): + await asyncio.sleep(1) + continue + + message = await send_queue.get() + await producer.send(topic, value=message) + send_queue.task_done() + + messages_sent += 1 + + logger.info("shutdown") diff --git a/src/app/schemas/report.py b/src/app/schemas/report.py new file mode 100644 index 0000000..a48ed2f --- /dev/null +++ b/src/app/schemas/report.py @@ -0,0 +1,31 @@ +from typing import Optional + +from pydantic import BaseModel + + +class Equipment(BaseModel): + equip_head_id: Optional[int] + equip_amulet_id: Optional[int] + equip_torso_id: Optional[int] + equip_legs_id: Optional[int] + equip_boots_id: Optional[int] + equip_cape_id: Optional[int] + equip_hands_id: Optional[int] + equip_weapon_id: Optional[int] + equip_shield_id: Optional[int] + + +class Report(BaseModel): + reporter: str + reported: str + region_id: int + x_coord: int + y_coord: int + z_coord: int + ts: int + manual_detect: int + on_members_world: int + on_pvp_world: int + world_number: int + equipment: Equipment + equip_ge_value: int diff --git a/src/database/models/highscores.py b/src/database/models/highscores.py deleted file mode 100644 index ea30503..0000000 --- a/src/database/models/highscores.py +++ /dev/null @@ -1,107 +0,0 @@ -from sqlalchemy import BigInteger, Column, Date, DateTime, Integer, func - -from database.database import Base - - -class PlayerHiscoreData(Base): - __tablename__ = "playerHiscoreData" - - id = Column(Integer, primary_key=True, autoincrement=True) - timestamp = Column(DateTime, nullable=False, server_default=func.now()) - ts_date = Column(Date, nullable=True) - Player_id = Column(Integer, nullable=False) - total = Column(BigInteger, default=0) - attack = Column(Integer, default=0) - defence = Column(Integer, default=0) - strength = Column(Integer, default=0) - hitpoints = Column(Integer, default=0) - ranged = Column(Integer, default=0) - prayer = Column(Integer, default=0) - magic = Column(Integer, default=0) - cooking = Column(Integer, default=0) - woodcutting = Column(Integer, default=0) - fletching = Column(Integer, default=0) - fishing = Column(Integer, default=0) - firemaking = Column(Integer, default=0) - crafting = Column(Integer, default=0) - smithing = Column(Integer, default=0) - mining = Column(Integer, default=0) - herblore = Column(Integer, default=0) - agility = Column(Integer, default=0) - thieving = Column(Integer, default=0) - slayer = Column(Integer, default=0) - farming = Column(Integer, default=0) - runecraft = Column(Integer, default=0) - hunter = Column(Integer, default=0) - construction = Column(Integer, default=0) - league = Column(Integer, default=0) - bounty_hunter_hunter = Column(Integer, default=0) - bounty_hunter_rogue = Column(Integer, default=0) - cs_all = Column(Integer, default=0) - cs_beginner = Column(Integer, default=0) - cs_easy = Column(Integer, default=0) - cs_medium = Column(Integer, default=0) - cs_hard = Column(Integer, default=0) - cs_elite = Column(Integer, default=0) - cs_master = Column(Integer, default=0) - lms_rank = Column(Integer, default=0) - soul_wars_zeal = Column(Integer, default=0) - abyssal_sire = Column(Integer, default=0) - alchemical_hydra = Column(Integer, default=0) - barrows_chests = Column(Integer, default=0) - bryophyta = Column(Integer, default=0) - callisto = Column(Integer, default=0) - cerberus = Column(Integer, default=0) - chambers_of_xeric = Column(Integer, default=0) - chambers_of_xeric_challenge_mode = Column(Integer, default=0) - chaos_elemental = Column(Integer, default=0) - chaos_fanatic = Column(Integer, default=0) - commander_zilyana = Column(Integer, default=0) - corporeal_beast = Column(Integer, default=0) - crazy_archaeologist = Column(Integer, default=0) - dagannoth_prime = Column(Integer, default=0) - dagannoth_rex = Column(Integer, default=0) - dagannoth_supreme = Column(Integer, default=0) - deranged_archaeologist = Column(Integer, default=0) - general_graardor = Column(Integer, default=0) - giant_mole = Column(Integer, default=0) - grotesque_guardians = Column(Integer, default=0) - hespori = Column(Integer, default=0) - kalphite_queen = Column(Integer, default=0) - king_black_dragon = Column(Integer, default=0) - kraken = Column(Integer, default=0) - kreearra = Column(Integer, default=0) - kril_tsutsaroth = Column(Integer, default=0) - mimic = Column(Integer, default=0) - nex = Column(Integer, default=0) - nightmare = Column(Integer, default=0) - phosanis_nightmare = Column(Integer, default=0) - obor = Column(Integer, default=0) - phantom_muspah = Column(Integer, default=0) - sarachnis = Column(Integer, default=0) - scorpia = Column(Integer, default=0) - skotizo = Column(Integer, default=0) - tempoross = Column(Integer, default=0) - the_gauntlet = Column(Integer, default=0) - the_corrupted_gauntlet = Column(Integer, default=0) - theatre_of_blood = Column(Integer, default=0) - theatre_of_blood_hard = Column(Integer, default=0) - thermonuclear_smoke_devil = Column(Integer, default=0) - tombs_of_amascut = Column(Integer, default=0) - tombs_of_amascut_expert = Column(Integer, default=0) - tzkal_zuk = Column(Integer, default=0) - tztok_jad = Column(Integer, default=0) - venenatis = Column(Integer, default=0) - vetion = Column(Integer, default=0) - vorkath = Column(Integer, default=0) - wintertodt = Column(Integer, default=0) - zalcano = Column(Integer, default=0) - zulrah = Column(Integer, default=0) - rifts_closed = Column(Integer, default=0) - artio = Column(Integer, default=0) - calvarion = Column(Integer, default=0) - duke_sucellus = Column(Integer, default=0) - spindel = Column(Integer, default=0) - the_leviathan = Column(Integer, default=0) - the_whisperer = Column(Integer, default=0) - vardorvis = Column(Integer, default=0) diff --git a/src/main.py b/src/main.py index 3d54194..5fb1c0a 100644 --- a/src/main.py +++ b/src/main.py @@ -13,92 +13,16 @@ from sqlalchemy.sql.expression import Insert, Select, Update import core.logging # for log formatting -from app.schemas.highscores import playerHiscoreData as playerHiscoreDataSchema +from app import kafka from core.config import settings from database.database import get_session -from database.models.highscores import PlayerHiscoreData -from database.models.player import Player logger = logging.getLogger(__name__) -async def kafka_consumer(topic: str, group: str): - consumer = AIOKafkaConsumer( - topic, - bootstrap_servers=[settings.KAFKA_HOST], - group_id=group, - value_deserializer=lambda x: json.loads(x.decode("utf-8")), - auto_offset_reset="earliest", - ) - await consumer.start() - return consumer - - -async def kafka_producer(): - producer = AIOKafkaProducer( - bootstrap_servers=[settings.KAFKA_HOST], - value_serializer=lambda v: json.dumps(v).encode(), - acks="all", - ) - await producer.start() - return producer - - -async def receive_messages( - consumer: AIOKafkaConsumer, receive_queue: Queue, error_queue: Queue -): - async for message in consumer: - if error_queue.qsize() > 100: - await asyncio.sleep(1) - continue - value = message.value - await receive_queue.put(value) - - -async def send_messages(topic: str, producer: AIOKafkaProducer, send_queue: Queue): - while True: - if send_queue.empty(): - await asyncio.sleep(1) - continue - message = await send_queue.get() - await producer.send(topic, value=message) - send_queue.task_done() - - # TODO: pydantic data -async def insert_highscore(session: AsyncSession, data: dict): - player_id = data.get("Player_id") - timestamp = data.get("timestamp") - - # Convert the timestamp to a date format - ts_date = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S").date() - - table = PlayerHiscoreData - - select_query: Select = select(table).where( - (table.Player_id == player_id) & (table.ts_date == ts_date) - ) - - # Check if the record exists - existing_record = await session.execute(select_query) - existing_record = existing_record.scalars().first() - - if not existing_record: - data = playerHiscoreDataSchema(**data) - data = data.model_dump(mode="json") - insert_query: Insert = insert(table).values(data).prefix_with("ignore") - await session.execute(insert_query) - - -# TODO: pydantic data -async def update_player(session: AsyncSession, data: dict): - player_id = data.get("id") - - table = Player - query: Update = update(table=table) - query = query.where(Player.id == player_id) - query = query.values(data) - await session.execute(query) +async def insert_report(session: AsyncSession, data: dict): + ... def log_speed( @@ -121,87 +45,61 @@ async def process_data(receive_queue: Queue, error_queue: Queue): # Run indefinitely while True: + start_time, counter = kafka.log_speed( + counter=counter, start_time=start_time, receive_queue=receive_queue + ) # Check if both queues are empty if receive_queue.empty(): - # If there were previous iterations with data, log processing speed - if counter > 0: - start_time, counter = log_speed( - counter=counter, start_time=start_time, receive_queue=receive_queue - ) - # Sleep for 1 second and then continue to the next iteration await asyncio.sleep(1) continue # Get a message from the chosen queue message: dict = await receive_queue.get() - # Extract 'hiscores' and 'player' dictionaries from the message - highscore: dict = message.get("hiscores") - player: dict = message.get("player") - - # Check the environment and filter out certain player IDs if not in production - if settings.ENV != "PRD": - player_id = player.get("id") - MIN_PLAYER_ID = 0 - MAX_PLAYER_ID = 300 - if not (MIN_PLAYER_ID < player_id <= MAX_PLAYER_ID): - continue - try: # Acquire an asynchronous database session session: AsyncSession = await get_session() async with session.begin(): - # If 'highscore' dictionary is present, insert it into the database - if highscore: - await insert_highscore(session=session, data=highscore) - # Update the player information in the database - await update_player(session=session, data=player) - # Commit the changes to the database + await insert_report(session=session, data=message) + # do the insertion await session.commit() # Mark the message as processed in the queue receive_queue.task_done() + # Handle exceptions, log the error, and put the message in the error queue + # Mark the message as processed in the queue and continue to the next iteration except OperationalError as e: await error_queue.put(message) - # Handle exceptions, log the error, and put the message in the error queue logger.error({"error": e}) logger.info(f"error_qsize={error_queue.qsize()}, {message=}") - # Mark the message as processed in the queue and continue to the next iteration receive_queue.task_done() continue except Exception as e: await error_queue.put(message) - # Handle exceptions, log the error, and put the message in the error queue logger.error({"error": e}) logger.debug(f"Traceback: \n{traceback.format_exc()}") logger.info(f"error_qsize={error_queue.qsize()}, {message=}") - # Mark the message as processed in the queue and continue to the next iteration receive_queue.task_done() continue - - # Log processing speed every 100 iterations - if counter % 100 == 0 and counter > 0: - start_time, counter = log_speed( - counter=counter, start_time=start_time, receive_queue=receive_queue - ) - # Increment the counter counter += 1 async def main(): # get kafka engine - consumer = await kafka_consumer(topic="scraper", group="highscore-worker") - producer = await kafka_producer() + consumer = await kafka.kafka_consumer( + topic="report", group="report-worker", bootstrap_servers=[settings.KAFKA_HOST] + ) + producer = await kafka.kafka_producer(bootstrap_servers=[settings.KAFKA_HOST]) receive_queue = Queue(maxsize=100) send_queue = Queue(maxsize=100) asyncio.create_task( - receive_messages( + kafka.receive_messages( consumer=consumer, receive_queue=receive_queue, error_queue=send_queue ) ) asyncio.create_task( - send_messages(topic="scraper", producer=producer, send_queue=send_queue) + kafka.send_messages(topic="scraper", producer=producer, send_queue=send_queue) ) asyncio.create_task( process_data(receive_queue=receive_queue, error_queue=send_queue) diff --git a/tests/test_data.py b/tests/test_data.py new file mode 100644 index 0000000..6739d7d --- /dev/null +++ b/tests/test_data.py @@ -0,0 +1,62 @@ +from typing import Optional + +from pydantic import BaseModel + + +class Equipment(BaseModel): + equip_head_id: int + equip_amulet_id: Optional[int] + equip_torso_id: int + equip_legs_id: int + equip_boots_id: int + equip_cape_id: int + equip_hands_id: int + equip_weapon_id: int + equip_shield_id: Optional[int] + + +class Report(BaseModel): + reporter: str + reported: str + region_id: int + x_coord: int + y_coord: int + z_coord: int + ts: int + manual_detect: int + on_members_world: int + on_pvp_world: int + world_number: int + equipment: Equipment + equip_ge_value: int + + +# Example usage: +json_data = { + "reporter": "Cyborger1", + "reported": "Shpinki", + "region_id": 14651, + "x_coord": 3682, + "y_coord": 3837, + "z_coord": 0, + "ts": 1704223741, + "manual_detect": 0, + "on_members_world": 1, + "on_pvp_world": 0, + "world_number": 324, + "equipment": { + "equip_head_id": 13592, + "equip_amulet_id": None, + "equip_torso_id": 13596, + "equip_legs_id": 13598, + "equip_boots_id": 13602, + "equip_cape_id": 13594, + "equip_hands_id": 13600, + "equip_weapon_id": 1381, + "equip_shield_id": None, + }, + "equip_ge_value": 0, +} + +report_instance = Report(**json_data) +print(report_instance)