Skip to content

Commit

Permalink
add: websocket server
Browse files Browse the repository at this point in the history
  • Loading branch information
MagicTheDev committed Apr 11, 2024
1 parent 1a115ed commit ad6a252
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
67 changes: 67 additions & 0 deletions bot/websockets/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import ujson
from aiokafka import AIOKafkaConsumer
from uvicorn import Config, Server
from fastapi import FastAPI, WebSocket
from collections import defaultdict
import asyncio
from loguru import logger


EVENT_CLIENTS = set()
CLAN_MAP = defaultdict(set)
app = FastAPI()


@app.websocket("/events")
async def event_websocket(websocket: WebSocket):
global EVENT_CLIENTS
global CLAN_MAP
await websocket.accept()
EVENT_CLIENTS.add(websocket)
await websocket.send_text("Successfully Login!")
try:
while True:
data: dict = await websocket.receive_json()
clans = data.get("clans", [])
for clan in clans:
CLAN_MAP[websocket.client].add(clan)
except Exception:
EVENT_CLIENTS.remove(websocket)


async def broadcast():
global EVENT_CLIENTS
global CLAN_MAP
async def send_ws(ws, json):
try:
await ws.send_json(json)
except Exception as e:
logger.error(e)
EVENT_CLIENTS.discard(ws)


topics = ["clan", "player", "war", "capital", "reminder", "reddit"]
consumer: AIOKafkaConsumer = AIOKafkaConsumer(*topics, bootstrap_servers='85.10.200.219:9092', auto_offset_reset="latest")
await consumer.start()
logger.info("Events Started")
async for msg in consumer:
message_to_send = {
"topic" : msg.topic,
"value" : ujson.loads(msg.value)
}

key = msg.key.decode("utf-8") if msg.key is not None else None
tasks = []
for client in EVENT_CLIENTS.copy(): #type: WebSocket
clans = CLAN_MAP.get(client.client, [])
if key in clans or key is None:
tasks.append(send_ws(ws=client, json=message_to_send))
await asyncio.gather(*tasks)


async def main():
loop = asyncio.get_event_loop()
config = Config(app=app, loop="asyncio", host="localhost", port=8000, ws_ping_interval=120 ,ws_ping_timeout= 120)
server = Server(config)
loop.create_task(server.serve())
loop.create_task(broadcast())
3 changes: 3 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from bot.clan.track import main as bot_clan_main
from bot.legends.track import main as bot_legend_main
from bot.reddit.track import main as bot_reddit_main
from bot.websockets.server import main as websocket_server

from gamewide.clan_verify.track import main as clan_verify_main
from gamewide.players.track import broadcast as global_player_main
Expand Down Expand Up @@ -32,6 +33,8 @@
task = global_war_main
elif config.tracking_type == "SEARCH":
task = search_main
elif config.tracking_type == "WEBSOCKET":
task = websocket_server
else:
task = asyncio.sleep

Expand Down

0 comments on commit ad6a252

Please sign in to comment.