Skip to content

Commit

Permalink
Merge pull request #11 from Bot-detector/add-msg-version
Browse files Browse the repository at this point in the history
Add msg version
  • Loading branch information
extreme4all authored Jun 29, 2024
2 parents 932347f + 389f471 commit 9dd630f
Show file tree
Hide file tree
Showing 22 changed files with 380 additions and 255 deletions.
6 changes: 0 additions & 6 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,3 @@ repos:
rev: 22.10.0
hooks:
- id: black
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
name: isort (python)
args: ["--profile", "black"]
12 changes: 1 addition & 11 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@ export PRINT_HELP_PYSCRIPT
help:
@python3 -c "$$PRINT_HELP_PYSCRIPT" < $(MAKEFILE_LIST)

pre-commit-setup: ## Install pre-commit
python3 -m pip install pre-commit
pre-commit --version

pre-commit: ## Run pre-commit
pre-commit run --all-files

create-venv:
python3 -m venv .venv
source .venv/bin/activate
Expand All @@ -43,9 +36,6 @@ docker-test:
docker compose up --build -d
pytest

api-setup:
python3 -m pip install "fastapi[all]"

env-setup:
touch .env
echo "KAFKA_HOST= 'localhost:9092'" >> .env
Expand All @@ -57,4 +47,4 @@ env-setup:
docs:
open http://localhost:5000/docs
xdg-open http://localhost:5000/docs
. http://localhost:5000/docs
. http://localhost:5000/docs
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ services:
image: bot-detector/kafka_setup
build:
context: ./kafka_setup
command: ["python", "setup_kafka.py"]
# command: bash -c "sleep infinity"
# command: ["python", "main.py"]
environment:
- KAFKA_BROKER=kafka:9092
networks:
Expand Down
2 changes: 1 addition & 1 deletion kafka_setup/.dockerignore
Original file line number Diff line number Diff line change
@@ -1 +1 @@
kafka_data/*.json
kafka_data/*.json
4 changes: 2 additions & 2 deletions kafka_setup/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
COPY /src .

# Set the KAFKA_BROKER environment variable during container runtime
ENV KAFKA_BROKER=localhost:9094

CMD ["python", "setup_kafka.py"]
CMD ["python", "main.py"]
2 changes: 1 addition & 1 deletion kafka_setup/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
kafka-python
pydantic
faker
faker
113 changes: 0 additions & 113 deletions kafka_setup/setup_kafka.py

This file was deleted.

33 changes: 33 additions & 0 deletions kafka_setup/src/_kafka_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import os

from kafka.admin import KafkaAdminClient, NewTopic


def create_topics():
    """Reset and (re)create the Kafka topics used by the local test setup.

    Connects to the broker named by the KAFKA_BROKER environment variable
    (default "localhost:9094"), deletes every existing topic, then creates
    a fresh "report" topic with 4 partitions and replication factor 1.
    """
    # Get the Kafka broker address from the environment variable
    kafka_broker = os.environ.get("KAFKA_BROKER", "localhost:9094")

    admin_client = KafkaAdminClient(bootstrap_servers=kafka_broker)

    topics = admin_client.list_topics()
    print("existing topics", topics)

    # Wipe any pre-existing topics so each run starts from a clean slate.
    # (Idiomatic truthiness check instead of `not topics == []`.)
    if topics:
        admin_client.delete_topics(topics)
        # NOTE(review): broker-side topic deletion is asynchronous; the
        # create below could race a slow delete — confirm, and consider
        # polling list_topics() until empty if flakiness is observed.

    res = admin_client.create_topics(
        [
            NewTopic(
                name="report",
                num_partitions=4,
                replication_factor=1,
            ),
        ]
    )

    print("created_topic", res)

    topics = admin_client.list_topics()
    print("all topics", topics)
88 changes: 88 additions & 0 deletions kafka_setup/src/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import json
import os
import random

import _kafka_config
from faker import Faker
from kafka import KafkaProducer


def send_data(producer: KafkaProducer):
    """Publish 100_000 randomized fake report messages to the "report" topic.

    Each message mimics a client report for a fixed pool of 500 player
    names; roughly half the messages omit the "metadata" block so
    consumers can be exercised against both message versions.

    :param producer: KafkaProducer whose value_serializer handles dicts.
    """
    # Only the equipment slot names are needed as a template; the original
    # full `example` message was dead data apart from these keys.
    equipment_slots = (
        "equip_head_id",
        "equip_amulet_id",
        "equip_torso_id",
        "equip_legs_id",
        "equip_boots_id",
        "equip_cape_id",
        "equip_hands_id",
        "equip_weapon_id",
        "equip_shield_id",
    )

    # Generate fixed players with the same names and scores
    len_messages = 100_000
    players = [f"player{i}" for i in range(500)]
    faker = Faker()
    for i in range(len_messages):
        msg = {
            "metadata": {"version": "v1.0.0"},
            "reporter": random.choice(players),
            "reported": random.choice(players),
            "region_id": random.randint(10_000, 10_500),
            "x_coord": random.randint(0, 5000),
            "y_coord": random.randint(0, 5000),
            "z_coord": random.randint(0, 3),
            "ts": int(faker.date_time().timestamp()),
            "manual_detect": random.choice([0, 1]),
            "on_members_world": random.choice([0, 1]),
            "on_pvp_world": random.choice([0, 1]),
            "world_number": random.randint(300, 500),
            "equipment": {
                # 1/101 chance of an empty slot — same distribution as the
                # original choice over [None, *100 randints], without
                # generating 100 throwaway ints per slot per message.
                slot: None
                if random.randrange(101) == 0
                else random.randint(0, 20000)
                for slot in equipment_slots
            },
            "equip_ge_value": 0,
        }

        # Drop metadata from ~half the messages to exercise the
        # pre-versioning message shape.
        if random.choice([0, 1]) == 0:
            msg.pop("metadata")

        producer.send(topic="report", value=msg)
        print(i, msg)
    print("Data insertion completed.")


def main():
    """Entry point: reset Kafka topics, then stream fake reports in."""
    _kafka_config.create_topics()

    # Broker address comes from the environment, with a local default.
    broker = os.environ.get("KAFKA_BROKER", "localhost:9094")

    # Serialize each dict payload to JSON bytes before it hits the wire.
    producer = KafkaProducer(
        bootstrap_servers=broker,
        value_serializer=lambda payload: json.dumps(payload).encode(),
    )
    send_data(producer=producer)


if __name__ == "__main__":
    main()
2 changes: 1 addition & 1 deletion mysql/docker-entrypoint-initdb.d/01_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,4 @@ CREATE TABLE `Reports` (
KEY `idx_heatmap` (`reportedID`,`timestamp`,`region_id`),
CONSTRAINT `FK_Reported_Players_id` FOREIGN KEY (`reportedID`) REFERENCES `Players` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT,
CONSTRAINT `FK_Reporting_Players_id` FOREIGN KEY (`reportingID`) REFERENCES `Players` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT
);
);
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ SQLAlchemy==2.0.30
tomli==2.0.1
typing_extensions==4.12.0
virtualenv==20.26.2
async_lru==2.0.4
async_lru==2.0.4
63 changes: 63 additions & 0 deletions src/_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import asyncio
import logging
from collections import OrderedDict

logger = logging.getLogger(__name__)


class SimpleALRUCache:
    """A minimal asyncio-safe LRU cache with hit/miss counters."""

    def __init__(self, max_size=128):
        self.cache = OrderedDict()
        self.max_size = max_size
        self.lock = asyncio.Lock()
        self.hits: int = 0
        self.misses: int = 0

    async def get(self, key):
        """Return the cached value for *key*, or None on a miss."""
        async with self.lock:
            if key not in self.cache:
                self.misses += 1
                return None
            self.hits += 1
            # Refresh recency so this entry is evicted last.
            self.cache.move_to_end(key)
            return self.cache[key]

    async def put(self, key, value):
        """Insert or update *key*, evicting the LRU entry when full."""
        async with self.lock:
            if key in self.cache:
                # Existing entry: just refresh its recency.
                self.cache.move_to_end(key)
            elif len(self.cache) >= self.max_size:
                # Cache full: drop the oldest (least recently used) item.
                self.cache.popitem(last=False)
            self.cache[key] = value

    async def clear(self):
        """Remove every entry (hit/miss counters are left untouched)."""
        async with self.lock:
            self.cache.clear()


# Example usage
async def main():
cache = SimpleALRUCache(max_size=3)

await cache.put("a", 1)
await cache.put("b", 2)
await cache.put("c", 3)

print(await cache.get("a")) # Output: 1
print(await cache.get("b")) # Output: 2
print(await cache.get("c")) # Output: 3

await cache.put("d", 4) # This will evict 'a' because it's the LRU item

print(await cache.get("a")) # Output: None, 'a' has been evicted
print(await cache.get("d")) # Output: 4


if __name__ == "__main__":
# Run the example
asyncio.run(main())
1 change: 1 addition & 0 deletions src/_kafka.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from AioKafkaEngine import ConsumerEngine, ProducerEngine

from core.config import settings

consumer = ConsumerEngine(
Expand Down
Loading

0 comments on commit 9dd630f

Please sign in to comment.