Commit 1eec8ee: aiokafka inserts

extreme4all committed May 26, 2024
1 parent 974306d commit 1eec8ee
Showing 16 changed files with 353 additions and 545 deletions.
2 changes: 1 addition & 1 deletion kafka_setup/.dockerignore
@@ -1 +1 @@
kafka_data/*.json
kafka_data/*.json
2 changes: 1 addition & 1 deletion kafka_setup/Dockerfile
@@ -10,4 +10,4 @@ COPY . .
# Set the KAFKA_BROKER environment variable during container runtime
ENV KAFKA_BROKER=localhost:9094

CMD ["python", "setup_kafka.py"]
CMD ["python", "setup_kafka.py"]
86 changes: 0 additions & 86 deletions kafka_setup/generate_example_data.py

This file was deleted.

Empty file removed kafka_setup/kafka_data/.gitkeep
Empty file.
Binary file removed kafka_setup/kafka_data/kafka_data.zip
Binary file not shown.
Empty file.
2 changes: 2 additions & 0 deletions kafka_setup/requirements.txt
@@ -1 +1,3 @@
kafka-python
pydantic
faker
150 changes: 64 additions & 86 deletions kafka_setup/setup_kafka.py
@@ -1,9 +1,8 @@
# setup_kafka.py
import json
import os
import zipfile
from queue import Queue
import random

from faker import Faker
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

@@ -23,16 +22,6 @@ def create_topics():

res = admin_client.create_topics(
[
NewTopic(
name="player",
num_partitions=3,
replication_factor=1,
),
NewTopic(
name="scraper",
num_partitions=4,
replication_factor=1,
),
NewTopic(
name="report",
num_partitions=4,
@@ -48,88 +37,77 @@ def create_topics():
return


def extract_zip(extract_to: str):
current_dir = "./kafka_data" # Get the current working directory
print(os.listdir(current_dir))
# Find zip file in the current directory
zip_files = [file for file in os.listdir(current_dir) if file.endswith(".zip")]
example = {
"reporter": "player1",
"reported": "player2",
"region_id": 14652,
"x_coord": 3682,
"y_coord": 3851,
"z_coord": 0,
"ts": 1704223737,
"manual_detect": 0,
"on_members_world": 1,
"on_pvp_world": 0,
"world_number": 324,
"equipment": {
"equip_head_id": 13592,
"equip_amulet_id": None,
"equip_torso_id": 13596,
"equip_legs_id": 13598,
"equip_boots_id": 13602,
"equip_cape_id": 13594,
"equip_hands_id": 13600,
"equip_weapon_id": 1381,
"equip_shield_id": None,
},
"equip_ge_value": 0,
}

if not zip_files:
print("No zip file found in the current directory")
return

# Select the first zip file found
for zip_file in zip_files:
print(f"extracting: {zip_file}")
zip_file_path = os.path.join(current_dir, zip_file)
# Create the extraction directory if it doesn't exist
if not os.path.exists(extract_to):
os.makedirs(extract_to)

# Extract zipfile
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
zip_ref.extractall(extract_to)
return


def get_messages_from_json(path: str, send_queue: Queue):
paths = []
for file_name in os.listdir(path):
print(f"{file_name=}")
if file_name.endswith(".json"):
file_path = os.path.join(path, file_name)
paths.append(file_path)

for _path in paths:
print(f"{_path=}")
with open(_path) as file:
data = json.load(file)
print(f"{_path}:{len(data)}")
_ = [send_queue.put(item=d) for d in data]
return


def kafka_producer():
def insert_data():
# Get the Kafka broker address from the environment variable
kafka_broker = os.environ.get("KAFKA_BROKER", "localhost:9094")

# Create the Kafka producer
producer = KafkaProducer(
bootstrap_servers=kafka_broker,
value_serializer=lambda x: json.dumps(x).encode(),
)
return producer


def send_messages(producer: KafkaProducer, send_queue: Queue, topic: str = "scraper"):
while True:
if send_queue.empty():
break

if send_queue.qsize() % 100 == 0:
print(f"{send_queue.qsize()=}")
message = send_queue.get()
print(message)
producer.send(topic=topic, value=message)
send_queue.task_done()


def insert_data():
send_queue = Queue()
extract_to = "kafka_data"
producer = kafka_producer()

print("extract_zip")
extract_zip(extract_to)
print("get_messages_from_json")
get_messages_from_json(extract_to, send_queue=send_queue)
print("send_messages")
send_messages(producer=producer, send_queue=send_queue, topic="report")


def main():
print("create_topics")
# Generate fixed players with the same names and scores
len_messages = 100_000
players = [f"player{i}" for i in range(500)]
faker = Faker()
for i in range(len_messages):
msg = {
"reporter": random.choice(players),
"reported": random.choice(players),
"region_id": random.randint(10_000, 10_500),
"x_coord": random.randint(0, 5000),
"y_coord": random.randint(0, 5000),
"z_coord": random.randint(0, 3),
"ts": int(faker.date_time().timestamp()),
"manual_detect": random.choice([0, 1]),
"on_members_world": random.choice([0, 1]),
"on_pvp_world": random.choice([0, 1]),
"world_number": random.randint(300, 500),
"equipment": {
k: random.choice(
[None, *[random.randint(0, 20000) for _ in range(100)]]
)
for k in example["equipment"].keys()
},
"equip_ge_value": 0,
}
producer.send(topic="report", value=msg)
print(i, msg)
print("Data insertion completed.")


def setup_kafka():
create_topics()
print("insert_data")
insert_data()
print("done")


main()
if __name__ == "__main__":
setup_kafka()
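
The rewritten setup script now produces synthetic "report" messages with Faker and kafka-python instead of replaying JSON dumps from a zip archive. For context, here is a minimal sketch of reading those messages back with aiokafka's AIOKafkaConsumer (the plain aiokafka API, not the AiokafkaEngine wrapper added to requirements.txt, whose interface is not shown in this commit); the broker address, group id, and deserializer mirror the producer settings above and are assumptions:

    import asyncio
    import json

    from aiokafka import AIOKafkaConsumer


    async def consume_reports():
        # Broker address mirrors the KAFKA_BROKER default used by the producer above.
        consumer = AIOKafkaConsumer(
            "report",
            bootstrap_servers="localhost:9094",
            group_id="report-worker",  # assumed group name, not from this commit
            auto_offset_reset="earliest",
            value_deserializer=lambda v: json.loads(v.decode()),
        )
        await consumer.start()
        try:
            async for msg in consumer:
                report = msg.value
                # Each message matches the dict built in insert_data().
                print(report["reporter"], report["reported"], report["region_id"])
        finally:
            await consumer.stop()


    if __name__ == "__main__":
        asyncio.run(consume_reports())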
69 changes: 54 additions & 15 deletions mysql/docker-entrypoint-initdb.d/01_tables.sql
@@ -1,19 +1,26 @@
USE playerdata;

CREATE TABLE Players (
id INT PRIMARY KEY AUTO_INCREMENT,
name TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP,
possible_ban BOOLEAN,
confirmed_ban BOOLEAN,
confirmed_player BOOLEAN,
label_id INTEGER,
label_jagex INTEGER,
ironman BOOLEAN,
hardcore_ironman BOOLEAN,
ultimate_ironman BOOLEAN,
normalized_name TEXT
CREATE TABLE `Players` (
`id` int NOT NULL AUTO_INCREMENT,
`name` text NOT NULL,
`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` datetime DEFAULT NULL,
`possible_ban` tinyint(1) NOT NULL DEFAULT '0',
`confirmed_ban` tinyint(1) NOT NULL DEFAULT '0',
`confirmed_player` tinyint(1) NOT NULL DEFAULT '0',
`label_id` int NOT NULL DEFAULT '0',
`label_jagex` int NOT NULL DEFAULT '0',
`ironman` tinyint DEFAULT NULL,
`hardcore_ironman` tinyint DEFAULT NULL,
`ultimate_ironman` tinyint DEFAULT NULL,
`normalized_name` text,
PRIMARY KEY (`id`),
UNIQUE KEY `Unique_name` (`name`(50)),
KEY `FK_label_id` (`label_id`),
KEY `confirmed_ban_idx` (`confirmed_ban`),
KEY `normal_name_index` (`normalized_name`(50)),
KEY `Players_label_jagex_IDX` (`label_jagex`) USING BTREE,
KEY `Players_possible_ban_IDX` (`possible_ban`,`confirmed_ban`) USING BTREE
);

-- Insert into
@@ -43,4 +50,36 @@ CREATE TABLE `stgReports` (
`equip_ge_value` bigint DEFAULT NULL,
PRIMARY KEY (`ID`)
)
;
;
CREATE TABLE `Reports` (
`ID` bigint NOT NULL AUTO_INCREMENT,
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`reportedID` int NOT NULL,
`reportingID` int NOT NULL,
`region_id` int NOT NULL,
`x_coord` int NOT NULL,
`y_coord` int NOT NULL,
`z_coord` int NOT NULL,
`timestamp` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`manual_detect` tinyint(1) DEFAULT NULL,
`on_members_world` int DEFAULT NULL,
`on_pvp_world` tinyint DEFAULT NULL,
`world_number` int DEFAULT NULL,
`equip_head_id` int DEFAULT NULL,
`equip_amulet_id` int DEFAULT NULL,
`equip_torso_id` int DEFAULT NULL,
`equip_legs_id` int DEFAULT NULL,
`equip_boots_id` int DEFAULT NULL,
`equip_cape_id` int DEFAULT NULL,
`equip_hands_id` int DEFAULT NULL,
`equip_weapon_id` int DEFAULT NULL,
`equip_shield_id` int DEFAULT NULL,
`equip_ge_value` bigint DEFAULT NULL,
PRIMARY KEY (`ID`),
UNIQUE KEY `Unique_Report` (`reportedID`,`reportingID`,`region_id`,`manual_detect`),
KEY `idx_reportingID` (`reportingID`),
KEY `idx_reportedID_regionDI` (`reportedID`,`region_id`),
KEY `idx_heatmap` (`reportedID`,`timestamp`,`region_id`),
CONSTRAINT `FK_Reported_Players_id` FOREIGN KEY (`reportedID`) REFERENCES `Players` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT,
CONSTRAINT `FK_Reporting_Players_id` FOREIGN KEY (`reportingID`) REFERENCES `Players` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT
);
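
The new Reports table enforces a Unique_Report key over (reportedID, reportingID, region_id, manual_detect), so the same sighting can be written repeatedly without creating duplicates. A minimal sketch (not part of this commit) of an idempotent insert using SQLAlchemy with MySQL's INSERT IGNORE; the connection URL and player ids are placeholders and assume the referenced Players rows already exist:

    from sqlalchemy import create_engine, text

    # Placeholder credentials; the FK constraints require Players rows 1 and 2 to exist.
    engine = create_engine("mysql+pymysql://user:password@localhost:3306/playerdata")

    report = {
        "reportedID": 1,
        "reportingID": 2,
        "region_id": 14652,
        "x_coord": 3682,
        "y_coord": 3851,
        "z_coord": 0,
        "manual_detect": 0,
    }

    insert_report = text(
        """
        INSERT IGNORE INTO Reports
            (reportedID, reportingID, region_id, x_coord, y_coord, z_coord, manual_detect)
        VALUES
            (:reportedID, :reportingID, :region_id, :x_coord, :y_coord, :z_coord, :manual_detect)
        """
    )

    with engine.begin() as conn:
        # Re-running the same insert is a no-op thanks to the Unique_Report key.
        conn.execute(insert_report, report)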
1 change: 1 addition & 0 deletions requirements.txt
@@ -35,3 +35,4 @@ SQLAlchemy==2.0.23
tomli==2.0.1
typing_extensions==4.8.0
virtualenv==20.24.6
AiokafkaEngine==0.0.4
