-
Notifications
You must be signed in to change notification settings - Fork 0
/
matchmaking.py
347 lines (293 loc) · 16.1 KB
/
matchmaking.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
"""Challenge other bots."""
import random
import logging
import model
from timer import Timer
from collections import defaultdict
import lichess
import datetime
from config import Configuration, FilterType
from typing import Dict, Any, Set, Optional, Tuple, List, DefaultDict
# A user's profile data as a dictionary (this module reads entries such as "username" and "perfs").
USER_PROFILE_TYPE = Dict[str, Any]
# An event dictionary passed to the challenge handlers (this module reads its "game" and "challenge" entries).
EVENT_TYPE = Dict[str, Any]
# The list of challenges that are waiting to be handled.
MULTIPROCESSING_LIST_TYPE = List[model.Challenge]
# One timer per challenge that we created; each timer lasts one day.
DAILY_TIMERS_TYPE = List[Timer]
logger = logging.getLogger(__name__)
# Text file that stores the creation time of our recent challenges, one timestamp per line.
daily_challenges_file_name = "daily_challenge_times.txt"
# Format of one line in the daily challenges file. The trailing newline is part of the format.
timestamp_format = "%Y-%m-%d %H:%M:%S\n"
# Number of seconds in one day.
one_day_seconds = datetime.timedelta(days=1).total_seconds()
def read_daily_challenges() -> DAILY_TIMERS_TYPE:
    """
    Read the challenges we have created in the past 24 hours from a text file.

    Each line of the file holds one timestamp written in `timestamp_format`. A missing file is treated as
    "no challenges recorded". Lines that cannot be parsed (blank lines, hand-edited content) are skipped
    with a warning so that a single bad line does not abort the whole read.

    :return: One timer per recorded challenge whose one-day timer has not yet expired.
    """
    # Compare stripped text against a stripped format so that a final line without a newline still parses.
    line_format = timestamp_format.strip()
    timers: DAILY_TIMERS_TYPE = []
    try:
        with open(daily_challenges_file_name) as file:
            for line in file:
                text = line.strip()
                if not text:
                    continue
                try:
                    created_at = datetime.datetime.strptime(text, line_format)
                except ValueError:
                    logger.warning(f"Ignoring unreadable line in {daily_challenges_file_name}: {text!r}")
                    continue
                timers.append(Timer(one_day_seconds, created_at))
    except FileNotFoundError:
        pass

    return [timer for timer in timers if not timer.is_expired()]
def write_daily_challenges(daily_challenges: DAILY_TIMERS_TYPE) -> None:
    """
    Save the creation times of our recent challenges to the daily challenges text file.

    The file is overwritten. One timestamp, formatted with `timestamp_format`, is written per timer.

    :param daily_challenges: The timers of the challenges to record.
    """
    with open(daily_challenges_file_name, "w") as file:
        stamps = (timer.starting_timestamp().strftime(timestamp_format) for timer in daily_challenges)
        file.writelines(stamps)
class Matchmaking:
    """Challenge other bots."""

    def __init__(self, li: lichess.Lichess, config: Configuration, user_profile: USER_PROFILE_TYPE) -> None:
        """
        Initialize values needed for matchmaking.

        :param li: The object used to create and cancel challenges and to fetch profiles and online bots.
        :param config: The bot's configuration. `config.matchmaking` and `config.challenge.variants` are read.
        :param user_profile: Our own profile data. The "username" and "perfs" entries are read.
        """
        self.li = li
        # "fromPosition" is never picked as the variant of a challenge we create.
        self.variants = [variant for variant in config.challenge.variants if variant != "fromPosition"]
        self.matchmaking_cfg = config.matchmaking
        self.user_profile = user_profile
        self.last_challenge_created_delay = Timer(25)  # The challenge expires 20 seconds after creating it.
        # NOTE(review): challenge_timeout is multiplied by 60, so it is presumably configured in minutes.
        self.last_game_ended_delay = Timer(self.matchmaking_cfg.challenge_timeout * 60)
        self.last_user_profile_update_time = Timer(5 * 60)  # 5 minutes.
        self.min_wait_time = 60  # Wait 60 seconds before creating a new challenge to avoid hitting the api rate limits.
        self.challenge_id: str = ""
        self.daily_challenges: DAILY_TIMERS_TYPE = read_daily_challenges()

        # (opponent name, game aspect) --> other bot is likely to accept challenge
        # game aspect is the one the challenged bot objects to and is one of:
        #   - game speed (bullet, blitz, etc.)
        #   - variant (standard, horde, etc.)
        #   - casual/rated
        #   - empty string (if no other reason is given or self.challenge_filter is COARSE)
        self.challenge_type_acceptable: DefaultDict[Tuple[str, str], bool] = defaultdict(lambda: True)
        self.challenge_filter = self.matchmaking_cfg.challenge_filter

        for name in self.matchmaking_cfg.block_list:
            self.add_to_block_list(name)

    def should_create_challenge(self) -> bool:
        """
        Whether we should create a challenge.

        As a side effect, an outstanding challenge of ours whose timer has expired is cancelled and forgotten.

        :return: True if matchmaking is enabled, the minimum wait time since the last challenge has passed, and
            either the post-game delay has expired or our previous challenge expired.
        """
        matchmaking_enabled = self.matchmaking_cfg.allow_matchmaking
        time_has_passed = self.last_game_ended_delay.is_expired()
        challenge_expired = bool(self.last_challenge_created_delay.is_expired() and self.challenge_id)
        min_wait_time_passed = self.last_challenge_created_delay.time_since_reset() > self.min_wait_time

        if challenge_expired:
            self.li.cancel(self.challenge_id)
            logger.info(f"Challenge id {self.challenge_id} cancelled.")
            self.challenge_id = ""
            self.show_earliest_challenge_time()

        return bool(matchmaking_enabled and (time_has_passed or challenge_expired) and min_wait_time_passed)

    def create_challenge(self, username: str, base_time: int, increment: int, days: int, variant: str,
                         mode: str) -> str:
        """
        Create a challenge.

        :param username: The name of the opponent to challenge.
        :param base_time: The initial clock time. Ignored if `days` is non-zero.
        :param increment: The clock increment. Ignored if `days` is non-zero.
        :param days: The days per move for a correspondence game. Zero means a real-time game.
        :param variant: The chess variant of the game.
        :param mode: "rated" for a rated game. Any other value creates a casual game.
        :return: The id of the created challenge, or an empty string if no challenge was created.
        """
        params: Dict[str, Any] = {"rated": mode == "rated", "variant": variant}

        if days:
            params["days"] = days
        elif base_time or increment:
            params["clock.limit"] = base_time
            params["clock.increment"] = increment
        else:
            logger.error("At least one of challenge_days, challenge_initial_time, or challenge_increment "
                         "must be greater than zero in the matchmaking section of your config file.")
            return ""

        try:
            # The attempt is recorded and the delay restarted before the request, so a failed request still counts.
            self.update_daily_challenge_record()
            self.last_challenge_created_delay.reset()
            response = self.li.challenge(username, params)
            challenge_id: str = response.get("challenge", {}).get("id", "")
            if not challenge_id:
                # No challenge id in the response: log it and do not challenge this opponent again.
                logger.error(response)
                self.add_to_block_list(username)
                self.show_earliest_challenge_time()
            return challenge_id
        except Exception as e:
            logger.warning("Could not create challenge")
            logger.debug(e, exc_info=e)
            self.show_earliest_challenge_time()
            return ""

    def update_daily_challenge_record(self) -> None:
        """
        Record timestamp of latest challenge and update minimum wait time.

        As the number of challenges in a day increase, the minimum wait time between challenges increases.
        0   -  49 challenges --> 1 minute
        50  -  99 challenges --> 2 minutes
        100 - 149 challenges --> 3 minutes
        etc.
        """
        self.daily_challenges = [timer for timer in self.daily_challenges if not timer.is_expired()]
        self.daily_challenges.append(Timer(one_day_seconds))
        self.min_wait_time = 60 * ((len(self.daily_challenges) // 50) + 1)
        write_daily_challenges(self.daily_challenges)

    def perf(self) -> Dict[str, Dict[str, Any]]:
        """Get the bot's rating in every variant. Bullet, blitz, rapid etc. are considered different variants."""
        user_perf: Dict[str, Dict[str, Any]] = self.user_profile["perfs"]
        return user_perf

    def username(self) -> str:
        """Our username."""
        username: str = self.user_profile["username"]
        return username

    def update_user_profile(self) -> None:
        """
        Update our user profile data, to get our latest rating.

        The profile is fetched only when the update timer has expired. If fetching fails, the old profile is kept.
        """
        if not self.last_user_profile_update_time.is_expired():
            return

        self.last_user_profile_update_time.reset()
        try:
            self.user_profile = self.li.get_profile()
        except Exception:
            # Best effort: keep the previous profile, but leave a trace of the failure instead of hiding it.
            logger.debug("Could not update the user profile.", exc_info=True)

    def choose_opponent(self) -> Tuple[Optional[str], int, int, int, str, str]:
        """
        Choose an opponent.

        :return: The opponent's username (None if no opponent could be chosen), followed by the base time,
            increment, days, variant, and mode of the game to create.
        """
        variant = self.get_random_config_value("challenge_variant", self.variants)
        mode = self.get_random_config_value("challenge_mode", ["casual", "rated"])

        base_time = random.choice(self.matchmaking_cfg.challenge_initial_time)
        increment = random.choice(self.matchmaking_cfg.challenge_increment)
        days = random.choice(self.matchmaking_cfg.challenge_days)

        # If both a correspondence and a real-time control were drawn, pick one of the two at random.
        play_correspondence = [bool(days), not bool(base_time or increment)]
        if random.choice(play_correspondence):
            base_time = 0
            increment = 0
        else:
            days = 0

        game_type = game_category(variant, base_time, increment, days)

        min_rating = self.matchmaking_cfg.opponent_min_rating
        max_rating = self.matchmaking_cfg.opponent_max_rating
        rating_diff = self.matchmaking_cfg.opponent_rating_difference
        bot_rating = self.perf().get(game_type, {}).get("rating", 0)
        if rating_diff is not None and bot_rating > 0:
            # A configured rating difference takes precedence over the absolute limits when our rating is known.
            min_rating = bot_rating - rating_diff
            max_rating = bot_rating + rating_diff
        logger.info(f"Seeking {game_type} game with opponent rating in [{min_rating}, {max_rating}] ...")
        allow_tos_violation = self.matchmaking_cfg.opponent_allow_tos_violation

        def is_suitable_opponent(bot: USER_PROFILE_TYPE) -> bool:
            perf = bot.get("perfs", {}).get(game_type, {})
            return (bot["username"] != self.username()
                    and not self.in_block_list(bot["username"])
                    and not bot.get("disabled")
                    and (allow_tos_violation or not bot.get("tosViolation"))  # Terms of Service violation.
                    and perf.get("games", 0) > 0
                    and min_rating <= perf.get("rating", 0) <= max_rating)

        online_bots = self.li.get_online_bots()
        online_bots = list(filter(is_suitable_opponent, online_bots))

        # The aspects to check are the same for every bot, so compute them once.
        aspects = [variant, game_type, mode] if self.challenge_filter == FilterType.FINE else []

        def ready_for_challenge(bot: USER_PROFILE_TYPE) -> bool:
            return all(self.should_accept_challenge(bot["username"], aspect) for aspect in aspects)

        # Prefer bots that have not declined this kind of game, but fall back to all suitable bots.
        ready_bots = list(filter(ready_for_challenge, online_bots))
        online_bots = ready_bots or online_bots
        bot_username: Optional[str] = None

        if not online_bots:
            logger.error("No suitable bots found to challenge.")
            return bot_username, base_time, increment, days, variant, mode

        try:
            bot = random.choice(online_bots)
            bot_profile = self.li.get_public_data(bot["username"])
            if bot_profile.get("blocking"):
                self.add_to_block_list(bot["username"])
            else:
                bot_username = bot["username"]
        except Exception:
            logger.exception("Error:")

        return bot_username, base_time, increment, days, variant, mode

    def get_random_config_value(self, parameter: str, choices: List[str]) -> str:
        """
        Choose a random value from `choices` if the parameter value in the config is `random`.

        :param parameter: The name of the parameter to look up in the matchmaking configuration.
        :param choices: The values to choose from if the configured value is `random`.
        :return: The configured value, or a random element of `choices`.
        """
        value: str = self.matchmaking_cfg.lookup(parameter)
        return value if value != "random" else random.choice(choices)

    def challenge(self, active_games: Set[str], challenge_queue: MULTIPROCESSING_LIST_TYPE) -> None:
        """
        Challenge an opponent.

        Nothing is done if a game is being played, a challenge is waiting in the queue, or it is too early
        to create a new challenge.

        :param active_games: The games that the bot is playing.
        :param challenge_queue: The queue containing the challenges.
        """
        if active_games or challenge_queue or not self.should_create_challenge():
            return

        logger.info("Challenging a random bot")
        self.update_user_profile()
        bot_username, base_time, increment, days, variant, mode = self.choose_opponent()
        challenge_id = ""
        if bot_username:
            # Only announce the challenge when an opponent was actually found.
            logger.info(f"Will challenge {bot_username} for a {variant} game.")
            challenge_id = self.create_challenge(bot_username, base_time, increment, days, variant, mode)
        logger.info(f"Challenge id is {challenge_id if challenge_id else 'None'}.")
        self.challenge_id = challenge_id

    def game_done(self) -> None:
        """Reset the timer for when the last game ended, and prints the earliest that the next challenge will be created."""
        self.last_game_ended_delay.reset()
        self.show_earliest_challenge_time()

    def show_earliest_challenge_time(self) -> None:
        """Show the earliest that the next challenge will be created."""
        postgame_timeout = self.last_game_ended_delay.time_until_expiration()
        time_to_next_challenge = self.min_wait_time - self.last_challenge_created_delay.time_since_reset()
        # Both the post-game delay and the minimum wait between challenges must have passed.
        time_left = max(postgame_timeout, time_to_next_challenge)
        earliest_challenge_time = datetime.datetime.now() + datetime.timedelta(seconds=time_left)
        challenges = "challenge" + ("" if len(self.daily_challenges) == 1 else "s")
        logger.info(f"Next challenge will be created after {earliest_challenge_time.strftime('%X')} "
                    f"({len(self.daily_challenges)} {challenges} in last 24 hours)")

    def add_to_block_list(self, username: str) -> None:
        """Add a bot to the blocklist."""
        self.add_challenge_filter(username, "")

    def in_block_list(self, username: str) -> bool:
        """Check if an opponent is in the block list to prevent future challenges."""
        return not self.should_accept_challenge(username, "")

    def add_challenge_filter(self, username: str, game_aspect: str) -> None:
        """
        Prevent creating another challenge when an opponent has declined a challenge.

        :param username: The name of the opponent.
        :param game_aspect: The aspect of a game (time control, chess variant, etc.)
        that caused the opponent to decline a challenge. If the parameter is empty,
        that is equivalent to adding the opponent to the block list.
        """
        self.challenge_type_acceptable[(username, game_aspect)] = False

    def should_accept_challenge(self, username: str, game_aspect: str) -> bool:
        """
        Whether a bot is likely to accept a challenge to a game.

        :param username: The name of the opponent.
        :param game_aspect: A category of the challenge type (time control, chess variant, etc.) to test for acceptance.
        If game_aspect is empty, this is equivalent to checking if the opponent is in the block list.
        :return: False if a filter was added for this opponent and game aspect, True otherwise.
        """
        # Use get() so that merely asking about an opponent does not add an entry to the dictionary.
        return self.challenge_type_acceptable.get((username, game_aspect), True)

    def accepted_challenge(self, event: EVENT_TYPE) -> None:
        """
        Set the challenge id to an empty string, if the challenge was accepted.

        Otherwise, we would attempt to cancel the challenge later.

        :param event: The event of the started game. Its game id is compared to our challenge id.
        """
        if self.challenge_id == event["game"]["id"]:
            self.challenge_id = ""

    def declined_challenge(self, event: EVENT_TYPE) -> None:
        """
        Handle a challenge that was declined by the opponent.

        Depends on whether `FilterType` is `NONE`, `COARSE`, or `FINE`.

        :param event: The event of the declined challenge.
        """
        challenge = model.Challenge(event["challenge"], self.user_profile)
        opponent = challenge.opponent
        reason = event["challenge"]["declineReason"]
        logger.info(f"{opponent} declined {challenge}: {reason}")
        if self.challenge_id == challenge.id:
            self.challenge_id = ""
        if not challenge.from_self or self.challenge_filter == FilterType.NONE:
            return

        mode = "rated" if challenge.rated else "casual"
        # Maps the decline reason key to the aspect of the game that the opponent objected to.
        decline_details: Dict[str, str] = {"generic": "",
                                           "later": "",
                                           "nobot": "",
                                           "toofast": challenge.speed,
                                           "tooslow": challenge.speed,
                                           "timecontrol": challenge.speed,
                                           "rated": mode,
                                           "casual": mode,
                                           "standard": challenge.variant,
                                           "variant": challenge.variant}

        reason_key = event["challenge"]["declineReasonKey"].lower()
        if reason_key not in decline_details:
            logger.warning(f"Unknown decline reason received: {reason_key}")
        # An empty game problem blocks the opponent entirely (unknown reason or a filter that is not FINE).
        game_problem = decline_details.get(reason_key, "") if self.challenge_filter == FilterType.FINE else ""
        self.add_challenge_filter(opponent.name, game_problem)
        logger.info(f"Will not challenge {opponent} to another {game_problem}".strip() + " game.")

        self.show_earliest_challenge_time()
def game_category(variant: str, base_time: int, increment: int, days: int) -> str:
    """
    Get the game type (e.g. bullet, atomic, classical). Lichess has one rating for every variant regardless of time control.

    For standard chess, the speed is derived from the estimated game duration: `base_time + 40 * increment`.
    The upper bound of each speed is inclusive (bullet up to 179 seconds, blitz up to 479, rapid up to 1499),
    which matches the ranges Lichess uses. Durations under 30 seconds are reported as bullet as well.

    :param variant: The game's variant.
    :param base_time: The base time in seconds.
    :param increment: The increment in seconds.
    :param days: If the game is correspondence, we have some days to play the move.
    :return: The game category.
    """
    # A non-standard variant is its own category, whatever the time control.
    if variant != "standard":
        return variant
    if days:
        return "correspondence"

    game_duration = base_time + increment * 40
    if game_duration <= 179:
        return "bullet"
    if game_duration <= 479:
        return "blitz"
    if game_duration <= 1499:
        return "rapid"
    return "classical"