Skip to content

Commit

Permalink
fix: various fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
MagicTheDev committed Apr 25, 2024
1 parent d9a6d96 commit 65364e0
Showing 1 changed file with 87 additions and 76 deletions.
163 changes: 87 additions & 76 deletions gamewide/war/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import coc
import pendulum as pend
import random
import ujson

from hashids import Hashids
from datetime import datetime
from msgspec.json import decode
from msgspec import Struct
from pymongo import InsertOne, UpdateOne
from pymongo import InsertOne, UpdateOne
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from typing import List
from utility.classes import MongoDatabase
from .config import GlobalWarTrackingConfig
Expand All @@ -20,7 +20,7 @@

config = GlobalWarTrackingConfig()
db_client = MongoDatabase(stats_db_connection=config.stats_mongodb, static_db_connection=config.static_mongodb)
coc_client = coc.Client(key_count=10, throttle_limit=30, cache_max_size=0, raw_attribute=True, timeout=600)
coc_client = coc.Client(key_count=10, throttle_limit=200, cache_max_size=0, raw_attribute=True, timeout=600)


class Members(Struct):
Expand All @@ -42,8 +42,7 @@ class War(Struct):

store_fails = []


async def broadcast():
async def broadcast(scheduler: AsyncIOScheduler):
global in_war
global store_fails
x = 1
Expand All @@ -53,8 +52,6 @@ async def broadcast():
print(f"{len(list(keys))} keys")
await coc_client.login_with_tokens(*list(keys))

global_tasks = set()

while True:
api_fails = 0

Expand All @@ -70,18 +67,22 @@ async def fetch(url, session: aiohttp.ClientSession, headers, tag, throttler: Th
bot_clan_tags = await db_client.clans_db.distinct("tag")
size_break = 25_000

if x % 50 != 0:
if x % 30 != 0:
right_now = datetime.now().timestamp()
one_week_ago = int(right_now) - (604800 * 4)

try:
clan_tags = await db_client.clan_wars.distinct("clans", filter={"endTime": {"$gte" : one_week_ago}})
except Exception:
pipeline = [{"$match": {"endTime": {"$gte": one_week_ago}}}, {"$group": {"_id": "$clans"}}]
results = await db_client.clan_wars.aggregate(pipeline).to_list(length=None)
clan_tags = []
for result in results:
clan_tags.extend(result.get("_id", []))
pipeline = [
{"$match":
{"$and": [
{"endTime": {"$gte": one_week_ago}},
{"type": {"$ne": "cwl"}}
]
}},
{"$group": {"_id": "$clans"}}]
results = await db_client.clan_wars.aggregate(pipeline).to_list(length=None)
clan_tags = []
for result in results:
clan_tags.extend(result.get("_id", []))

combined_tags = set(clan_tags + bot_clan_tags)
all_tags = list([tag for tag in combined_tags if tag not in in_war])
Expand All @@ -96,13 +97,20 @@ async def fetch(url, session: aiohttp.ClientSession, headers, tag, throttler: Th
all_tags = [all_tags[i:i + size_break] for i in range(0, len(all_tags), size_break)]
ones_that_tried_again = []

timers_alr_captured = set()
if x == 1:
right_now = datetime.now().timestamp()
one_week_ago = int(right_now)
pipeline = [{"$match": {"$and": [
{"endTime": {"$gte": one_week_ago}},
{"data": {"$ne": None}}
]}}, {"$group": {"_id": "$war_id"}}]
results = await db_client.clan_wars.aggregate(pipeline).to_list(length=None)
for result in results:
timers_alr_captured.add(result.get("_id"))

x += 1
for count, tag_group in enumerate(all_tags, 1):

for r_time, tag, opponent_tag, prep_time in global_tasks: #{"run_time" : run_time, "tag" : tag, "opponent_tag" : opponent_tag, "prep_time" : int(war_prep.timestamp())}
if r_time <= pend.now(tz=pend.UTC):
asyncio.create_task(store_war(clan_tag=tag, opponent_tag=opponent_tag, prep_time=prep_time))

logger.info(f"Group {count}/{len(all_tags)}")
tasks = []
connector = aiohttp.TCPConnector(limit=500, ttl_dns_cache=600)
Expand Down Expand Up @@ -139,16 +147,20 @@ async def fetch(url, session: aiohttp.ClientSession, headers, tag, throttler: Th
in_war.add(tag)
in_war.add(opponent_tag)
war_unique_id = "-".join(sorted([war.clan.tag, war.opponent.tag])) + f"-{int(war_prep.timestamp())}"
for member in war.clan.members + war.opponent.members:
war_timers.append(UpdateOne({"_id" : member.tag}, {"$set" : {"clans" : [war.clan.tag, war.opponent.tag], "time" : war_end.time}}, upsert=True))
if war_unique_id not in timers_alr_captured:
for member in war.clan.members + war.opponent.members:
war_timers.append(UpdateOne({"_id" : member.tag}, {"$set" : {"clans" : [war.clan.tag, war.opponent.tag], "time" : war_end.time}}, upsert=True))
changes.append(InsertOne({"war_id" : war_unique_id,
"clans" : [tag, opponent_tag],
"endTime" : int(war_end.time.replace(tzinfo=pend.UTC).timestamp())
}))
#schedule getting war
global_tasks.add((run_time, tag, opponent_tag, int(war_prep.timestamp())))
#await schedule_async(delay, store_war, tag, opponent_tag, int(war_prep.timestamp()))

try:
scheduler.add_job(store_war, 'date', run_date=run_time, args=[tag, opponent_tag, int(war_prep.timestamp())],
id=f"war_end_{tag}_{opponent_tag}", name=f"{tag}_war_end_{opponent_tag}", misfire_grace_time=1200, max_instances=1)
except Exception:
ones_that_tried_again.append(tag)
pass
if changes:
try:
await db_client.clan_wars.bulk_write(changes, ordered=False)
Expand All @@ -175,7 +187,6 @@ async def fetch(url, session: aiohttp.ClientSession, headers, tag, throttler: Th
store_fails = []

async def store_war(clan_tag: str, opponent_tag: str, prep_time: int):
await asyncio.sleep(0)
global in_war
global store_fails

Expand All @@ -186,65 +197,63 @@ async def store_war(clan_tag: str, opponent_tag: str, prep_time: int):
if opponent_tag in in_war:
in_war.remove(opponent_tag)

async def get_war(clan_tag : str):
try:
war = await coc_client.get_clan_war(clan_tag=clan_tag)
return war
except (coc.NotFound, coc.errors.Forbidden, coc.errors.PrivateWarLog):
return "no access"
except coc.errors.Maintenance:
return "maintenance"
except Exception as e:
logger.error(str(e))
return "error"

switched = False
war = None
war_found = False
time_tried = 0
while not war_found:
war = await get_war(clan_tag=clan_tag)
if isinstance(war, coc.ClanWar):
#if no war or the wars dont match
#try checking opponents side
#if we already have, break, is lost cause
if war.preparation_start_time is None or int(war.preparation_start_time.time.replace(tzinfo=pend.UTC).timestamp()) != prep_time:
async def find_active_war(clan_tag: str, opponent_tag: str, prep_time: int):
async def get_war(clan_tag: str):
try:
war = await coc_client.get_clan_war(clan_tag=clan_tag)
return war
except (coc.NotFound, coc.errors.Forbidden, coc.errors.PrivateWarLog):
return "no access"
except coc.errors.Maintenance:
return "maintenance"
except Exception as e:
logger.error(str(e))
return "error"

switched = False
tries = 0
while True:
war = await get_war(clan_tag=clan_tag)

if isinstance(war, coc.ClanWar):
if war.state == "warEnded":
return war # Found the completed war
# Check prep time and retry if needed
prep_start_timestamp = int(war.preparation_start_time.time.replace(tzinfo=pend.UTC).timestamp())
if war.preparation_start_time is None or prep_start_timestamp != prep_time:
if not switched:
clan_tag = opponent_tag
switched = True
continue # Try with the opponent's tag
else:
return None # Both tags checked, no valid war found
elif war == "maintenance":
await asyncio.sleep(15 * 60) # Wait 15 minutes for maintenance, then continue loop
continue
elif war == "error":
break # Stop on error
elif war == "no access":
if not switched:
clan_tag = opponent_tag
switched = True
continue
continue # Access issue, switch clan tag
else:
break
elif war.state == "warEnded":
war_found = True
break
elif war == "maintenance":
await asyncio.sleep(300)
break
elif war == "no access":
if not switched:
clan_tag = opponent_tag
switched = True
continue
else:
return None # Both tags checked, no access to either

await asyncio.sleep(war._response_retry) # Wait before retry based on response retry attribute
tries += 1
if tries == 10:
break
elif war == "error":
break

await asyncio.sleep(war._response_retry)
time_tried += 1
if time_tried == 10:
break
return None

war = await find_active_war(clan_tag=clan_tag, opponent_tag=opponent_tag, prep_time=prep_time)

if not war_found:
if war is None:
store_fails.append(war)
return

war_unique_id = "-".join(sorted([war.clan.tag, war.opponent.tag])) + f"-{int(war.preparation_start_time.time.replace(tzinfo=pend.UTC).timestamp())}"

war_result = await db_client.clan_wars.find_one({"war_id" : war_unique_id})
if war_result.get("data") is not None:
return

custom_id = hashids.encode(int(war.preparation_start_time.time.replace(tzinfo=pend.UTC).timestamp()) + int(pend.now(tz=pend.UTC).timestamp()) + random.randint(1000000000, 9999999999))
await db_client.clan_wars.update_one({"war_id": war_unique_id},
Expand All @@ -255,7 +264,9 @@ async def get_war(clan_tag : str):


async def main():
await broadcast()
scheduler = AsyncIOScheduler(timezone=pend.UTC)
scheduler.start()
await broadcast(scheduler=scheduler)



0 comments on commit 65364e0

Please sign in to comment.