Skip to content

Commit

Permalink
Migrate stats system to sqlite
Browse files Browse the repository at this point in the history
  • Loading branch information
hanzi committed Sep 9, 2024
1 parent a2e9cdc commit 170a045
Show file tree
Hide file tree
Showing 14 changed files with 1,094 additions and 596 deletions.
3 changes: 3 additions & 0 deletions modules/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from modules.libmgba import LibmgbaEmulator
from modules.profiles import Profile
from modules.roms import ROM
from modules.stats import StatsDatabase


from modules.config import Config

Expand Down Expand Up @@ -41,6 +43,7 @@ def __init__(self, initial_bot_mode: str = "Manual"):
self.emulator: Optional["LibmgbaEmulator"] = None
self.gui: Optional["PokebotGui"] = None
self.profile: Optional["Profile"] = None
self.stats: Optional["StatsDatabase"] = None
self.debug: bool = False

self._current_message: str = ""
Expand Down
7 changes: 3 additions & 4 deletions modules/discord.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def discord_message(


def discord_rich_presence() -> None:
from modules.stats import total_stats
from asyncio import new_event_loop as new_loop, set_event_loop as set_loop

set_loop(new_loop())
Expand All @@ -89,16 +88,16 @@ def discord_rich_presence() -> None:
large_image = None

while True:
encounter_log = total_stats.get_encounter_log()
totals = total_stats.get_total_stats()
encounter_log = context.stats.get_encounter_log()
totals = context.stats.get_total_stats()
location = encounter_log[-1]["pokemon"]["metLocation"] if len(encounter_log) > 0 else "N/A"
current_fps = context.emulator.get_current_fps()

rpc.update(
state=f"{location} | {context.rom.game_name}",
details=(
f"{totals.get('totals', {}).get('encounters', 0):,} ({totals.get('totals', {}).get('shiny_encounters', 0):,}✨) | "
f"{total_stats.get_encounter_rate():,}/h | "
f"{context.stats.encounter_rate:,}/h | "
f"{current_fps:,}fps ({current_fps / 59.727500569606:0.2f}x)"
),
large_image=large_image,
Expand Down
24 changes: 17 additions & 7 deletions modules/encounter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import importlib
from enum import Enum, auto
from pathlib import Path
from typing import Callable

from modules.console import console
from modules.context import context
Expand Down Expand Up @@ -30,10 +32,21 @@ def shiny_encounter_gif() -> Path | None:
return None


_custom_catch_filters: Callable[[Pokemon], str | bool] | None = None


def run_custom_catch_filters(pokemon: Pokemon) -> str | bool:
from modules.stats import total_stats
global _custom_catch_filters
if _custom_catch_filters is None:
if (context.profile.path / "customcatchfilters.py").is_file():
module = importlib.import_module(".customcatchfilters", f"profiles.{context.profile.path.name}")
_custom_catch_filters = module.custom_catch_filters
else:
from profiles.customcatchfilters import custom_catch_filters

result = total_stats.custom_catch_filters(pokemon)
_custom_catch_filters = custom_catch_filters

result = _custom_catch_filters(pokemon)
if result is True:
result = "Matched a custom catch filter"
return result
Expand Down Expand Up @@ -91,11 +104,8 @@ def judge_encounter(pokemon: Pokemon) -> EncounterValue:
def log_encounter(
pokemon: Pokemon, action: BattleAction | None = None, gif_path: Path | None = None, tcg_path: Path | None = None
) -> None:
from modules.stats import total_stats

total_stats.log_encounter(
pokemon, context.config.catch_block.block_list, run_custom_catch_filters(pokemon), gif_path, tcg_path
)
ccf_result = run_custom_catch_filters(pokemon)
context.stats.log_encounter(pokemon, ccf_result)
if context.config.logging.save_pk3.all:
save_pk3(pokemon)

Expand Down
11 changes: 11 additions & 0 deletions modules/game.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,14 @@ def decode_string(
# Actual printable characters
string += character_table[i]
return string


def encode_string(string: str) -> bytes:
result = b""
for index in range(len(string)):
character = string[index]
if character not in _character_table_international:
raise ValueError(f"Cannot encode '{character}'.")
code = _character_table_international.index(character)
result += int.to_bytes(code)
return result
4 changes: 4 additions & 0 deletions modules/game_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,7 @@ def get_game_stat(game_stat: GameStat) -> int:
game_stats_offset += game_stat.value * 4
encryption_key = unpack_uint32(get_save_block(2, encryption_key_offset, size=4))
return unpack_uint32(get_save_block(1, game_stats_offset, size=4)) ^ encryption_key


def get_total_number_of_battles() -> int:
return get_game_stat(GameStat.TOTAL_BATTLES)
3 changes: 1 addition & 2 deletions modules/gui/debug_tabs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,6 @@ def update(self, emulator: "LibmgbaEmulator"):

def _get_data(self):
from modules.libmgba import input_map
from modules.stats import total_stats

current_inputs = context.emulator.get_inputs()
inputs_dict = {"__value": []}
Expand Down Expand Up @@ -1069,7 +1068,7 @@ def _get_data(self):
"Session Frame": f"{context.frame:,}",
"Session Time at 1×": f"{session_time_at_1x}",
"RNG Seed": hex(unpack_uint32(read_symbol("gRngValue"))),
"Encounters/h (at 1×)": total_stats.get_encounter_rate_at_1x(),
"Encounters/h (at 1×)": context.stats.encounter_rate_at_1x,
"Currently Running Actions": debug.action_stack,
"Debug Values": debug.debug_values,
}
Expand Down
4 changes: 1 addition & 3 deletions modules/gui/emulator_controls.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,7 @@ def _update_stats(self):
if current_fps:
stats.append(f"{current_fps:,}fps ({current_fps / 59.727500569606:0.2f}x)")
if context.profile:
from modules.stats import total_stats

stats.append(f"{total_stats.get_encounter_rate():,}/h")
stats.append(f"{context.stats.encounter_rate:,}/h")
if context.debug:
stats.append(f"{round(current_load * 100, 1)}%")
self.stats_label.config(text=" | ".join(stats))
Expand Down
3 changes: 3 additions & 0 deletions modules/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from modules.memory import GameState, get_game_state
from modules.modes import BotListener, BotMode, BotModeError, FrameInfo, get_bot_listeners, get_bot_mode_by_name
from modules.plugins import plugin_profile_loaded
from modules.stats import StatsDatabase
from modules.tasks import get_global_script_context, get_tasks

# Contains a queue of tasks that should be run the next time a frame completes.
Expand All @@ -24,6 +25,8 @@ def main_loop() -> None:
plugin_profile_loaded(context.profile)
current_mode: BotMode | None = None

context.stats = StatsDatabase(context.profile)

if context.config.discord.rich_presence:
from modules.discord import discord_rich_presence

Expand Down
11 changes: 4 additions & 7 deletions modules/menuing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Generator

from modules.context import context
from modules.game_stats import get_game_stat, GameStat
from modules.items import Item, get_item_bag
from modules.memory import GameState, get_event_flag, get_game_state, read_symbol, unpack_uint16
from modules.menu_parsers import (
Expand Down Expand Up @@ -595,14 +596,12 @@ def get_pokemon_with_pickup_and_item(self):
self.picked_up_items.append(mon.held_item)

def check_pickup_threshold(self):
from modules.stats import total_stats

if context.config.cheats.faster_pickup:
self.check_threshold_met = True
self.checked = True
else:
self.check_threshold_met = (
total_stats.get_session_encounters() % context.config.battle.pickup_check_frequency == 0
get_game_stat(GameStat.TOTAL_BATTLES) % context.config.battle.pickup_check_frequency == 0
)
self.get_pokemon_with_pickup_and_item()
if context.config.battle.pickup_threshold > self.pokemon_with_pickup > 0:
Expand All @@ -615,7 +614,7 @@ def check_pickup_threshold(self):
threshold = context.config.battle.pickup_threshold
self.pickup_threshold_met = self.check_threshold_met and len(self.pokemon_with_pickup_and_item) >= threshold
if self.pickup_threshold_met:
total_stats.update_pickup_items(self.picked_up_items)
context.stats.log_pickup_items(self.picked_up_items)
context.message = "Pickup threshold is met! Gathering items..."

def open_party_menu(self):
Expand Down Expand Up @@ -761,11 +760,9 @@ def step(self):


def should_check_for_pickup():
from modules.stats import total_stats

if (
context.config.cheats.faster_pickup
or total_stats.get_session_encounters() % context.config.battle.pickup_check_frequency == 0
or get_game_stat(GameStat.TOTAL_BATTLES) % context.config.battle.pickup_check_frequency == 0
):
return True
return False
Expand Down
5 changes: 3 additions & 2 deletions modules/modes/feebas.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ._interface import BotMode, BotModeError
from .util import ensure_facing_direction, fish, navigate_to, register_key_item
from ..console import console
from ..game_stats import get_total_number_of_battles, get_game_stat, GameStat
from ..map import get_map_all_tiles

# Bad tiles such as cliffs marked as surfable, but can't surf or fish on it
Expand Down Expand Up @@ -115,8 +116,8 @@ def run(self) -> Generator:
self.feebas_found = False

if self.tile_checked < self.tile_threshold or self.feebas_found:
total_encounters = total_stats.get_total_encounters()
while total_encounters == total_stats.get_total_encounters():
total_encounters = get_game_stat(GameStat.FISHING_ENCOUNTERS)
while total_encounters == get_game_stat(GameStat.FISHING_ENCOUNTERS):
yield from fish()
if get_opponent().species.name == "Feebas":
self.checked_tiles = bad_tiles
Expand Down
131 changes: 0 additions & 131 deletions modules/pokemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -1189,137 +1189,6 @@ def __str__(self):
def to_dict(self) -> dict:
return _to_dict_helper(self)

def to_legacy_dict(self) -> dict:
"""
Converts the Pokémon data into a simple dict, which can then be used for JSON-encoding
the data.
This is not meant to be used in new code unless truly necessary. It is mainly here, so
we can continue to provide external consumers with the same data structures as before
(it is currently being used for the WebServer and for `shiny_log.json`.)
:return: A legacy-format dictionary containing data about this Pokémon
"""
moves = []
for i in range(4):
learned_move = self.move(i)
if learned_move is None:
moves.append(
{
"accuracy": 0,
"effect": "None",
"id": 0,
"kind": "None",
"name": "None",
"power": 0,
"pp": 0,
"remaining_pp": 0,
"type": "None",
}
)
continue

move = learned_move.move
moves.append(
{
"accuracy": move.accuracy,
"effect": move.effect,
"id": move.index,
"kind": move.type.kind,
"name": move.name,
"power": move.base_power,
"pp": learned_move.total_pp,
"remaining_pp": learned_move.pp,
"type": move.type.name,
},
)

return {
"EVs": {
"attack": self.evs.attack,
"defence": self.evs.defence,
"hp": self.evs.hp,
"spAttack": self.evs.special_attack,
"spDefense": self.evs.special_defence,
"speed": self.evs.speed,
},
"IVSum": self.ivs.sum(),
"IVs": {
"attack": self.ivs.attack,
"defense": self.ivs.defence,
"hp": self.ivs.hp,
"spAttack": self.ivs.special_attack,
"spDefense": self.ivs.special_defence,
"speed": self.ivs.speed,
},
"ability": self.ability.name,
"antiShiny": self.is_anti_shiny,
"calculatedChecksum": self.calculate_checksum(),
"checksum": self.get_data_checksum(),
"condition": {
"beauty": self.contest_conditions.beauty,
"cool": self.contest_conditions.coolness,
"cute": self.contest_conditions.cuteness,
"feel": self.contest_conditions.feel,
"smart": self.contest_conditions.smartness,
"tough": self.contest_conditions.toughness,
},
"expGroup": self.species.level_up_type.value,
"experience": self.species.base_experience_yield,
"friendship": self.friendship,
"hasSpecies": 1,
"hiddenPower": self.hidden_power_type.name,
"id": self.species.index,
"isEgg": 1 if self.is_egg else 0,
"item": {
"id": self.held_item.index if self.held_item else 0,
"name": self.held_item.name if self.held_item else "None",
},
"language": self.language.value,
"level": self.level,
"markings": {
"circle": Marking.Circle in self.markings,
"heart": Marking.Heart in self.markings,
"square": Marking.Square in self.markings,
"triangle": Marking.Triangle in self.markings,
},
"metLocation": self.location_met,
"moves": moves,
"name": self.species.name,
"natID": self.species.national_dex_number,
"nature": self.nature.name,
"origins": {
"ball": self.poke_ball.name,
"game": self.game_of_origin,
"hatched": self.level_met == 0,
"metLevel": self.level_met,
},
"ot": {"sid": self.original_trainer.secret_id, "tid": self.original_trainer.id},
"pid": self.personality_value,
"pokerus": {"days": self.pokerus_status.days_remaining, "strain": self.pokerus_status.strain},
"shiny": self.is_shiny,
"shinyValue": self.shiny_value,
"species": self.species.index,
"stats": {
"attack": self.stats.attack,
"defense": self.stats.defence,
"hp": self.current_hp,
"maxHP": self.total_hp,
"spAttack": self.stats.special_attack,
"spDefense": self.stats.special_defence,
"speed": self.stats.speed,
},
"status": {
"badPoison": self.status_condition == StatusCondition.BadPoison,
"burn": self.status_condition == StatusCondition.Burn,
"freeze": self.status_condition == StatusCondition.Freeze,
"paralysis": self.status_condition == StatusCondition.Paralysis,
"poison": self.status_condition == StatusCondition.Poison,
"sleep": self.sleep_duration if self.status_condition == StatusCondition.Sleep else 0,
},
"type": [self.species.types[0].name, self.species.types[1].name if len(self.species.types) > 1 else ""],
}


def parse_pokemon(data: bytes) -> Pokemon | None:
pokemon = Pokemon(data)
Expand Down
Loading

0 comments on commit 170a045

Please sign in to comment.