diff --git a/.github/workflows/pre-release.yml b/.github/workflows/pre-release.yml index 7f75c51..7a3f746 100644 --- a/.github/workflows/pre-release.yml +++ b/.github/workflows/pre-release.yml @@ -32,8 +32,8 @@ jobs: - name: "Create archives" run: | - zip -r seekers-stubs.zip seekers/grpc/stubs - zip -r seekers.zip * + zip -r seekers-linux-stubs.zip seekers/grpc/stubs + zip -r seekers-linux.zip * - name: "Build binaries" run: | @@ -73,8 +73,8 @@ jobs: - name: "Create archives" run: | - powershell Compress-Archive ".\" "seekers.zip" - powershell Compress-Archive ".\seekers\grpc\stubs" "seekers-stubs.zip" + powershell Compress-Archive ".\" "seekers-win32.zip" + powershell Compress-Archive ".\seekers\grpc\stubs" "seekers-win32-stubs.zip" - name: "Build binaries" run: | diff --git a/freeze.bat b/freeze.bat index 29f53f3..13127a6 100644 --- a/freeze.bat +++ b/freeze.bat @@ -9,5 +9,5 @@ echo Building binaries ... .\venv\Scripts\python setup.py build echo Compress artifacts ... -for /d %%a in (build\*) do (powershell Compress-Archive ".\%%a\*" "seekers-bin.zip") +for /d %%a in (build\*) do (powershell Compress-Archive ".\%%a\*" "seekers-win32-bin.zip") echo Finished! diff --git a/freeze.sh b/freeze.sh index 40be3b9..fe35466 100755 --- a/freeze.sh +++ b/freeze.sh @@ -10,4 +10,4 @@ echo "Building binaries ..." venv/bin/python setup.py build echo "Create archive" -zip -r seekers-bin.zip build +zip -r seekers-linux-bin.zip build diff --git a/requirements.txt b/requirements.txt index b8e4665..7309962 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -pygame>=2.1.0 +pygame~=2.6.0 grpcio==1.64.1 protobuf==5.27.2 diff --git a/run_client.py b/run_client.py index bff19a9..91af650 100644 --- a/run_client.py +++ b/run_client.py @@ -3,7 +3,8 @@ import sys import logging import seekers.grpc.client -import seekers.seekers_types + +from seekers.game.player import LocalPlayerAi def run_ai(args: argparse.Namespace): @@ -14,7 +15,7 @@ def run_ai(args: argparse.Namespace): stream=sys.stdout, force=True ) - ai = seekers.seekers_types.LocalPlayerAi.from_file(args.ai_file) + ai = LocalPlayerAi.from_file(args.ai_file) service_wrapper = seekers.grpc.client.GrpcSeekersServiceWrapper(address=args.address) client = seekers.grpc.client.GrpcSeekersClient(service_wrapper, ai, careful_mode=args.careful) @@ -33,9 +34,9 @@ def run_ai(args: argparse.Namespace): def main(): parser = argparse.ArgumentParser(description='Run a Python Seekers AI as a gRPC client.') - parser.add_argument("-address", "-a", type=str, default="localhost:7777", + parser.add_argument("--address", "-a", type=str, default="localhost:7777", help="Address of the Seekers game. (default: localhost:7777)") - parser.add_argument("-loglevel", "-log", "-l", type=str, default="INFO", + parser.add_argument("--loglevel", "--log", "-l", type=str, default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]) parser.add_argument("--careful", action="store_true", help="Enable careful mode for the gRPC clients. This will " "raise an exception and stop the client when errors " diff --git a/run_seekers.py b/run_seekers.py index 49b8b2d..05daa0a 100644 --- a/run_seekers.py +++ b/run_seekers.py @@ -5,7 +5,6 @@ import sys from seekers import * -from seekers.game import SeekersGame def parse_config_overrides(overrides: list[str]) -> dict[str, str]: @@ -25,22 +24,22 @@ def parse_config_overrides(overrides: list[str]) -> dict[str, str]: def main(): parser = argparse.ArgumentParser(description="Run python seekers AIs.") - parser.add_argument("--nogrpc", action="store_true", help="Don't host a gRPC server.") - parser.add_argument("--nokill", action="store_true", help="Don't kill the process after the game is over.") + parser.add_argument("--no-grpc", action="store_true", help="Don't host a gRPC server.") + parser.add_argument("--no-kill", action="store_true", help="Don't kill the process after the game is over.") parser.add_argument("--debug", action="store_true", help="Enable debug mode. This will enable debug drawing.") - parser.add_argument("-address", "-a", type=str, default="localhost:7777", + parser.add_argument("--address", "-a", type=str, default="localhost:7777", help="Address of the server. (default: localhost:7777)") - parser.add_argument("-config", "-c", type=str, default="config.ini", + parser.add_argument("--config", "-c", type=str, default="config.ini", help="Path to the config file. (default: config.ini)") - parser.add_argument("-config-override", "-co", action="append", + parser.add_argument("--config-override", "--override", "-o", action="append", help="Override a config option. Use the form option=value, e.g. global.seed=43.") - parser.add_argument("-loglevel", "-log", "-l", type=str, default="INFO", + parser.add_argument("--loglevel", "--log", "-l", type=str, default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]) parser.add_argument("ai_files", type=str, nargs="*", help="Paths to the AIs.") args = parser.parse_args() - if args.nogrpc and not args.ai_files: + if args.no_grpc and not args.ai_files: raise ValueError("At least one AI file must be provided if gRPC is disabled.") config = Config.from_filepath(args.config) @@ -54,14 +53,14 @@ def main(): logging.basicConfig(level=args.loglevel, style="{", format=f"[{{name}}] {{levelname}}: {{message}}", stream=sys.stdout) - address = args.address if not args.nogrpc else False + address = args.address if not args.no_grpc else False seekers_game = SeekersGame( local_ai_locations=args.ai_files, config=config, grpc_address=address, debug=args.debug, - dont_kill=args.nokill + dont_kill=args.no_kill ) seekers_game.listen() seekers_game.start() diff --git a/seekers/__init__.py b/seekers/__init__.py index ea379fa..0c7b9e1 100644 --- a/seekers/__init__.py +++ b/seekers/__init__.py @@ -1,14 +1,12 @@ -from __future__ import annotations - +from .vector import * +from .config import * from .colors import Color -from .seekers_types import ( - Config, - Vector, - Physical, - Goal, - Magnet, - Seeker, - Player, - World, - Camp, -) + +from .player import * +from .goal import * +from .seeker import * +from .physical import * +from .camp import * +from .world import * + +from . import debug_drawing diff --git a/seekers/camp.py b/seekers/camp.py new file mode 100644 index 0000000..9ecea2a --- /dev/null +++ b/seekers/camp.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import dataclasses + +from .vector import * +from . import player + + +__all__ = [ + "Camp" +] + + +@dataclasses.dataclass +class Camp: + id: str + owner: player.Player + position: Vector + width: float + height: float + + def contains(self, pos: Vector) -> bool: + delta = self.position - pos + return 2 * abs(delta.x) < self.width and 2 * abs(delta.y) < self.height + + @property + def top_left(self) -> Vector: + return self.position - Vector(self.width, self.height) / 2 + + @property + def bottom_right(self) -> Vector: + return self.position + Vector(self.width, self.height) / 2 diff --git a/seekers/colors.py b/seekers/colors.py index 3b189e5..8247213 100644 --- a/seekers/colors.py +++ b/seekers/colors.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import random import typing import colorsys diff --git a/seekers/config.py b/seekers/config.py new file mode 100644 index 0000000..392fadc --- /dev/null +++ b/seekers/config.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +import configparser +import dataclasses +import typing + +__all__ = [ + "Config", +] + + +@dataclasses.dataclass +class Config: + """Configuration for the Seekers game.""" + global_wait_for_players: bool + global_playtime: int + global_seed: int + global_fps: int + global_speed: int + global_players: int + global_seekers: int + global_goals: int + global_color_threshold: float + + map_width: int + map_height: int + + camp_width: int + camp_height: int + + seeker_thrust: float + seeker_magnet_slowdown: float + seeker_disabled_time: int + seeker_radius: float + seeker_mass: float + seeker_friction: float + + goal_scoring_time: int + goal_radius: float + goal_mass: float + goal_thrust: float + goal_friction: float + + @property + def map_dimensions(self): + return self.map_width, self.map_height + + @classmethod + def from_file(cls, file) -> "Config": + cp = configparser.ConfigParser() + cp.read_file(file) + + return cls( + global_wait_for_players=cp.getboolean("global", "wait-for-players"), + global_playtime=cp.getint("global", "playtime"), + global_seed=cp.getint("global", "seed"), + global_fps=cp.getint("global", "fps"), + global_speed=cp.getint("global", "speed"), + global_players=cp.getint("global", "players"), + global_seekers=cp.getint("global", "seekers"), + global_goals=cp.getint("global", "goals"), + global_color_threshold=cp.getfloat("global", "color-threshold"), + + map_width=cp.getint("map", "width"), + map_height=cp.getint("map", "height"), + + camp_width=cp.getint("camp", "width"), + camp_height=cp.getint("camp", "height"), + + seeker_thrust=cp.getfloat("seeker", "thrust"), + seeker_magnet_slowdown=cp.getfloat("seeker", "magnet-slowdown"), + seeker_disabled_time=cp.getint("seeker", "disabled-time"), + seeker_radius=cp.getfloat("seeker", "radius"), + seeker_mass=cp.getfloat("seeker", "mass"), + seeker_friction=cp.getfloat("seeker", "friction"), + + goal_scoring_time=cp.getint("goal", "scoring-time"), + goal_radius=cp.getfloat("goal", "radius"), + goal_mass=cp.getfloat("goal", "mass"), + goal_thrust=cp.getfloat("goal", "thrust"), + goal_friction=cp.getfloat("goal", "friction"), + ) + + @classmethod + def from_filepath(cls, filepath: str) -> "Config": + with open(filepath) as f: + return cls.from_file(f) + + @staticmethod + def value_to_str(value: bool | float | int | str) -> str: + if isinstance(value, bool): + return str(value).lower() + elif isinstance(value, float): + return f"{value:.2f}" + else: + return str(value) + + @staticmethod + def value_from_str(value: str, type_: typing.Literal["bool", "float", "int", "str"]) -> bool | float | int | str: + if type_ == "bool": + return value.lower() == "true" + elif type_ == "float": + return float(value) + elif type_ == "int": + return int(float(value)) + else: + return value + + @staticmethod + def get_section_and_key(attribute_name: str) -> tuple[str, str]: + """Split an attribute name into the config header name and the key name.""" + + section, key = attribute_name.split("_", 1) + + return section, key.replace("_", "-") + + @staticmethod + def get_attribute_name(section: str, key: str) -> str: + return f"{section}_{key.replace('-', '_')}" + + @classmethod + def get_field_type(cls, field_name: str) -> typing.Literal["bool", "float", "int", "str"]: + field_types = {f.name: f.type for f in dataclasses.fields(cls)} + return field_types[field_name] + + def import_option(self, section: str, key: str, value: str): + field_name = self.get_attribute_name(section, key) + field_type = self.get_field_type(field_name) + + setattr(self, field_name, self.value_from_str(value, field_type)) diff --git a/seekers/debug_drawing.py b/seekers/debug_drawing.py index 6aaa5b2..07c3d1f 100644 --- a/seekers/debug_drawing.py +++ b/seekers/debug_drawing.py @@ -1,16 +1,24 @@ +from __future__ import annotations + import abc import dataclasses import typing from contextvars import ContextVar -from .seekers_types import Vector -from .draw import GameRenderer +from .vector import * +from . import draw + +__all__ = [ + "draw_text", + "draw_line", + "draw_circle", +] @dataclasses.dataclass class DebugDrawing(abc.ABC): @abc.abstractmethod - def draw(self, game_renderer: GameRenderer): + def draw(self, game_renderer: draw.GameRenderer): ... @@ -21,7 +29,7 @@ class TextDebugDrawing(DebugDrawing): color: tuple[int, int, int] = (255, 255, 255) center: bool = True - def draw(self, game_renderer: GameRenderer): + def draw(self, game_renderer: draw.GameRenderer): # draw the text centered at the position game_renderer.draw_text(self.text, self.color, self.position, center=self.center) @@ -33,7 +41,7 @@ class LineDebugDrawing(DebugDrawing): color: tuple[int, int, int] = (255, 255, 255) width: int = 2 - def draw(self, game_renderer: GameRenderer): + def draw(self, game_renderer: draw.GameRenderer): game_renderer.draw_line(self.color, self.start, self.end, self.width) @@ -44,7 +52,7 @@ class CircleDebugDrawing(DebugDrawing): color: tuple[int, int, int] = (255, 255, 255) width: int = 2 - def draw(self, game_renderer: GameRenderer): + def draw(self, game_renderer: draw.GameRenderer): game_renderer.draw_circle(self.color, self.position, self.radius, self.width) @@ -60,5 +68,6 @@ def draw_circle(position: Vector, radius: float, color: tuple[int, int, int] = ( add_debug_drawing_func_ctxtvar.get()(CircleDebugDrawing(position, radius, color, width)) -add_debug_drawing_func_ctxtvar: \ - ContextVar[typing.Callable[[DebugDrawing], None]] = ContextVar("add_debug_drawing_func", default=lambda _: None) +add_debug_drawing_func_ctxtvar: ContextVar[typing.Callable[[DebugDrawing], None]] = ( + ContextVar("add_debug_drawing_func", default=lambda _: None) +) diff --git a/seekers/draw.py b/seekers/draw.py index 97efa10..a5c8f36 100644 --- a/seekers/draw.py +++ b/seekers/draw.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import abc import typing import math @@ -5,7 +7,19 @@ import pygame from .colors import * -from .seekers_types import * +from .vector import * +from .config import * +from . import ( + world, + seeker, + player, + camp, + goal, +) + +__all__ = [ + "GameRenderer", +] class Animation(abc.ABC): @@ -47,18 +61,18 @@ def __init__(self, config: Config, debug_mode: bool = False): self.config = config self.debug_mode = debug_mode - self.world = World(self.config.map_width, self.config.map_height) + self.world = world.World(self.config.map_width, self.config.map_height) - def init(self, players: typing.Iterable[Player], goals: list[Goal]): + def init(self, players: typing.Iterable[player.Player]): pygame.init() for p in players: name = p.name if self.debug_mode: - if isinstance(p, GrpcClientPlayer): + if isinstance(p, player.GrpcClientPlayer): name += f" (gRPC)" - elif isinstance(p, LocalPlayer): + elif isinstance(p, player.LocalPlayer): name += f" (local)" self.player_name_images[p.id] = self.font.render(name, True, p.color) @@ -108,37 +122,38 @@ def draw_rect(self, color: Color, p1: Vector, p2: Vector, width: int = 0): p1, p2 ) - def draw(self, players: typing.Collection[Player], camps: typing.Iterable[Camp], goals: typing.Iterable[Goal], + def draw(self, players: typing.Collection[player.Player], camps: typing.Iterable[camp.Camp], + goals: typing.Iterable[goal.Goal], animations: list[Animation], clock: pygame.time.Clock): # clear screen self.screen.fill(self.background_color) # draw camps - for camp in camps: - self.draw_rect(camp.owner.color, camp.top_left, camp.bottom_right, 5) + for camp_ in camps: + self.draw_rect(camp_.owner.color, camp_.top_left, camp_.bottom_right, 5) # draw goals - for goal in goals: + for goal_ in goals: color = ( - interpolate_color((255, 255, 255), goal.owner.color, - min(1.0, (goal.time_owned / goal.scoring_time) ** 2)) - if goal.owner else (255, 255, 255) + interpolate_color((255, 255, 255), goal_.owner.color, + min(1.0, (goal_.time_owned / goal_.scoring_time) ** 2)) + if goal_.owner else (255, 255, 255) ) - self.draw_circle(color, goal.position, goal.radius) + self.draw_circle(color, goal_.position, goal_.radius) # draw jet streams - for player in players: - for seeker in player.seekers.values(): - a = seeker.acceleration - if not seeker.is_disabled and a.squared_length() > 0: - self.draw_jet_stream(seeker, -a) + for player_ in players: + for seeker_ in player_.seekers.values(): + a = seeker_.acceleration + if not seeker_.is_disabled and a.squared_length() > 0: + self.draw_jet_stream(seeker_, -a) # draw seekers - for player in players: - for i, seeker in enumerate(player.seekers.values()): - self.draw_seeker(seeker, player, str(i)) + for player_ in players: + for i, seeker_ in enumerate(player_.seekers.values()): + self.draw_seeker(seeker_, player_, str(i)) - for debug_drawing in player.debug_drawings: + for debug_drawing in player_.debug_drawings: debug_drawing.draw(self) # draw animations @@ -151,39 +166,39 @@ def draw(self, players: typing.Collection[Player], camps: typing.Iterable[Camp], # update display pygame.display.flip() - def draw_seeker(self, seeker: Seeker, player: Player, debug_str: str): - color = player.color - if seeker.is_disabled: + def draw_seeker(self, seeker_: seeker.Seeker, player_: player.Player, debug_str: str): + color = player_.color + if seeker_.is_disabled: color = interpolate_color(color, [0, 0, 0], 0.5) - self.draw_circle(color, seeker.position, seeker.radius, width=0) - self.draw_halo(seeker, color) + self.draw_circle(color, seeker_.position, seeker_.radius, width=0) + self.draw_halo(seeker_, color) if self.debug_mode: - self.draw_text(debug_str, (0, 0, 0), seeker.position) + self.draw_text(debug_str, (0, 0, 0), seeker_.position) - def draw_halo(self, seeker: Seeker, color: Color): - adjpos = seeker.position - if seeker.is_disabled: + def draw_halo(self, seeker_: seeker.Seeker, color: Color): + adjpos = seeker_.position + if seeker_.is_disabled: return mu = abs(math.sin((int(pygame.time.get_ticks() / 30) % 50) / 50 * 2 * math.pi)) ** 2 - self.draw_circle(interpolate_color(color, [0, 0, 0], mu), adjpos, 3 + seeker.radius, 3) + self.draw_circle(interpolate_color(color, [0, 0, 0], mu), adjpos, 3 + seeker_.radius, 3) - if not seeker.magnet.is_on(): + if not seeker_.magnet.is_on(): return for offset in 0, 10, 20, 30, 40: - mu = int(-seeker.magnet.strength * pygame.time.get_ticks() / 50 + offset) % 50 - self.draw_circle(interpolate_color(color, [0, 0, 0], mu / 50), adjpos, mu + seeker.radius, 2) + mu = int(-seeker_.magnet.strength * pygame.time.get_ticks() / 50 + offset) % 50 + self.draw_circle(interpolate_color(color, [0, 0, 0], mu / 50), adjpos, mu + seeker_.radius, 2) - def draw_jet_stream(self, seeker: Seeker, direction: Vector): - length = seeker.radius * 3 - adjpos = seeker.position + def draw_jet_stream(self, seeker_: seeker.Seeker, direction: Vector): + length = seeker_.radius * 3 + adjpos = seeker_.position self.draw_line((255, 255, 255), adjpos, adjpos + direction * length) - def draw_information(self, players: typing.Collection[Player], pos: Vector, clock: pygame.time.Clock): + def draw_information(self, players: typing.Collection[player.Player], pos: Vector, clock: pygame.time.Clock): # draw fps fps = int(clock.get_fps()) self.draw_text(str(fps), (250, 250, 250), pos, center=False) diff --git a/seekers/game.py b/seekers/game.py index 6788bb8..84bf1e1 100644 --- a/seekers/game.py +++ b/seekers/game.py @@ -7,15 +7,29 @@ import random import time import typing + import pygame -from .seekers_types import * -from . import colors -from . import draw -from . import game_logic +from .config import * +from .ids import * +from . import ( + world, + draw, + seeker, + player, + goal, + game_logic, + colors +) + +__all__ = [ + "SeekersGame", + "GameFullError" +] -class GameFullError(Exception): ... +class GameFullError(Exception): + ... class SeekersGame: @@ -39,7 +53,7 @@ def __init__(self, local_ai_locations: typing.Iterable[str], config: Config, if grpc_address and len(self.players) < config.global_players: try: - from .grpc.server import GrpcSeekersServer + from seekers.grpc.server import GrpcSeekersServer self.grpc = GrpcSeekersServer(self, grpc_address) except ImportError as e: self._logger.warning("gRPC server could not be started. Import error.", exc_info=e) @@ -47,7 +61,7 @@ def __init__(self, local_ai_locations: typing.Iterable[str], config: Config, else: self.grpc = None - self.world = World(*self.config.map_dimensions) + self.world = world.World(*self.config.map_dimensions) self.goals = [] self.camps = [] @@ -57,6 +71,7 @@ def __init__(self, local_ai_locations: typing.Iterable[str], config: Config, ) self.animations = [] + self.clock = None self.ticks = 0 def start(self): @@ -68,13 +83,13 @@ def start(self): random.seed(self.config.global_seed) # initialize goals - self.goals = [Goal.from_config(get_id("Goal"), self.world.random_position(), self.config) for _ in + self.goals = [goal.Goal.from_config(get_id("Goal"), self.world.random_position(), self.config) for _ in range(self.config.global_goals)] # initialize players for p in self.players.values(): p.seekers = { - (id_ := get_id("Seeker")): Seeker.from_config(p, id_, self.world.random_position(), self.config) + (id_ := get_id("Seeker")): seeker.Seeker.from_config(p, id_, self.world.random_position(), self.config) for _ in range(self.config.global_seekers) } p.color = self.get_new_player_color(p) @@ -83,7 +98,7 @@ def start(self): self.camps = self.world.generate_camps(self.players.values(), self.config) # prepare graphics - self.renderer.init(self.players.values(), self.goals) + self.renderer.init(self.players.values()) try: if self.grpc: @@ -120,10 +135,10 @@ def mainloop(self): running = False break - for player in self.players.values(): + for player_ in self.players.values(): # self._logger.debug(f"Polling AI for player {player.name}") - player.poll_ai(self.config.global_wait_for_players, self.world, self.goals, self.players, - self.ticks, self.debug) + player_.poll_ai(self.config.global_wait_for_players, self.world, self.goals, self.players, + self.ticks, self.debug) game_logic.tick(self.players.values(), self.camps, self.goals, self.animations, self.world) @@ -169,24 +184,24 @@ def wait_for_players(): wait_for_players() @staticmethod - def load_local_players(ai_locations: typing.Iterable[str]) -> dict[str, Player]: + def load_local_players(ai_locations: typing.Iterable[str]) -> dict[str, player.Player]: """Return the players found in the given directories or files.""" - out: dict[str, Player] = {} + out: dict[str, player.Player] = {} for location in ai_locations: if os.path.isdir(location): for filename in glob.glob(os.path.join(location, "ai*.py")): - player = LocalPlayer.from_file(filename) - out |= {player.id: player} + player_ = player.LocalPlayer.from_file(filename) + out |= {player_.id: player_} elif os.path.isfile(location): - player = LocalPlayer.from_file(location) - out |= {player.id: player} + player_ = player.LocalPlayer.from_file(location) + out |= {player_.id: player_} else: raise Exception(f"Invalid AI location: {location!r} is neither a file nor a directory.") return out - def add_player(self, player: Player): + def add_player(self, player_: player.Player): """Add a player to the game while it is not running yet and raise a GameFullError if the game is full. This function is used by the gRPC server.""" @@ -198,21 +213,21 @@ def add_player(self, player: Player): f"Game full. Cannot add more players. Max player count is {self.config.global_players}." ) - self.players |= {player.id: player} + self.players |= {player_.id: player_} def print_scores(self): - for player in sorted(self.players.values(), key=lambda p: p.score, reverse=True): - print(f"{player.score} P.:\t{player.name}") + for player_ in sorted(self.players.values(), key=lambda p: p.score, reverse=True): + print(f"{player_.score} P.:\t{player_.name}") - def get_new_player_color(self, player: Player) -> colors.Color: + def get_new_player_color(self, player_: player.Player) -> colors.Color: old_colors = [p.color for p in self.players.values() if p.color is not None] preferred = ( - colors.string_hash_color(player.name) if player.preferred_color is None else player.preferred_color + colors.string_hash_color(player_.name) if player_.preferred_color is None else player_.preferred_color ) return colors.pick_new(old_colors, preferred, threshold=self.config.global_color_threshold) @property - def seekers(self) -> collections.ChainMap[str, Seeker]: + def seekers(self) -> collections.ChainMap[str, seeker.Seeker]: return collections.ChainMap(*(p.seekers for p in self.players.values())) diff --git a/seekers/game_logic.py b/seekers/game_logic.py index f5c4548..000171f 100644 --- a/seekers/game_logic.py +++ b/seekers/game_logic.py @@ -1,15 +1,34 @@ -from .draw import ScoreAnimation, Animation -from .seekers_types import * +from __future__ import annotations import typing - -def tick(players: typing.Iterable[Player], camps: list[Camp], goals: list[Goal], - animations: list[Animation], world: World): +from .vector import * +from . import ( + world, + draw, + camp, + goal, + player, + physical, + seeker +) + +__all__ = [ + "tick", +] + + +def tick( + players: typing.Iterable[player.Player], + camps: list[camp.Camp], + goals: list[goal.Goal], + animations: list[draw.Animation], + world_: world.World +): seekers = [s for p in players for s in p.seekers.values()] # move and recover seekers for s in seekers: - s.move(world) + s.move(world_) if s.is_disabled: s.disabled_counter -= 1 @@ -17,9 +36,9 @@ def tick(players: typing.Iterable[Player], camps: list[Camp], goals: list[Goal], for g in goals: g.acceleration = Vector(0, 0) for s in seekers: - g.acceleration += s.magnetic_force(world, g.position) + g.acceleration += s.magnetic_force(world_, g.position) - g.move(world) + g.move(world_) # handle collisions # noinspection PyTypeChecker @@ -29,23 +48,23 @@ def tick(players: typing.Iterable[Player], camps: list[Camp], goals: list[Goal], while j < len(physicals): phys2 = physicals[j] - d = world.torus_difference(phys2.position, phys1.position).squared_length() + d = world_.torus_difference(phys2.position, phys1.position).squared_length() min_dist = phys1.radius + phys2.radius if d < min_dist ** 2: - if isinstance(phys1, Seeker) and isinstance(phys2, Seeker): - Seeker.collision(phys1, phys2, world) + if isinstance(phys1, seeker.Seeker) and isinstance(phys2, seeker.Seeker): + seeker.Seeker.collision(phys1, phys2, world_) else: - Physical.collision(phys1, phys2, world) + physical.Physical.collision(phys1, phys2, world_) j += 1 # handle goals and scoring for i, g in enumerate(goals): - for camp in camps: - if g.camp_tick(camp): - goal_scored(camp.owner, i, goals, animations, world) + for camp_ in camps: + if g.camp_tick(camp_): + goal_scored(camp_.owner, i, goals, animations, world_) break # advance animations @@ -56,13 +75,19 @@ def tick(players: typing.Iterable[Player], camps: list[Camp], goals: list[Goal], animations.pop(i) -def goal_scored(player: Player, goal_index: int, goals: list[Goal], animations: list[Animation], world: World): - player.score += 1 +def goal_scored( + player_: player.Player, + goal_index: int, + goals: list[goal.Goal], + animations: list[draw.Animation], + world_: world.World +): + player_.score += 1 - goal = goals[goal_index] + goal_ = goals[goal_index] - animations.append(ScoreAnimation(goal.position, player.color, goal.radius)) + animations.append(draw.ScoreAnimation(goal_.position, player_.color, goal_.radius)) - goal.position = world.random_position() - goal.owner = None - goal.time_owned = 0 + goal_.position = world_.random_position() + goal_.owner = None + goal_.time_owned = 0 diff --git a/seekers/goal.py b/seekers/goal.py new file mode 100644 index 0000000..4791e0a --- /dev/null +++ b/seekers/goal.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +from .vector import * +from .config import * +from . import ( + physical, + camp, + player, +) + +__all__ = [ + "Goal", +] + + +class Goal(physical.Physical): + def __init__(self, scoring_time: float, base_thrust: float, *args, **kwargs): + physical.Physical.__init__(self, *args, **kwargs) + + self.owner: player.Player | None = None + self.time_owned: int = 0 + + self.scoring_time = scoring_time + self.base_thrust = base_thrust + + def thrust(self) -> float: + return self.base_thrust + + @classmethod + def from_config(cls, id_: str, position: Vector, config: Config) -> "Goal": + return cls( + scoring_time=config.goal_scoring_time, + base_thrust=config.goal_thrust, + id_=id_, + position=position, + velocity=Vector(0, 0), + mass=config.goal_mass, + radius=config.goal_radius, + friction=config.goal_friction + ) + + def camp_tick(self, camp_: camp.Camp) -> bool: + """Update the goal and return True if it has been captured.""" + if camp_.contains(self.position): + if self.owner == camp_.owner: + self.time_owned += 1 + else: + self.time_owned = 0 + self.owner = camp_.owner + + return self.time_owned >= self.scoring_time diff --git a/seekers/grpc/converters.py b/seekers/grpc/converters.py index 595dc09..814bf00 100644 --- a/seekers/grpc/converters.py +++ b/seekers/grpc/converters.py @@ -10,7 +10,7 @@ from .stubs.org.seekers.grpc.game.vector2d_pb2 import Vector2D from .stubs.org.seekers.grpc.service.seekers_pb2 import Section -from .. import seekers_types as seekers +import seekers def vector_to_seekers(vector: Vector2D) -> seekers.Vector: diff --git a/seekers/grpc/server.py b/seekers/grpc/server.py index 8b34b9a..4a1bc12 100644 --- a/seekers/grpc/server.py +++ b/seekers/grpc/server.py @@ -9,7 +9,12 @@ from .stubs.org.seekers.grpc.service.seekers_pb2 import * from .stubs.org.seekers.grpc.service.seekers_pb2_grpc import * -from .. import game +from .. import ( + game, + colors, +) +from ..player import GrpcClientPlayer +from ..ids import * class GrpcSeekersServicer(SeekersServicer): @@ -57,7 +62,7 @@ def Command(self, request: CommandRequest, context: grpc.ServicerContext) -> Com # check if seeker is owned by player # noinspection PyTypeChecker - if not isinstance(seeker.owner, seekers.GrpcClientPlayer) or seeker.owner.token != request.token: + if not isinstance(seeker.owner, GrpcClientPlayer) or seeker.owner.token != request.token: context.abort( grpc.StatusCode.PERMISSION_DENIED, f"Seeker with id {command.seeker_id!r} (owner player id: {seeker.owner.id!r}) " @@ -87,7 +92,7 @@ def Command(self, request: CommandRequest, context: grpc.ServicerContext) -> Com self.generate_status() return self.current_status - def join_game(self, name: str, color: seekers.Color | None) -> tuple[str, str]: + def join_game(self, name: str, color: colors.Color | None) -> tuple[str, str]: # add the player with a new name if the requested name is already taken _requested_name = name i = 2 @@ -96,10 +101,10 @@ def join_game(self, name: str, color: seekers.Color | None) -> tuple[str, str]: i += 1 # create new player - new_token = seekers.get_id("Token") - player = seekers.GrpcClientPlayer( + new_token = get_id("Token") + player = GrpcClientPlayer( token=new_token, - id=seekers.get_id("Player"), + id=get_id("Player"), name=_requested_name, score=0, seekers={}, diff --git a/seekers/ids.py b/seekers/ids.py new file mode 100644 index 0000000..d0945c6 --- /dev/null +++ b/seekers/ids.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +import collections +import random + +__all__ = [ + "get_id", +] + +_IDS = collections.defaultdict(list) + + +def get_id(obj: str): + rng = random.Random(obj) + + while (id_ := rng.randint(0, 2 ** 32)) in _IDS[obj]: + ... + + _IDS[obj].append(id_) + + return f"py-seekers.{obj}@{id_}" diff --git a/seekers/physical.py b/seekers/physical.py new file mode 100644 index 0000000..2adb6d9 --- /dev/null +++ b/seekers/physical.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from .vector import * +from . import world + +__all__ = [ + "Physical", +] + + +class Physical: + def __init__(self, id_: str, position: Vector, velocity: Vector, + mass: float, radius: float, friction: float): + self.id = id_ + + self.position = position + self.velocity = velocity + self.acceleration = Vector(0, 0) + + self.mass = mass + self.radius = radius + + self.friction = friction + + def update_acceleration(self, world_: world.World): + """Update self.acceleration. Ideally, that is a unit vector. This is supposed to be overridden by subclasses.""" + pass + + def thrust(self) -> float: + """Return the thrust, i.e. length of applied acceleration. This is supposed to be overridden by subclasses.""" + return 1 + + def move(self, world_: world.World): + # friction + self.velocity *= 1 - self.friction + + # acceleration + self.update_acceleration(world_) + self.velocity += self.acceleration * self.thrust() + + # displacement + self.position += self.velocity + + world_.normalize_position(self.position) + + def collision(self, other: Physical, world_: world.World): + # elastic collision + min_dist = self.radius + other.radius + + d = world_.torus_difference(self.position, other.position) + + dn = d.normalized() + dv = other.velocity - self.velocity + m = 2 / (self.mass + other.mass) + + dvdn = dv.dot(dn) + if dvdn < 0: + self.velocity += dn * (m * other.mass * dvdn) + other.velocity -= dn * (m * self.mass * dvdn) + + ddn = d.dot(dn) + if ddn < min_dist: + self.position += dn * (ddn - min_dist) + other.position -= dn * (ddn - min_dist) diff --git a/seekers/player.py b/seekers/player.py new file mode 100644 index 0000000..68a6158 --- /dev/null +++ b/seekers/player.py @@ -0,0 +1,351 @@ +from __future__ import annotations + +import abc +import copy +import dataclasses +import logging +import os +import textwrap +import threading +import typing + +from .vector import * +from .colors import * +from .ids import * +from . import ( + seeker, + camp, + world, + goal, +) + +__all__ = [ + "Player", + "LocalPlayerAi", + "LocalPlayer", + "GrpcClientPlayer", + "InvalidAiOutputError", + "DecideCallable", + "AiInput", +] + +AiInput = tuple[ + list["Seeker"], list["Seeker"], list["Seeker"], list["Goal"], list["Player"], "Camp", list["Camp"], "World", float +] +DecideCallable = typing.Callable[ + [ + list["Seeker"], # my seekers + list["Seeker"], # other seekers + list["Seeker"], # all seekers + list["Goal"], # goals + list["Player"], # other_players + "Camp", # my camp + list["Camp"], # camps + "World", # world + float # time + ], + list["Seeker"] # new my seekers +] + + +@dataclasses.dataclass +class Player: + id: str + name: str + score: int + seekers: dict[str, seeker.Seeker] + + color: Color | None = dataclasses.field(init=False, default=None) + camp: camp.Camp | None = dataclasses.field(init=False, default=None) + debug_drawings: list = dataclasses.field(init=False, default_factory=list) + preferred_color: Color | None = dataclasses.field(init=False, default=None) + + @abc.abstractmethod + def poll_ai(self, wait: bool, world_: world.World, goals: list[goal.Goal], + players: dict[str, "Player"], time_: float, debug: bool): + ... + + +class InvalidAiOutputError(Exception): + ... + + +@dataclasses.dataclass +class LocalPlayerAi: + filepath: str + timestamp: float + decide_function: DecideCallable + preferred_color: Color | None = None + + @staticmethod + def load_module(filepath: str) -> tuple[DecideCallable, Color | None]: + try: + with open(filepath) as f: + code = f.read() + + if code.strip().startswith("#bot"): + logging.info(f"AI {filepath!r} was loaded in compatibility mode. (#bot)") + # Wrap code inside a decide function (compatibility). + # The old function that did this was called 'mogrify'. + + func_header = ( + "def decide(seekers, other_seekers, all_seekers, goals, otherPlayers, own_camp, camps, world, " + "passed_time):" + ) + + fist_line, code = code.split("\n", 1) + + code = func_header + fist_line + ";\n" + textwrap.indent(code + "\nreturn seekers", " ") + + mod = compile("".join(code), filepath, "exec") + + mod_dict = {} + exec(mod, mod_dict) + + preferred_color = mod_dict.get("__color__", None) + if preferred_color is not None: + if not (isinstance(preferred_color, tuple) or isinstance(preferred_color, list)): + raise TypeError(f"__color__ must be a tuple or list, not {type(preferred_color)!r}.") + + if len(preferred_color) != 3: + raise ValueError(f"__color__ must be a tuple or list of length 3, not {len(preferred_color)}.") + + if "decide" not in mod_dict: + raise KeyError(f"AI {filepath!r} does not have a 'decide' function.") + + return mod_dict["decide"], preferred_color + except Exception as e: + # print(f"Error while loading AI {filepath!r}", file=sys.stderr) + # traceback.print_exc(file=sys.stderr) + # print(file=sys.stderr) + + raise InvalidAiOutputError(f"Error while loading AI {filepath!r}. Dummy AIs are not supported.") from e + + @classmethod + def from_file(cls, filepath: str) -> "LocalPlayerAi": + decide_func, preferred_color = cls.load_module(filepath) + + return cls(filepath, os.path.getctime(filepath), decide_func, preferred_color) + + def update(self): + new_timestamp = os.path.getctime(self.filepath) + if new_timestamp > self.timestamp: + logger = logging.getLogger("AiReloader") + logger.debug(f"Reloading AI {self.filepath!r}.") + + self.decide_function, self.preferred_color = self.load_module(self.filepath) + self.timestamp = new_timestamp + + +@dataclasses.dataclass +class LocalPlayer(Player): + """A player whose decide function is called directly. See README.md old method.""" + ai: LocalPlayerAi + + _ai_seekers: dict[str, seeker.Seeker] = dataclasses.field(init=False, default=None) + _ai_goals: list[goal.Goal] = dataclasses.field(init=False, default=None) + _ai_players: dict[str, Player] = dataclasses.field(init=False, default=None) + + def __post_init__(self): + self._logger = logging.getLogger(self.name) + + @property + def preferred_color(self) -> Color | None: + return self.ai.preferred_color + + def init_ai_state(self, goals: list[goal.Goal], players: dict[str, "Player"]): + self._ai_goals = [copy.deepcopy(goal_) for goal_ in goals] + + self._ai_players = {} + self._ai_seekers = {} + + for player in players.values(): + p = Player( + id=player.id, + name=player.name, + score=player.score, + seekers={}, + ) + p.color = copy.deepcopy(player.color) + p.preferred_color = copy.deepcopy(player.preferred_color) + p.camp = camp.Camp( + id=player.camp.id, + owner=p, + position=player.camp.position.copy(), + width=player.camp.width, + height=player.camp.height + ) + + self._ai_players[player.id] = p + + for seeker_ in player.seekers.values(): + s = copy.deepcopy(seeker_) + s.owner = p + + p.seekers[seeker_.id] = s + self._ai_seekers[seeker_.id] = s + + def update_ai_state(self, goals: list[goal.Goal], players: dict[str, "Player"]): + if self._ai_seekers is None: + self.init_ai_state(goals, players) + + for ai_goal, goal_ in zip(self._ai_goals, goals): + ai_goal.position = goal_.position.copy() + ai_goal.velocity = goal_.velocity.copy() + ai_goal.owner = self._ai_players[goal_.owner.id] if goal_.owner else None + ai_goal.time_owned = goal_.time_owned + + for player in players.values(): + for seeker_id, seeker_ in player.seekers.items(): + ai_seeker = self._ai_seekers[seeker_id] + + ai_seeker.position = seeker_.position.copy() + ai_seeker.velocity = seeker_.velocity.copy() + ai_seeker.target = seeker_.target.copy() + ai_seeker.disabled_counter = seeker_.disabled_counter + ai_seeker.magnet.strength = seeker_.magnet.strength + + def get_ai_input( + self, + world_: world.World, + goals: list[goal.Goal], + players: dict[str, Player], + time: float + ) -> AiInput: + self.update_ai_state(goals, players) + + me = self._ai_players[self.id] + my_camp = me.camp + my_seekers = list(me.seekers.values()) + other_seekers = [s for p in self._ai_players.values() for s in p.seekers.values() if p is not me] + all_seekers = my_seekers + other_seekers + camps = [p.camp for p in self._ai_players.values()] + + return ( + my_seekers, + other_seekers, + all_seekers, + self._ai_goals.copy(), + [player for player in self._ai_players.values() if player is not me], + my_camp, camps, + world.World(world_.width, world_.height), + time + ) + + def call_ai(self, ai_input: AiInput, debug: bool) -> typing.Any: + def call(): + new_debug_drawings = [] + + if debug: + from seekers.debug_drawing import add_debug_drawing_func_ctxtvar + add_debug_drawing_func_ctxtvar.set(new_debug_drawings.append) + + ai_out = self.ai.decide_function(*ai_input) + + self.debug_drawings = new_debug_drawings + + return ai_out + + try: + # only check for an updated file every 10 game ticks + *_, passed_playtime = ai_input + if int(passed_playtime) % 10 == 0: + self.ai.update() + + return call() + except Exception as e: + raise InvalidAiOutputError(f"AI {self.ai.filepath!r} raised an exception") from e + + def process_ai_output(self, ai_output: typing.Any): + if not isinstance(ai_output, list): + raise InvalidAiOutputError(f"AI output must be a list, not {type(ai_output)!r}.") + + if len(ai_output) != len(self.seekers): + raise InvalidAiOutputError(f"AI output length must be {len(self.seekers)}, not {len(ai_output)}.") + + for ai_seeker in ai_output: + try: + own_seeker = self.seekers[ai_seeker.id] + except IndexError as e: + raise InvalidAiOutputError( + f"AI output contains a seeker with id {ai_seeker.id!r} which is not one of the player's seekers." + ) from e + + if not isinstance(ai_seeker, seeker.Seeker): + raise InvalidAiOutputError(f"AI output must be a list of Seekers, not {type(ai_seeker)!r}.") + + if not isinstance(ai_seeker.target, Vector): + raise InvalidAiOutputError( + f"AI output Seeker target must be a Vector, not {type(ai_seeker.target)!r}.") + + if not isinstance(ai_seeker.magnet, seeker.Magnet): + raise InvalidAiOutputError( + f"AI output Seeker magnet must be a Magnet, not {type(ai_seeker.magnet)!r}.") + + try: + own_seeker.target.x = float(ai_seeker.target.x) + own_seeker.target.y = float(ai_seeker.target.y) + except ValueError as e: + raise InvalidAiOutputError( + f"AI output Seeker target Vector components must be numbers, not {ai_seeker.target!r}." + ) from e + + try: + own_seeker.magnet.strength = float(ai_seeker.magnet.strength) + except ValueError as e: + raise InvalidAiOutputError( + f"AI output Seeker magnet strength must be a float, not {ai_seeker.magnet.strength!r}." + ) from e + + def poll_ai(self, wait: bool, world_: world.World, goals: list[goal.Goal], players: dict[str, Player], + time_: float, debug: bool): + # ignore wait flag, supporting it would be a lot of extra code, instead always wait (blocking) + + ai_input = self.get_ai_input(world_, goals, players, time_) + + try: + ai_output = self.call_ai(ai_input, debug) + + self.process_ai_output(ai_output) + except InvalidAiOutputError as e: + self._logger.error(f"AI {self.ai.filepath!r} output is invalid.", exc_info=e) + + @classmethod + def from_file(cls, filepath: str) -> "LocalPlayer": + name, _ = os.path.splitext(filepath) + + return LocalPlayer( + id=get_id("Player"), + name=name, + score=0, + seekers={}, + ai=LocalPlayerAi.from_file(filepath) + ) + + +class GrpcClientPlayer(Player): + """A player whose decide function is called via a gRPC server and client. See README.md new method.""" + + def __init__(self, token: str, *args, preferred_color: Color | None = None, **kwargs): + super().__init__(*args, **kwargs) + self.was_updated = threading.Event() + self.num_updates = 0 + self.preferred_color = preferred_color + self.token = token + + def wait_for_update(self): + timeout = 5 # seconds + + was_updated = self.was_updated.wait(timeout) + + if not was_updated: + raise TimeoutError( + f"GrpcClientPlayer {self.name!r} did not update in time. (Timeout is {timeout} seconds.)" + ) + + self.was_updated.clear() + + def poll_ai(self, wait: bool, world_: world.World, goals: list[goal.Goal], players: dict[str, Player], + time_: float, debug: bool): + if wait: + self.wait_for_update() diff --git a/seekers/seeker.py b/seekers/seeker.py new file mode 100644 index 0000000..04165f3 --- /dev/null +++ b/seekers/seeker.py @@ -0,0 +1,142 @@ +from __future__ import annotations + +import math + +from .config import * +from .vector import * +from . import ( + physical, + world, +) + +__all__ = [ + "Magnet", + "Seeker", +] + + +class Magnet: + def __init__(self, strength=0): + self.strength = strength + + @property + def strength(self): + return self._strength + + @strength.setter + def strength(self, value): + if 1 >= value >= -8: + self._strength = value + else: + raise ValueError("Magnet strength must be between -8 and 1.") + + def is_on(self): + return self.strength != 0 + + def set_repulsive(self): + self.strength = -8 + + def set_attractive(self): + self.strength = 1 + + def disable(self): + self.strength = 0 + + +class Seeker(physical.Physical): + def __init__(self, owner, disabled_time: float, magnet_slowdown: float, base_thrust: float, *args, + **kwargs): + physical.Physical.__init__(self, *args, **kwargs) + + self.target = self.position.copy() + self.disabled_counter = 0 + self.magnet = Magnet() + + self.owner = owner + self.disabled_time = disabled_time + self.magnet_slowdown = magnet_slowdown + self.base_thrust = base_thrust + + @classmethod + def from_config(cls, owner, id_: str, position: Vector, config: Config): + return cls( + owner=owner, + disabled_time=config.seeker_disabled_time, + magnet_slowdown=config.seeker_magnet_slowdown, + base_thrust=config.seeker_thrust, + id_=id_, + position=position, + velocity=Vector(), + mass=config.seeker_mass, + radius=config.seeker_radius, + friction=config.seeker_friction, + ) + + def thrust(self) -> float: + magnet_slowdown_factor = self.magnet_slowdown if self.magnet.is_on() else 1 + + return self.base_thrust * magnet_slowdown_factor + + @property + def is_disabled(self): + return self.disabled_counter > 0 + + def disable(self): + self.disabled_counter = self.disabled_time + + def disabled(self): + return self.is_disabled + + def magnetic_force(self, world_: world.World, pos: Vector) -> Vector: + def bump(difference) -> float: + return math.exp(1 / (difference ** 2 - 1)) if difference < 1 else 0 + + torus_diff = world_.torus_difference(self.position, pos) + torus_diff_len = torus_diff.length() + + r = torus_diff_len / world_.diameter() + direction = (torus_diff / torus_diff_len) if torus_diff_len != 0 else Vector(0, 0) + + if self.is_disabled: + return Vector(0, 0) + + return - direction * (self.magnet.strength * bump(r * 10)) + + def update_acceleration(self, world_: world.World): + if self.disabled_counter == 0: + self.acceleration = world_.torus_direction(self.position, self.target) + else: + self.acceleration = Vector(0, 0) + + def magnet_effective(self): + """Return whether the magnet is on and the seeker is not disabled.""" + return self.magnet.is_on() and not self.is_disabled + + def collision(self, other: "Seeker", world_: world.World): + if not (self.magnet_effective() or other.magnet_effective()): + self.disable() + other.disable() + + if self.magnet_effective(): + self.disable() + if other.magnet_effective(): + other.disable() + + physical.Physical.collision(self, other, world_) + + # methods below are left in for compatibility + def set_magnet_repulsive(self): + self.magnet.set_repulsive() + + def set_magnet_attractive(self): + self.magnet.set_attractive() + + def disable_magnet(self): + self.magnet.disable() + + def set_magnet_disabled(self): + self.magnet.disable() + + @property + def max_speed(self): + return self.base_thrust / self.friction diff --git a/seekers/seekers_types.py b/seekers/seekers_types.py deleted file mode 100644 index e670ca4..0000000 --- a/seekers/seekers_types.py +++ /dev/null @@ -1,904 +0,0 @@ -from __future__ import annotations - -import copy -import logging -import os -import textwrap -import threading -import configparser -import math -import dataclasses -import abc -import random -import typing -from collections import defaultdict - -from .colors import Color - -__all__ = [ - "get_id", - "Config", - "Vector", - "Physical", - "Goal", - "Magnet", - "Seeker", - "AiInput", - "DecideCallable", - "Player", - "InvalidAiOutputError", - "LocalPlayerAi", - "LocalPlayer", - "GrpcClientPlayer", - "World", - "Camp", -] - -_IDS = defaultdict(list) - - -def get_id(obj: str): - rng = random.Random(obj) - - while (id_ := rng.randint(0, 2 ** 32)) in _IDS[obj]: - ... - - _IDS[obj].append(id_) - - return f"py-seekers.{obj}@{id_}" - - -@dataclasses.dataclass -class Config: - """Configuration for the Seekers game.""" - global_wait_for_players: bool - global_playtime: int - global_seed: int - global_fps: int - global_speed: int - global_players: int - global_seekers: int - global_goals: int - global_color_threshold: float - - map_width: int - map_height: int - - camp_width: int - camp_height: int - - seeker_thrust: float - seeker_magnet_slowdown: float - seeker_disabled_time: int - seeker_radius: float - seeker_mass: float - seeker_friction: float - - goal_scoring_time: int - goal_radius: float - goal_mass: float - goal_thrust: float - goal_friction: float - - @property - def map_dimensions(self): - return self.map_width, self.map_height - - @classmethod - def from_file(cls, file) -> "Config": - cp = configparser.ConfigParser() - cp.read_file(file) - - return cls( - global_wait_for_players=cp.getboolean("global", "wait-for-players"), - global_playtime=cp.getint("global", "playtime"), - global_seed=cp.getint("global", "seed"), - global_fps=cp.getint("global", "fps"), - global_speed=cp.getint("global", "speed"), - global_players=cp.getint("global", "players"), - global_seekers=cp.getint("global", "seekers"), - global_goals=cp.getint("global", "goals"), - global_color_threshold=cp.getfloat("global", "color-threshold"), - - map_width=cp.getint("map", "width"), - map_height=cp.getint("map", "height"), - - camp_width=cp.getint("camp", "width"), - camp_height=cp.getint("camp", "height"), - - seeker_thrust=cp.getfloat("seeker", "thrust"), - seeker_magnet_slowdown=cp.getfloat("seeker", "magnet-slowdown"), - seeker_disabled_time=cp.getint("seeker", "disabled-time"), - seeker_radius=cp.getfloat("seeker", "radius"), - seeker_mass=cp.getfloat("seeker", "mass"), - seeker_friction=cp.getfloat("seeker", "friction"), - - goal_scoring_time=cp.getint("goal", "scoring-time"), - goal_radius=cp.getfloat("goal", "radius"), - goal_mass=cp.getfloat("goal", "mass"), - goal_thrust=cp.getfloat("goal", "thrust"), - goal_friction=cp.getfloat("goal", "friction"), - ) - - @classmethod - def from_filepath(cls, filepath: str) -> "Config": - with open(filepath) as f: - return cls.from_file(f) - - @staticmethod - def value_to_str(value: bool | float | int | str) -> str: - if isinstance(value, bool): - return str(value).lower() - elif isinstance(value, float): - return f"{value:.2f}" - else: - return str(value) - - @staticmethod - def value_from_str(value: str, type_: typing.Literal["bool", "float", "int", "str"]) -> bool | float | int | str: - if type_ == "bool": - return value.lower() == "true" - elif type_ == "float": - return float(value) - elif type_ == "int": - return int(float(value)) - else: - return value - - @staticmethod - def get_section_and_key(attribute_name: str) -> tuple[str, str]: - """Split an attribute name into the config header name and the key name.""" - - section, key = attribute_name.split("_", 1) - - return section, key.replace("_", "-") - - @staticmethod - def get_attribute_name(section: str, key: str) -> str: - return f"{section}_{key.replace('-', '_')}" - - @classmethod - def get_field_type(cls, field_name: str) -> typing.Literal["bool", "float", "int", "str"]: - field_types = {f.name: f.type for f in dataclasses.fields(cls)} - return field_types[field_name] - - def import_option(self, section: str, key: str, value: str): - field_name = self.get_attribute_name(section, key) - field_type = self.get_field_type(field_name) - - setattr(self, field_name, self.value_from_str(value, field_type)) - - -class Vector: - __slots__ = ("x", "y") - - def __init__(self, x: float = 0, y: float = 0): - self.x = x - self.y = y - - @staticmethod - def from_polar(angle: float, radius: float = 1) -> "Vector": - return Vector(math.cos(angle) * radius, math.sin(angle) * radius) - - def rotated(self, angle: float) -> "Vector": - return Vector( - math.cos(angle) * self.x - math.sin(angle) * self.y, - math.sin(angle) * self.x + math.cos(angle) * self.y, - ) - - def rotated90(self) -> "Vector": - return Vector(-self.y, self.x) - - def __iter__(self): - return iter((self.x, self.y)) - - def __getitem__(self, i: int): - if i == 0: - return self.x - elif i == 1: - return self.y - - raise IndexError - - def __add__(self, other: "Vector"): - return Vector(self.x + other.x, self.y + other.y) - - def __sub__(self, other: "Vector"): - return Vector(self.x - other.x, self.y - other.y) - - def __mul__(self, factor: float): - return factor * self - - def __rmul__(self, factor: float): - if isinstance(factor, Vector): - return NotImplemented - else: - return Vector(factor * self.x, factor * self.y) - - def __truediv__(self, divisor: float): - if isinstance(divisor, Vector): - return NotImplemented - else: - return Vector(self.x / divisor, self.y / divisor) - - def __rtruediv__(self, dividend: float): - if isinstance(dividend, Vector): - return NotImplemented - else: - return Vector(dividend / self.x, dividend / self.y) - - def __neg__(self): - return -1 * self - - def __bool__(self): - return self.x or self.y - - def dot(self, other: "Vector") -> float: - return self.x * other.x + self.y * other.y - - def squared_length(self) -> float: - return self.x * self.x + self.y * self.y - - def length(self) -> float: - return math.sqrt(self.x * self.x + self.y * self.y) - - def norm(self): - return self.length() - - def normalized(self): - norm = self.length() - if norm == 0: - return Vector(0, 0) - else: - return Vector(self.x / norm, self.y / norm) - - def map(self, func: typing.Callable[[float], float]) -> "Vector": - return Vector(func(self.x), func(self.y)) - - def copy(self) -> "Vector": - return Vector(self.x, self.y) - - def __repr__(self): - return f"Vector({self.x}, {self.y})" - - def __format__(self, format_spec): - return f"Vector({self.x:{format_spec}}, {self.y:{format_spec}})" - - -class Physical: - def __init__(self, id_: str, position: Vector, velocity: Vector, - mass: float, radius: float, friction: float): - self.id = id_ - - self.position = position - self.velocity = velocity - self.acceleration = Vector(0, 0) - - self.mass = mass - self.radius = radius - - self.friction = friction - - def update_acceleration(self, world: "World"): - """Update self.acceleration. Ideally, that is a unit vector. This is supposed to be overridden by subclasses.""" - pass - - def thrust(self) -> float: - """Return the thrust, i.e. length of applied acceleration. This is supposed to be overridden by subclasses.""" - return 1 - - def move(self, world: "World"): - # friction - self.velocity *= 1 - self.friction - - # acceleration - self.update_acceleration(world) - self.velocity += self.acceleration * self.thrust() - - # displacement - self.position += self.velocity - - world.normalize_position(self.position) - - def collision(self, other: "Physical", world: "World"): - # elastic collision - min_dist = self.radius + other.radius - - d = world.torus_difference(self.position, other.position) - - dn = d.normalized() - dv = other.velocity - self.velocity - m = 2 / (self.mass + other.mass) - - dvdn = dv.dot(dn) - if dvdn < 0: - self.velocity += dn * (m * other.mass * dvdn) - other.velocity -= dn * (m * self.mass * dvdn) - - ddn = d.dot(dn) - if ddn < min_dist: - self.position += dn * (ddn - min_dist) - other.position -= dn * (ddn - min_dist) - - -class Goal(Physical): - def __init__(self, scoring_time: float, base_thrust: float, *args, **kwargs): - Physical.__init__(self, *args, **kwargs) - - self.owner: "Player | None" = None - self.time_owned: int = 0 - - self.scoring_time = scoring_time - self.base_thrust = base_thrust - - def thrust(self) -> float: - return self.base_thrust - - @classmethod - def from_config(cls, id_: str, position: Vector, config: Config) -> Goal: - return cls( - scoring_time=config.goal_scoring_time, - base_thrust=config.goal_thrust, - id_=id_, - position=position, - velocity=Vector(0, 0), - mass=config.goal_mass, - radius=config.goal_radius, - friction=config.goal_friction - ) - - def camp_tick(self, camp: "Camp") -> bool: - """Update the goal and return True if it has been captured.""" - if camp.contains(self.position): - if self.owner == camp.owner: - self.time_owned += 1 - else: - self.time_owned = 0 - self.owner = camp.owner - - return self.time_owned >= self.scoring_time - - -class Magnet: - def __init__(self, strength=0): - self.strength = strength - - @property - def strength(self): - return self._strength - - @strength.setter - def strength(self, value): - if 1 >= value >= -8: - self._strength = value - else: - raise ValueError("Magnet strength must be between -8 and 1.") - - def is_on(self): - return self.strength != 0 - - def set_repulsive(self): - self.strength = -8 - - def set_attractive(self): - self.strength = 1 - - def disable(self): - self.strength = 0 - - -class Seeker(Physical): - def __init__(self, owner: Player, disabled_time: float, magnet_slowdown: float, base_thrust: float, *args, - **kwargs): - Physical.__init__(self, *args, **kwargs) - - self.target = self.position.copy() - self.disabled_counter = 0 - self.magnet = Magnet() - - self.owner = owner - self.disabled_time = disabled_time - self.magnet_slowdown = magnet_slowdown - self.base_thrust = base_thrust - - @classmethod - def from_config(cls, owner: Player, id_: str, position: Vector, config: Config): - return cls( - owner=owner, - disabled_time=config.seeker_disabled_time, - magnet_slowdown=config.seeker_magnet_slowdown, - base_thrust=config.seeker_thrust, - id_=id_, - position=position, - velocity=Vector(), - mass=config.seeker_mass, - radius=config.seeker_radius, - friction=config.seeker_friction, - ) - - def thrust(self) -> float: - magnet_slowdown_factor = self.magnet_slowdown if self.magnet.is_on() else 1 - - return self.base_thrust * magnet_slowdown_factor - - @property - def is_disabled(self): - return self.disabled_counter > 0 - - def disable(self): - self.disabled_counter = self.disabled_time - - def disabled(self): - return self.is_disabled - - def magnetic_force(self, world: World, pos: Vector) -> Vector: - def bump(r) -> float: - return math.exp(1 / (r ** 2 - 1)) if r < 1 else 0 - - torus_diff = world.torus_difference(self.position, pos) - torus_diff_len = torus_diff.length() - - r = torus_diff_len / world.diameter() - direction = (torus_diff / torus_diff_len) if torus_diff_len != 0 else Vector(0, 0) - - if self.is_disabled: - return Vector(0, 0) - - return - direction * (self.magnet.strength * bump(r * 10)) - - def update_acceleration(self, world: World): - if self.disabled_counter == 0: - self.acceleration = world.torus_direction(self.position, self.target) - else: - self.acceleration = Vector(0, 0) - - def magnet_effective(self): - """Return whether the magnet is on and the seeker is not disabled.""" - return self.magnet.is_on() and not self.is_disabled - - def collision(self, other: "Seeker", world: World): - if not (self.magnet_effective() or other.magnet_effective()): - self.disable() - other.disable() - - if self.magnet_effective(): - self.disable() - if other.magnet_effective(): - other.disable() - - Physical.collision(self, other, world) - - # methods below are left in for compatibility - def set_magnet_repulsive(self): - self.magnet.set_repulsive() - - def set_magnet_attractive(self): - self.magnet.set_attractive() - - def disable_magnet(self): - self.magnet.disable() - - def set_magnet_disabled(self): - self.magnet.disable() - - @property - def max_speed(self): - return self.base_thrust / self.friction - - -AiInput = tuple[ - list[Seeker], list[Seeker], list[Seeker], list[Goal], list["Player"], "Camp", list[ - "Camp"], "World", float -] -DecideCallable = typing.Callable[ - [list[Seeker], list[Seeker], list[Seeker], list[Goal], list["Player"], "Camp", - list["Camp"], "World", - float], - list[Seeker] - # my seekers other seekers all seekers goals other_players my camp camps world time - # new my seekers -] - - -@dataclasses.dataclass -class Player: - id: str - name: str - score: int - seekers: dict[str, Seeker] - - color: Color | None = dataclasses.field(init=False, default=None) - camp: typing.Union["Camp", None] = dataclasses.field(init=False, default=None) - debug_drawings: list = dataclasses.field(init=False, default_factory=list) - preferred_color: Color | None = dataclasses.field(init=False, default=None) - - @abc.abstractmethod - def poll_ai(self, wait: bool, world: "World", goals: list[Goal], - players: dict[str, "Player"], time_: float, debug: bool): - ... - - -class InvalidAiOutputError(Exception): ... - - -@dataclasses.dataclass -class LocalPlayerAi: - filepath: str - timestamp: float - decide_function: DecideCallable - preferred_color: Color | None = None - - @staticmethod - def load_module(filepath: str) -> tuple[DecideCallable, Color | None]: - try: - with open(filepath) as f: - code = f.read() - - if code.strip().startswith("#bot"): - logging.info(f"AI {filepath!r} was loaded in compatibility mode. (#bot)") - # Wrap code inside a decide function (compatibility). - # The old function that did this was called 'mogrify'. - - func_header = ( - "def decide(seekers, other_seekers, all_seekers, goals, otherPlayers, own_camp, camps, world, " - "passed_time):" - ) - - fist_line, code = code.split("\n", 1) - - code = func_header + fist_line + ";\n" + textwrap.indent(code + "\nreturn seekers", " ") - - mod = compile("".join(code), filepath, "exec") - - mod_dict = {} - exec(mod, mod_dict) - - preferred_color = mod_dict.get("__color__", None) - if preferred_color is not None: - if not (isinstance(preferred_color, tuple) or isinstance(preferred_color, list)): - raise TypeError(f"__color__ must be a tuple or list, not {type(preferred_color)!r}.") - - if len(preferred_color) != 3: - raise ValueError(f"__color__ must be a tuple or list of length 3, not {len(preferred_color)}.") - - if "decide" not in mod_dict: - raise KeyError(f"AI {filepath!r} does not have a 'decide' function.") - - return mod_dict["decide"], preferred_color - except Exception as e: - # print(f"Error while loading AI {filepath!r}", file=sys.stderr) - # traceback.print_exc(file=sys.stderr) - # print(file=sys.stderr) - - raise InvalidAiOutputError(f"Error while loading AI {filepath!r}. Dummy AIs are not supported.") from e - - @classmethod - def from_file(cls, filepath: str) -> "LocalPlayerAi": - decide_func, preferred_color = cls.load_module(filepath) - - return cls(filepath, os.path.getctime(filepath), decide_func, preferred_color) - - def update(self): - new_timestamp = os.path.getctime(self.filepath) - if new_timestamp > self.timestamp: - logger = logging.getLogger("AiReloader") - logger.debug(f"Reloading AI {self.filepath!r}.") - - self.decide_function, self.preferred_color = self.load_module(self.filepath) - self.timestamp = new_timestamp - - -@dataclasses.dataclass -class LocalPlayer(Player): - """A player whose decide function is called directly. See README.md old method.""" - ai: LocalPlayerAi - - _ai_seekers: dict[str, Seeker] = dataclasses.field(init=False, default=None) - _ai_goals: list[Goal] = dataclasses.field(init=False, default=None) - _ai_players: dict[str, "Player"] = dataclasses.field(init=False, default=None) - - def __post_init__(self): - self._logger = logging.getLogger(self.name) - - @property - def preferred_color(self) -> Color | None: - return self.ai.preferred_color - - def init_ai_state(self, goals: list[Goal], players: dict[str, "Player"]): - self._ai_goals = [copy.deepcopy(goal) for goal in goals] - - self._ai_players = {} - self._ai_seekers = {} - - for player in players.values(): - p = Player( - id=player.id, - name=player.name, - score=player.score, - seekers={}, - ) - p.color = copy.deepcopy(player.color) - p.preferred_color = copy.deepcopy(player.preferred_color) - p.camp = Camp( - id=player.camp.id, - owner=p, - position=player.camp.position.copy(), - width=player.camp.width, - height=player.camp.height - ) - - self._ai_players[player.id] = p - - for seeker in player.seekers.values(): - s = copy.deepcopy(seeker) - s.owner = p - - p.seekers[seeker.id] = s - self._ai_seekers[seeker.id] = s - - def update_ai_state(self, goals: list[Goal], players: dict[str, "Player"]): - if self._ai_seekers is None: - self.init_ai_state(goals, players) - - for ai_goal, goal in zip(self._ai_goals, goals): - ai_goal.position = goal.position.copy() - ai_goal.velocity = goal.velocity.copy() - ai_goal.owner = self._ai_players[goal.owner.id] if goal.owner else None - ai_goal.time_owned = goal.time_owned - - for player in players.values(): - for seeker_id, seeker in player.seekers.items(): - ai_seeker = self._ai_seekers[seeker_id] - - ai_seeker.position = seeker.position.copy() - ai_seeker.velocity = seeker.velocity.copy() - ai_seeker.target = seeker.target.copy() - ai_seeker.disabled_counter = seeker.disabled_counter - ai_seeker.magnet.strength = seeker.magnet.strength - - def get_ai_input(self, - world: "World", - goals: list[Goal], - players: dict[str, "Player"], - time: float - ) -> AiInput: - self.update_ai_state(goals, players) - - me = self._ai_players[self.id] - my_camp = me.camp - my_seekers = list(me.seekers.values()) - other_seekers = [s for p in self._ai_players.values() for s in p.seekers.values() if p is not me] - all_seekers = my_seekers + other_seekers - camps = [p.camp for p in self._ai_players.values()] - - return ( - my_seekers, - other_seekers, - all_seekers, - self._ai_goals.copy(), - [player for player in self._ai_players.values() if player is not me], - my_camp, camps, - World(world.width, world.height), - time - ) - - def call_ai(self, ai_input: AiInput, debug: bool) -> typing.Any: - def call(): - new_debug_drawings = [] - - if debug: - from .debug_drawing import add_debug_drawing_func_ctxtvar - add_debug_drawing_func_ctxtvar.set(new_debug_drawings.append) - - ai_out = self.ai.decide_function(*ai_input) - - self.debug_drawings = new_debug_drawings - - return ai_out - - try: - # only check for an updated file every 10 game ticks - *_, passed_playtime = ai_input - if int(passed_playtime) % 10 == 0: - self.ai.update() - - return call() - except Exception as e: - raise InvalidAiOutputError(f"AI {self.ai.filepath!r} raised an exception") from e - - def process_ai_output(self, ai_output: typing.Any): - if not isinstance(ai_output, list): - raise InvalidAiOutputError(f"AI output must be a list, not {type(ai_output)!r}.") - - if len(ai_output) != len(self.seekers): - raise InvalidAiOutputError(f"AI output length must be {len(self.seekers)}, not {len(ai_output)}.") - - for ai_seeker in ai_output: - try: - own_seeker = self.seekers[ai_seeker.id] - except IndexError as e: - raise InvalidAiOutputError( - f"AI output contains a seeker with id {ai_seeker.id!r} which is not one of the player's seekers." - ) from e - - if not isinstance(ai_seeker, Seeker): - raise InvalidAiOutputError(f"AI output must be a list of Seekers, not {type(ai_seeker)!r}.") - - if not isinstance(ai_seeker.target, Vector): - raise InvalidAiOutputError( - f"AI output Seeker target must be a Vector, not {type(ai_seeker.target)!r}.") - - if not isinstance(ai_seeker.magnet, Magnet): - raise InvalidAiOutputError( - f"AI output Seeker magnet must be a Magnet, not {type(ai_seeker.magnet)!r}.") - - try: - own_seeker.target.x = float(ai_seeker.target.x) - own_seeker.target.y = float(ai_seeker.target.y) - except ValueError as e: - raise InvalidAiOutputError( - f"AI output Seeker target Vector components must be numbers, not {ai_seeker.target!r}." - ) from e - - try: - own_seeker.magnet.strength = float(ai_seeker.magnet.strength) - except ValueError as e: - raise InvalidAiOutputError( - f"AI output Seeker magnet strength must be a float, not {ai_seeker.magnet.strength!r}." - ) from e - - def poll_ai(self, wait: bool, world: "World", goals: list[Goal], players: dict[str, "Player"], - time_: float, debug: bool): - # ignore wait flag, supporting it would be a lot of extra code, instead always wait (blocking) - - ai_input = self.get_ai_input(world, goals, players, time_) - - try: - ai_output = self.call_ai(ai_input, debug) - - self.process_ai_output(ai_output) - except InvalidAiOutputError as e: - self._logger.error(f"AI {self.ai.filepath!r} output is invalid.", exc_info=e) - - @classmethod - def from_file(cls, filepath: str) -> "LocalPlayer": - name, _ = os.path.splitext(filepath) - - return LocalPlayer( - id=get_id("Player"), - name=name, - score=0, - seekers={}, - ai=LocalPlayerAi.from_file(filepath) - ) - - -class GrpcClientPlayer(Player): - """A player whose decide function is called via a gRPC server and client. See README.md new method.""" - - def __init__(self, token: str, *args, preferred_color: Color | None = None, **kwargs): - super().__init__(*args, **kwargs) - self.was_updated = threading.Event() - self.num_updates = 0 - self.preferred_color = preferred_color - self.token = token - - def wait_for_update(self): - timeout = 5 # seconds - - was_updated = self.was_updated.wait(timeout) - - if not was_updated: - raise TimeoutError( - f"GrpcClientPlayer {self.name!r} did not update in time. (Timeout is {timeout} seconds.)" - ) - - self.was_updated.clear() - - def poll_ai(self, wait: bool, world: "World", goals: list[Goal], players: dict[str, "Player"], - time_: float, debug: bool): - if wait: - self.wait_for_update() - - -class World: - """The world in which the game takes place. This class mainly handles the torus geometry.""" - - def __init__(self, width, height): - self.width = width - self.height = height - - def normalize_position(self, pos: Vector): - pos.x -= math.floor(pos.x / self.width) * self.width - pos.y -= math.floor(pos.y / self.height) * self.height - - def normalized_position(self, pos: Vector): - tmp = pos.copy() - self.normalize_position(tmp) - return tmp - - @property - def geometry(self) -> Vector: - return Vector(self.width, self.height) - - def diameter(self) -> float: - return self.geometry.length() - - def middle(self) -> Vector: - return self.geometry / 2 - - def torus_difference(self, left: Vector, right: Vector, /) -> Vector: - def diff1d(l, a, b): - delta = abs(a - b) - return b - a if delta < l - delta else a - b - - return Vector(diff1d(self.width, left.x, right.x), - diff1d(self.height, left.y, right.y)) - - def torus_distance(self, left: Vector, right: Vector, /) -> float: - return self.torus_difference(left, right).length() - - def torus_direction(self, left: Vector, right: Vector, /) -> Vector: - return self.torus_difference(left, right).normalized() - - def index_of_nearest(self, pos: Vector, positions: list) -> int: - d = self.torus_distance(pos, positions[0]) - j = 0 - for i, p in enumerate(positions[1:]): - dn = self.torus_distance(pos, p) - if dn < d: - d = dn - j = i + 1 - return j - - def nearest_goal(self, pos: Vector, goals: list) -> Goal: - i = self.index_of_nearest(pos, [g.position for g in goals]) - return goals[i] - - def nearest_seeker(self, pos: Vector, seekers: list) -> Seeker: - i = self.index_of_nearest(pos, [s.position for s in seekers]) - return seekers[i] - - def random_position(self) -> Vector: - return Vector(random.uniform(0, self.width), - random.uniform(0, self.height)) - - def generate_camps(self, players: typing.Collection[Player], config: Config) -> list["Camp"]: - delta = self.height / len(players) - - if config.camp_height > delta: - raise ValueError("Config value camp.height is too large. The camps would overlap. It must be smaller than " - "the height of the world divided by the number of players. ") - - for i, player in enumerate(players): - camp = Camp( - id=get_id("Camp"), - owner=player, - position=Vector(self.width / 2, delta * (i + 0.5)), - width=config.camp_width, - height=config.camp_height, - ) - player.camp = camp - - return [player.camp for player in players] - - -@dataclasses.dataclass -class Camp: - id: str - owner: Player - position: Vector - width: float - height: float - - def contains(self, pos: Vector) -> bool: - delta = self.position - pos - return 2 * abs(delta.x) < self.width and 2 * abs(delta.y) < self.height - - @property - def top_left(self) -> Vector: - return self.position - Vector(self.width, self.height) / 2 - - @property - def bottom_right(self) -> Vector: - return self.position + Vector(self.width, self.height) / 2 diff --git a/seekers/tests/__init__.py b/seekers/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/seekers/tests/test_seekers.py b/seekers/tests/test_seekers.py index 25c08ba..b42a4ab 100644 --- a/seekers/tests/test_seekers.py +++ b/seekers/tests/test_seekers.py @@ -3,7 +3,7 @@ import unittest from seekers import Config -from seekers.seekers_types import LocalPlayerAi +from seekers.player import LocalPlayerAi from seekers.game import SeekersGame from seekers.grpc.client import GrpcSeekersServiceWrapper, GrpcSeekersClient @@ -34,7 +34,7 @@ def test_speed_consistency(self): for speed in 1, 1, 10, 10, 20, 40: with self.subTest(msg=f"Speed: {speed}", speed=speed): - new_scores = nogrpc_game( + new_scores = no_grpc_game( playtime=2000, speed=speed, players=2, @@ -107,7 +107,7 @@ def grpc_game(playtime: int, speed: int, players: int, seed: int, filepaths: lis return {player.name: player.score for player in game.players.values()} -def nogrpc_game(playtime: int, speed: int, players: int, seed: int, filepaths: list[str]) -> dict[str, int]: +def no_grpc_game(playtime: int, speed: int, players: int, seed: int, filepaths: list[str]) -> dict[str, int]: config = Config.from_filepath("config.ini") config.global_fps = 1000 @@ -143,11 +143,11 @@ def test_grpc(self): address="localhost:7778" ) - def test_grpc_nogrpc_consistency(self): - """Test that the outcome of a game is the same for grpc and nogrpc.""" + def test_grpc_no_grpc_consistency(self): + """Test that the outcome of a game is the same for grpc and no grpc.""" for seed in 40, 41, 42, 43, 44, 45: with self.subTest(msg=f"Seed: {seed}", seed=seed): - nogrpc_scores = nogrpc_game( + no_grpc_scores = no_grpc_game( playtime=2000, speed=10, players=2, @@ -164,7 +164,7 @@ def test_grpc_nogrpc_consistency(self): address="localhost:7778" ) - self.assertEqual(grpc_scores, nogrpc_scores, + self.assertEqual(grpc_scores, no_grpc_scores, msg=f"Outcome of gRPC and non-gRPC games with seed {seed} is different.") diff --git a/seekers/vector.py b/seekers/vector.py new file mode 100644 index 0000000..09656ad --- /dev/null +++ b/seekers/vector.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import math +import typing + + +__all__ = [ + "Vector", +] + + +class Vector: + __slots__ = ("x", "y") + + def __init__(self, x: float = 0, y: float = 0): + self.x = x + self.y = y + + @staticmethod + def from_polar(angle: float, radius: float = 1) -> Vector: + return Vector(math.cos(angle) * radius, math.sin(angle) * radius) + + def rotated(self, angle: float) -> Vector: + return Vector( + math.cos(angle) * self.x - math.sin(angle) * self.y, + math.sin(angle) * self.x + math.cos(angle) * self.y, + ) + + def rotated90(self) -> Vector: + return Vector(-self.y, self.x) + + def __iter__(self): + return iter((self.x, self.y)) + + def __getitem__(self, i: int): + if i == 0: + return self.x + elif i == 1: + return self.y + + raise IndexError + + def __add__(self, other: Vector): + return Vector(self.x + other.x, self.y + other.y) + + def __sub__(self, other: Vector): + return Vector(self.x - other.x, self.y - other.y) + + def __mul__(self, factor: float): + return factor * self + + def __rmul__(self, factor: float): + if isinstance(factor, Vector): + return NotImplemented + else: + return Vector(factor * self.x, factor * self.y) + + def __truediv__(self, divisor: float): + if isinstance(divisor, Vector): + return NotImplemented + else: + return Vector(self.x / divisor, self.y / divisor) + + def __rtruediv__(self, dividend: float): + if isinstance(dividend, Vector): + return NotImplemented + else: + return Vector(dividend / self.x, dividend / self.y) + + def __neg__(self): + return -1 * self + + def __bool__(self): + return self.x or self.y + + def dot(self, other: Vector) -> float: + return self.x * other.x + self.y * other.y + + def squared_length(self) -> float: + return self.x * self.x + self.y * self.y + + def length(self) -> float: + return math.sqrt(self.x * self.x + self.y * self.y) + + def norm(self): + return self.length() + + def normalized(self): + norm = self.length() + if norm == 0: + return Vector(0, 0) + else: + return Vector(self.x / norm, self.y / norm) + + def map(self, func: typing.Callable[[float], float]) -> Vector: + return Vector(func(self.x), func(self.y)) + + def copy(self) -> Vector: + return Vector(self.x, self.y) + + def __repr__(self): + return f"Vector({self.x}, {self.y})" + + def __format__(self, format_spec): + return f"Vector({self.x:{format_spec}}, {self.y:{format_spec}})" diff --git a/seekers/world.py b/seekers/world.py new file mode 100644 index 0000000..49da3c5 --- /dev/null +++ b/seekers/world.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +import math +import random +import typing + +from .vector import * +from .config import * +from .ids import * +from . import ( + camp +) + +__all__ = [ + "World", +] + + +class World: + """The world in which the game takes place. This class mainly handles the torus geometry.""" + + def __init__(self, width, height): + self.width = width + self.height = height + + def normalize_position(self, pos: Vector): + pos.x -= math.floor(pos.x / self.width) * self.width + pos.y -= math.floor(pos.y / self.height) * self.height + + def normalized_position(self, pos: Vector): + tmp = pos.copy() + self.normalize_position(tmp) + return tmp + + @property + def geometry(self) -> Vector: + return Vector(self.width, self.height) + + def diameter(self) -> float: + return self.geometry.length() + + def middle(self) -> Vector: + return self.geometry / 2 + + def torus_difference(self, left: Vector, right: Vector, /) -> Vector: + def diff1d(length, a, b): + delta = abs(a - b) + return b - a if delta < length - delta else a - b + + return Vector(diff1d(self.width, left.x, right.x), + diff1d(self.height, left.y, right.y)) + + def torus_distance(self, left: Vector, right: Vector, /) -> float: + return self.torus_difference(left, right).length() + + def torus_direction(self, left: Vector, right: Vector, /) -> Vector: + return self.torus_difference(left, right).normalized() + + def index_of_nearest(self, pos: Vector, positions: list) -> int: + d = self.torus_distance(pos, positions[0]) + j = 0 + for i, p in enumerate(positions[1:]): + dn = self.torus_distance(pos, p) + if dn < d: + d = dn + j = i + 1 + return j + + def nearest_goal(self, pos: Vector, goals: list): + i = self.index_of_nearest(pos, [g.position for g in goals]) + return goals[i] + + def nearest_seeker(self, pos: Vector, seekers: list): + i = self.index_of_nearest(pos, [s.position for s in seekers]) + return seekers[i] + + def random_position(self) -> Vector: + return Vector(random.uniform(0, self.width), + random.uniform(0, self.height)) + + def generate_camps(self, players: typing.Collection, config: Config) -> list["Camp"]: + delta = self.height / len(players) + + if config.camp_height > delta: + raise ValueError("Config value camp.height is too large. The camps would overlap. It must be smaller than " + "the height of the world divided by the number of players. ") + + for i, player in enumerate(players): + player.camp = camp.Camp( + id=get_id("Camp"), + owner=player, + position=Vector(self.width / 2, delta * (i + 0.5)), + width=config.camp_width, + height=config.camp_height, + ) + + return [player.camp for player in players]