Skip to content

Commit

Permalink
Merge pull request #68 from Puckoland/channels
Browse files Browse the repository at this point in the history
refactor: add Channels class
  • Loading branch information
strakam authored Oct 2, 2024
2 parents 0804acc + 40f70bc commit f26eb44
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 115 deletions.
67 changes: 67 additions & 0 deletions generals/channels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import numpy as np

from generals.config import MOUNTAIN, PASSABLE

valid_generals = ["A", "B"] # Generals are represented by A and B

class Channels:
"""
army - army size in each cell
general - general mask (1 if general is in cell, 0 otherwise)
mountain - mountain mask (1 if cell is mountain, 0 otherwise)
city - city mask (1 if cell is city, 0 otherwise)
passable - passable mask (1 if cell is passable, 0 otherwise)
ownership_i - ownership mask for player i (1 if player i owns cell, 0 otherwise)
ownership_neutral - ownership mask for neutral cells that are passable (1 if cell is neutral, 0 otherwise)
"""
def __init__(self, map: np.ndarray, _agents: list[str]):
self._army: np.ndarray = np.where(np.isin(map, valid_generals), 1, 0).astype(int)
self._general: np.ndarray = np.where(np.isin(map, valid_generals), 1, 0).astype(bool)
self._mountain: np.ndarray = np.where(map == MOUNTAIN, 1, 0).astype(bool)
self._city: np.ndarray = np.where(np.char.isdigit(map), 1, 0).astype(bool)
self._passable: np.ndarray = (map != MOUNTAIN).astype(bool)

self._ownership: dict[str, np.ndarray] = {
"neutral": ((map == PASSABLE) | (np.char.isdigit(map))).astype(bool)
}
for i, agent in enumerate(_agents):
self._ownership[agent] = np.where(map == chr(ord("A") + i), 1, 0).astype(bool)

# City costs are 40 + digit in the cell
city_costs = np.where(np.char.isdigit(map), map, "0").astype(int)
self.army += 40 * self.city + city_costs

@property
def ownership(self) -> dict[str, np.ndarray]:
return self._ownership

@property
def army(self) -> np.ndarray:
return self._army

@army.setter
def army(self, value):
self._army = value

@property
def general(self) -> np.ndarray:
return self._general

@property
def mountain(self) -> np.ndarray:
return self._mountain

@property
def city(self) -> np.ndarray:
return self._city

@property
def passable(self) -> np.ndarray:
return self._passable

@property
def ownership_neutral(self) -> np.ndarray:
return self._ownership["neutral"]

def _set_passable(self, value):
self._passable = value
99 changes: 35 additions & 64 deletions generals/game.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import warnings
from typing import Any

from .channels import Channels
from generals.grid import Grid
import numpy as np
import gymnasium as gym
Expand All @@ -27,37 +28,7 @@ def __init__(self, grid: Grid, agents: list[str]):
for i, agent in enumerate(self.agents)
}

# Initialize channels
# Army - army size in each cell
# General - general mask (1 if general is in cell, 0 otherwise)
# Mountain - mountain mask (1 if cell is mountain, 0 otherwise)
# City - city mask (1 if cell is city, 0 otherwise)
# Passable - passable mask (1 if cell is passable, 0 otherwise)
# Ownership_i - ownership mask for player i (1 if player i owns cell, 0 otherwise)
# Ownerhsip_0 - ownership mask for neutral cells that are passable (1 if cell is neutral, 0 otherwise)
# Initialize channels

valid_generals = ["A", "B"] # Generals are represented by A and B
self.channels = {
"army": np.where(np.isin(map, valid_generals), 1, 0).astype(int),
"general": np.where(np.isin(map, valid_generals), 1, 0).astype(bool),
"mountain": np.where(map == MOUNTAIN, 1, 0).astype(bool),
"city": np.where(np.char.isdigit(map), 1, 0).astype(bool),
"passable": (map != MOUNTAIN).astype(bool),
"ownership_neutral": ((map == PASSABLE) | (np.char.isdigit(map))).astype(
bool
),
**{
f"ownership_{agent}": np.where(map == chr(ord("A") + id), 1, 0).astype(
bool
)
for id, agent in enumerate(self.agents)
},
}

# City costs are 40 + digit in the cell
city_costs = np.where(np.char.isdigit(map), map, "0").astype(int)
self.channels["army"] += 40 * self.channels["city"] + city_costs
self.channels = Channels(map, self.agents)

##########
# Spaces #
Expand Down Expand Up @@ -115,8 +86,8 @@ def action_mask(self, agent: str) -> np.ndarray:
I.e. valid_action_mask[i, j, k] is 1 if action k is valid in cell (i, j).
"""

ownership_channel = self.channels[f"ownership_{agent}"]
more_than_1_army = (self.channels["army"] > 1) * ownership_channel
ownership_channel = self.channels.ownership[agent]
more_than_1_army = (self.channels.army > 1) * ownership_channel
owned_cells_indices = self.channel_to_indices(more_than_1_army)
valid_action_mask = np.zeros(
(self.grid_dims[0], self.grid_dims[1], 4), dtype=bool
Expand All @@ -138,7 +109,7 @@ def action_mask(self, agent: str) -> np.ndarray:

# check if destination is road
passable_cell_indices = (
self.channels["passable"][destinations[:, 0], destinations[:, 1]] == 1.0
self.channels.passable[destinations[:, 0], destinations[:, 1]] == 1
)
action_destinations = destinations[passable_cell_indices]

Expand Down Expand Up @@ -192,9 +163,9 @@ def step(self, actions: dict[str, Action]) -> tuple[dict[str, Observation], dict
)
continue
if split_army == 1: # Agent wants to split the army
army_to_move = self.channels["army"][i, j] // 2
army_to_move = self.channels.army[i, j] // 2
else: # Leave just one army in the source cell
army_to_move = self.channels["army"][i, j] - 1
army_to_move = self.channels.army[i, j] - 1
if army_to_move < 1: # Skip if army size to move is less than 1
continue
moves[agent] = (i, j, direction, army_to_move)
Expand All @@ -204,11 +175,11 @@ def step(self, actions: dict[str, Action]) -> tuple[dict[str, Observation], dict
si, sj, direction, army_to_move = moves[agent]

# Cap the amount of army to move (previous moves may have lowered available army)
army_to_move = min(army_to_move, self.channels["army"][si, sj] - 1)
army_to_stay = self.channels["army"][si, sj] - army_to_move
army_to_move = min(army_to_move, self.channels.army[si, sj] - 1)
army_to_stay = self.channels.army[si, sj] - army_to_move

# Check if the current agent still owns the source cell and has more than 1 army
if self.channels[f"ownership_{agent}"][si, sj] == 0 or army_to_move < 1:
if self.channels.ownership[agent][si, sj] == 0 or army_to_move < 1:
continue

di, dj = (
Expand All @@ -217,28 +188,28 @@ def step(self, actions: dict[str, Action]) -> tuple[dict[str, Observation], dict
) # destination indices

# Figure out the target square owner and army size
target_square_army = self.channels["army"][di, dj]
target_square_army = self.channels.army[di, dj]
target_square_owner_idx = np.argmax(
[
self.channels[f"ownership_{agent}"][di, dj]
self.channels.ownership[agent][di, dj]
for agent in ["neutral"] + self.agents
]
)
target_square_owner = (["neutral"] + self.agents)[target_square_owner_idx]
if target_square_owner == agent:
self.channels["army"][di, dj] += army_to_move
self.channels["army"][si, sj] = army_to_stay
self.channels.army[di, dj] += army_to_move
self.channels.army[si, sj] = army_to_stay
else:
# Calculate resulting army, winner and update channels
remaining_army = np.abs(target_square_army - army_to_move)
square_winner = (
agent if target_square_army < army_to_move else target_square_owner
)
self.channels["army"][di, dj] = remaining_army
self.channels["army"][si, sj] = army_to_stay
self.channels[f"ownership_{square_winner}"][di, dj] = 1
self.channels.army[di, dj] = remaining_army
self.channels.army[si, sj] = army_to_stay
self.channels.ownership[square_winner][di, dj] = 1
if square_winner != target_square_owner:
self.channels[f"ownership_{target_square_owner}"][di, dj] = 0
self.channels.ownership[target_square_owner][di, dj] = 0

if not done_before_actions:
self.time += 1
Expand All @@ -249,8 +220,8 @@ def step(self, actions: dict[str, Action]) -> tuple[dict[str, Observation], dict
self.agents[0] if self.agent_won(self.agents[0]) else self.agents[1]
)
loser = self.agents[1] if winner == self.agents[0] else self.agents[0]
self.channels[f"ownership_{winner}"] += self.channels[f"ownership_{loser}"]
self.channels[f"ownership_{loser}"] = self.channels["passable"] * 0
self.channels.ownership[winner] += self.channels.ownership[loser]
self.channels.ownership[loser] = self.channels.passable * 0
else:
self._global_game_update()

Expand All @@ -274,14 +245,14 @@ def _global_game_update(self) -> None:
# every TICK_RATE steps, increase army size in each cell
if self.time % INCREMENT_RATE == 0:
for owner in owners:
self.channels["army"] += self.channels[f"ownership_{owner}"]
self.channels.army += self.channels.ownership[owner]

# Increment armies on general and city cells, but only if they are owned by player
if self.time % 2 == 0 and self.time > 0:
update_mask = self.channels["general"] + self.channels["city"]
update_mask = self.channels.general + self.channels.city
for owner in owners:
self.channels["army"] += (
update_mask * self.channels[f"ownership_{owner}"]
self.channels.army += (
update_mask * self.channels.ownership[owner]
)

def is_done(self) -> bool:
Expand All @@ -300,9 +271,9 @@ def get_infos(self) -> dict[str, Info]:
players_stats = {}
for agent in self.agents:
army_size = np.sum(
self.channels["army"] * self.channels[f"ownership_{agent}"]
self.channels.army * self.channels.ownership[agent]
).astype(int)
land_size = np.sum(self.channels[f"ownership_{agent}"]).astype(int)
land_size = np.sum(self.channels.ownership[agent]).astype(int)
players_stats[agent] = {
"army": army_size,
"land": land_size,
Expand All @@ -318,16 +289,16 @@ def _agent_observation(self, agent: str) -> Observation:
"""
info = self.get_infos()
opponent = self.agents[0] if agent == self.agents[1] else self.agents[1]
visibility = self.visibility_channel(self.channels[f"ownership_{agent}"])
visibility = self.visibility_channel(self.channels.ownership[agent])
_observation = {
"army": self.channels["army"].astype(int) * visibility,
"general": self.channels["general"] * visibility,
"city": self.channels["city"] * visibility,
"owned_cells": self.channels[f"ownership_{agent}"] * visibility,
"opponent_cells": self.channels[f"ownership_{opponent}"] * visibility,
"neutral_cells": self.channels["ownership_neutral"] * visibility,
"army": self.channels.army.astype(int) * visibility,
"general": self.channels.general * visibility,
"city": self.channels.city * visibility,
"owned_cells": self.channels.ownership[agent] * visibility,
"opponent_cells": self.channels.ownership[opponent] * visibility,
"neutral_cells": self.channels.ownership_neutral * visibility,
"visible_cells": visibility,
"structure": self.channels["mountain"] + self.channels["city"],
"structure": self.channels.mountain + self.channels.city,
"owned_land_count": info[agent]["land"],
"owned_army_count": info[agent]["army"],
"opponent_land_count": info[opponent]["land"],
Expand All @@ -347,6 +318,6 @@ def agent_won(self, agent: str) -> bool:
Returns True if the agent won the game, False otherwise.
"""
return all(
self.channels[f"ownership_{agent}"][general[0], general[1]] == 1
self.channels.ownership[agent][general[0], general[1]] == 1
for general in self.general_positions.values()
)
18 changes: 9 additions & 9 deletions generals/gui/rendering.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def render_grid(self):
owned_map = np.zeros((self.grid_height, self.grid_width), dtype=np.bool)
visible_map = np.zeros((self.grid_height, self.grid_width), dtype=np.bool)
for agent in agents:
ownership = self.game.channels["ownership_" + agent]
ownership = self.game.channels.ownership[agent]
owned_map = np.logical_or(owned_map, ownership)
if self.agent_fov[agent]:
visibility = self.game.visibility_channel(ownership)
Expand All @@ -182,12 +182,12 @@ def render_grid(self):

# Draw background of visible owned squares
for agent in agents:
ownership = self.game.channels["ownership_" + agent]
ownership = self.game.channels.ownership[agent]
visible_ownership = np.logical_and(ownership, visible_map)
self.draw_channel(visible_ownership, self.agent_data[agent]["color"])

# Draw visible generals
visible_generals = np.logical_and(self.game.channels["general"], visible_map)
visible_generals = np.logical_and(self.game.channels.general, visible_map)
self.draw_images(visible_generals, self._general_img)

# Draw background of visible but not owned squares
Expand All @@ -198,28 +198,28 @@ def render_grid(self):
self.draw_channel(invisible_map, c.FOG_OF_WAR)

# Draw background of visible mountains
visible_mountain = np.logical_and(self.game.channels["mountain"], visible_map)
visible_mountain = np.logical_and(self.game.channels.mountain, visible_map)
self.draw_channel(visible_mountain, c.VISIBLE_MOUNTAIN)

# Draw mountains (even if they are not visible)
self.draw_images(self.game.channels["mountain"], self._mountain_img)
self.draw_images(self.game.channels.mountain, self._mountain_img)

# Draw background of visible neutral cities
visible_cities = np.logical_and(self.game.channels["city"], visible_map)
visible_cities = np.logical_and(self.game.channels.city, visible_map)
visible_cities_neutral = np.logical_and(
visible_cities, self.game.channels["ownership_neutral"]
visible_cities, self.game.channels.ownership_neutral
)
self.draw_channel(visible_cities_neutral, c.NEUTRAL_CASTLE)

# Draw invisible cities as mountains
invisible_cities = np.logical_and(self.game.channels["city"], invisible_map)
invisible_cities = np.logical_and(self.game.channels.city, invisible_map)
self.draw_images(invisible_cities, self._mountain_img)

# Draw visible cities
self.draw_images(visible_cities, self._city_img)

# Draw nonzero army counts on visible squares
visible_army = self.game.channels["army"] * visible_map
visible_army = self.game.channels.army * visible_map
visible_army_indices = self.game.channel_to_indices(visible_army)
for i, j in visible_army_indices:
self.render_cell_text(
Expand Down
Loading

0 comments on commit f26eb44

Please sign in to comment.