Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make event ordering more deterministic #34

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Expand youtube.py error information
- Handle 'a' vs 'an' in drinks plugin
- Apply rate limiting to regex hooks
- Ensure event order is deterministic
- Make event queueing happen non-async
### Fixed
- Ensure event order is deterministic
- Fix matching exception in horoscope test
- Fix youtube.py ISO time parse
- Fix grammatical error in food sentence (beer)
Expand All @@ -29,12 +32,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fix FML random URL
- Update tvdb.py to v3 TVDB API
- Fix channel parameter handling in IRC client
- Ensure hooks are triggered according to priority
- chan_track: Ensure hooks acquire the needed locks
### Removed
- twitch.py removed due to outdated API and lack of maintainer
- metacritic.py removed due to broken scraper and lack of maintainer
- amazon.py removed due to broken scraper and no maintainer
- newegg.py removed due to broken scraper and no maintainer
- Removed path patching in main module
- Remove unused run_before events/tasks

## [1.3.0] 2020-03-17
### Added
Expand Down
24 changes: 10 additions & 14 deletions cloudbot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,27 +307,22 @@ def load_clients(self):
scanner = Scanner(bot=self)
scanner.scan(clients, categories=["cloudbot.client"])

async def process(self, event):
def process(self, event):
"""
:type event: Event
"""
run_before_tasks = []
tasks = []
halted = False

def add_hook(hook, _event, _run_before=False):
def add_hook(hook, _event):
nonlocal halted
if halted:
return False

if hook.clients and _event.conn.type not in hook.clients:
return True

coro = self.plugin_manager.launch(hook, _event)
if _run_before:
run_before_tasks.append(coro)
else:
tasks.append(coro)
tasks.append((hook, _event))

if hook.action is Action.HALTALL:
halted = True
Expand All @@ -340,10 +335,8 @@ def add_hook(hook, _event, _run_before=False):

# Raw IRC hook
for raw_hook in self.plugin_manager.catch_all_triggers:
# run catch-all coroutine hooks before all others - TODO: Make this a plugin argument
run_before = not raw_hook.threaded
if not add_hook(
raw_hook, Event(hook=raw_hook, base_event=event), _run_before=run_before
raw_hook, Event(hook=raw_hook, base_event=event)
):
# The hook has an action of Action.HALT* so stop adding new tasks
break
Expand Down Expand Up @@ -423,6 +416,9 @@ def add_hook(hook, _event, _run_before=False):
# The hook has an action of Action.HALT* so stop adding new tasks
break

# Run the tasks
await asyncio.gather(*run_before_tasks, loop=self.loop)
await asyncio.gather(*tasks, loop=self.loop)
tasks.sort(key=lambda t: t[0].priority)

for _hook, _event in tasks:
async_util.wrap_future(
self.plugin_manager.launch(_hook, _event), loop=self.loop
)
4 changes: 2 additions & 2 deletions cloudbot/clients/irc.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,8 @@ def data_received(self, data):
self.conn.describe_server(),
)
else:
# handle the message, async
async_util.wrap_future(self.bot.process(event), loop=self.loop)
# handle the message
self.bot.process(event)

def parse_line(self, line: str) -> Event:
message = Message.parse(line)
Expand Down
41 changes: 22 additions & 19 deletions plugins/core/chan_track.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Requires:
server_info.py
"""
import asyncio
import gc
import json
import logging
Expand All @@ -28,6 +29,8 @@

logger = logging.getLogger("cloudbot")

data_lock = asyncio.Lock()


class MemberNotFoundException(KeyError):
def __init__(self, name, chan):
Expand Down Expand Up @@ -513,7 +516,7 @@ def replace_user_data(conn, chan_data):
del chan_data.users[old_nick]


@hook.irc_raw(["353", "366"], singlethread=True, do_sieve=False)
@hook.irc_raw(['353', '366'], singlethread=True, lock=data_lock, do_sieve=False)
def on_names(conn, irc_paramlist, irc_command):
"""
:type conn: cloudbot.client.Client
Expand Down Expand Up @@ -583,7 +586,7 @@ def serialize(self, mapping, **kwargs):
return json.dumps(self._serialize(mapping), **kwargs)


@hook.permission("chanop")
@hook.permission("chanop", lock=data_lock, do_sieve=False)
def perm_check(chan, conn, nick):
"""
:type chan: str
Expand Down Expand Up @@ -685,7 +688,7 @@ def handle_tags(conn: IrcClient, nick: str, irc_tags: TagList) -> None:
user_data.account = account_tag.value


@hook.irc_raw(["PRIVMSG", "NOTICE"], do_sieve=False)
@hook.irc_raw(["PRIVMSG", "NOTICE"], lock=data_lock, do_sieve=False)
def on_msg(conn, nick, user, host, irc_paramlist):
chan = irc_paramlist[0]

Expand All @@ -709,7 +712,7 @@ def on_msg(conn, nick, user, host, irc_paramlist):
memb.data["last_privmsg"] = time.time()


@hook.periodic(600)
@hook.periodic(600, lock=data_lock, do_sieve=False)
def clean_pms(bot):
cutoff = time.time() - 600
for conn in bot.connections.values():
Expand All @@ -726,7 +729,7 @@ def clean_pms(bot):
pass


@hook.irc_raw("JOIN", do_sieve=False)
@hook.irc_raw("JOIN", lock=data_lock, do_sieve=False)
def on_join(nick, user, host, conn, irc_paramlist):
"""
:type nick: str
Expand All @@ -753,7 +756,7 @@ def on_join(nick, user, host, conn, irc_paramlist):
user_data.join_channel(chan_data)


@hook.irc_raw("MODE", do_sieve=False)
@hook.irc_raw('MODE', lock=data_lock, do_sieve=False)
def on_mode(chan, irc_paramlist, conn):
"""
:type chan: str
Expand Down Expand Up @@ -791,7 +794,7 @@ def on_mode(chan, irc_paramlist, conn):
member.sort_status()


@hook.irc_raw("PART", do_sieve=False)
@hook.irc_raw('PART', lock=data_lock, do_sieve=False)
def on_part(chan, nick, conn):
"""
:type chan: str
Expand All @@ -806,7 +809,7 @@ def on_part(chan, nick, conn):
del chan_data.users[nick]


@hook.irc_raw("KICK", do_sieve=False)
@hook.irc_raw('KICK', lock=data_lock, do_sieve=False)
def on_kick(chan, target, conn):
"""
:type chan: str
Expand All @@ -816,7 +819,7 @@ def on_kick(chan, target, conn):
on_part(chan, target, conn)


@hook.irc_raw("QUIT", do_sieve=False)
@hook.irc_raw('QUIT', lock=data_lock, do_sieve=False)
def on_quit(nick, conn):
"""
:type nick: str
Expand All @@ -830,7 +833,7 @@ def on_quit(nick, conn):
del chan.users[nick]


@hook.irc_raw("NICK", do_sieve=False)
@hook.irc_raw('NICK', lock=data_lock, do_sieve=False)
def on_nick(nick, irc_paramlist, conn):
"""
:type nick: str
Expand All @@ -857,7 +860,7 @@ def on_nick(nick, irc_paramlist, conn):
user_chans[new_nick] = user_chans.pop(nick)


@hook.irc_raw("ACCOUNT", do_sieve=False)
@hook.irc_raw('ACCOUNT', lock=data_lock, do_sieve=False)
def on_account(conn, nick, irc_paramlist):
"""
:type nick: str
Expand All @@ -867,7 +870,7 @@ def on_account(conn, nick, irc_paramlist):
get_users(conn).getuser(nick).account = irc_paramlist[0]


@hook.irc_raw("CHGHOST", do_sieve=False)
@hook.irc_raw('CHGHOST', lock=data_lock, do_sieve=False)
def on_chghost(conn, nick, irc_paramlist):
"""
:type nick: str
Expand All @@ -880,7 +883,7 @@ def on_chghost(conn, nick, irc_paramlist):
user.host = host


@hook.irc_raw("AWAY", do_sieve=False)
@hook.irc_raw('AWAY', lock=data_lock, do_sieve=False)
def on_away(conn, nick, irc_paramlist):
"""
:type nick: str
Expand All @@ -897,7 +900,7 @@ def on_away(conn, nick, irc_paramlist):
user.away_message = reason


@hook.irc_raw("352", do_sieve=False)
@hook.irc_raw('352', lock=data_lock, do_sieve=False)
def on_who(conn, irc_paramlist):
"""
:type irc_paramlist: cloudbot.util.parsers.irc.ParamList
Expand All @@ -917,7 +920,7 @@ def on_who(conn, irc_paramlist):
user.is_oper = is_oper


@hook.irc_raw("311", do_sieve=False)
@hook.irc_raw('311', lock=data_lock, do_sieve=False)
def on_whois_name(conn, irc_paramlist):
"""
:type irc_paramlist: cloudbot.util.parsers.irc.ParamList
Expand All @@ -930,7 +933,7 @@ def on_whois_name(conn, irc_paramlist):
user.realname = realname


@hook.irc_raw("330", do_sieve=False)
@hook.irc_raw('330', lock=data_lock, do_sieve=False)
def on_whois_acct(conn, irc_paramlist):
"""
:type irc_paramlist: cloudbot.util.parsers.irc.ParamList
Expand All @@ -940,7 +943,7 @@ def on_whois_acct(conn, irc_paramlist):
get_users(conn).getuser(nick).account = acct


@hook.irc_raw("301", do_sieve=False)
@hook.irc_raw('301', lock=data_lock, do_sieve=False)
def on_whois_away(conn, irc_paramlist):
"""
:type irc_paramlist: cloudbot.util.parsers.irc.ParamList
Expand All @@ -952,7 +955,7 @@ def on_whois_away(conn, irc_paramlist):
user.away_message = msg


@hook.irc_raw("312", do_sieve=False)
@hook.irc_raw('312', lock=data_lock, do_sieve=False)
def on_whois_server(conn, irc_paramlist):
"""
:type irc_paramlist: cloudbot.util.parsers.irc.ParamList
Expand All @@ -962,7 +965,7 @@ def on_whois_server(conn, irc_paramlist):
get_users(conn).getuser(nick).server = server


@hook.irc_raw("313", do_sieve=False)
@hook.irc_raw('313', lock=data_lock, do_sieve=False)
def on_whois_oper(conn, irc_paramlist):
"""
:type irc_paramlist: cloudbot.util.parsers.irc.ParamList
Expand Down
6 changes: 4 additions & 2 deletions tests/core_tests/irc_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def wait_tasks(self, conn, cancel=False):
task.cancel()

try:
conn.loop.run_until_complete(asyncio.gather(*tasks))
conn.loop.run_until_complete(asyncio.gather(*tasks, loop=conn.loop))
except CancelledError:
if not cancel:
raise # pragma: no cover
Expand All @@ -61,7 +61,7 @@ def make_proto(self):
conn.loop = asyncio.get_event_loop_policy().new_event_loop()
out = []

async def func(e):
def func(e):
out.append(self._filter_event(e))

conn.bot.process = func
Expand Down Expand Up @@ -133,6 +133,8 @@ def test_broken_line_doesnt_interrupt(self, caplog):

self.wait_tasks(conn)

assert len(out) == 2

assert out == [
{
"chan": "server\x02.host",
Expand Down