Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

python: Fix possible missing events after SPEAK command #781

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

cwendling
Copy link
Contributor

@cwendling cwendling commented Oct 11, 2022

This fixes the very old (before early 2007 at least) TODO not to miss events in the Python client implementation if they come in too quickly for having set up the listener properly.

As I didn't see a trivial fix short of predicting the message ID (is that possible? that would be a good solution :)) -- and I'd wager there probably isn't if it's gone known and unfixed for so long -- I implemented a more complex one.
What I do here is start queuing events before actually knowing the message ID, and when I do finally know it, dispatch any possibly caught event, and then attach the normal callback. This requires care in the caller side, but as it's an internal API only used in one place it's likely fine.

There is one "big" user-facing change (apart from not missing events): collected events are dispatched directly from within speak(), and not from the communication thread. I would think this is likely OK because of the restrictions the current callbacks already have, but in theory it could break a caller that holds a lock when calling speak() and use that same lock also in the callback (that would deadlock).


As for testing, I didn't see any Python API tests yet so I didn't really know what to do, but I verified both with Orca and with a dummy client like so:

say.py

import speechd
import sys
from gi.repository import GLib

client = speechd.SSIPClient(sys.argv[0], component='default')
client.set_data_mode(speechd.DataMode.SSML)

loop = GLib.MainLoop()
got_start = False

def cb(cb_type, index_mark=None):
    global got_start
    print("IDLE: got event of type %s with mark %s" % (cb_type, index_mark))
    if cb_type == speechd.CallbackType.BEGIN:
        got_start = True
    elif not got_start:
        print("ERROR: got %s before %s!" % (cb_type, speechd.CallbackType.BEGIN))
        loop.quit()
    if cb_type == speechd.CallbackType.END:
        loop.quit()

client.speak('\n'.join(sys.argv[1:]), callback=cb, event_types=[
	speechd.CallbackType.BEGIN,
	speechd.CallbackType.CANCEL,
	speechd.CallbackType.END,
	speechd.CallbackType.INDEX_MARK
])

loop.run()

client.close()

sys.exit(0 if got_start == True else 1)

input.ssml

Yeah, this is an excerpt from an Orca debug log, you guessed it ;)

<speak><mark name="0:7"/>Lecteur <mark name="8:15"/>d’écran <mark name="16:23"/>activé.</speak>
<speak><mark name="0:9"/>Badminton <mark name="10:13"/>aux <mark name="14:18"/>Jeux <mark name="19:29"/>olympiques <mark name="30:31"/>— <mark name="32:41"/>Wikipédia <mark name="42:43"/>- <mark name="44:51"/>Mozilla <mark name="52:59"/>Firefox <mark name="60:66"/>cadre.</speak>
<speak><mark name="0:7"/>contenu <mark name="8:17"/>principal</speak>
<speak><mark name="0:2"/>Ce <mark name="3:9"/>format <mark name="10:11"/>a <mark name="12:15"/>été <mark name="16:24"/>conservé <mark name="25:32"/>depuis. </speak>
<speak><mark name="0:10"/>finalistes <mark name="11:19"/>perdants <mark name="20:24"/>afin <mark name="25:27"/>de <mark name="28:38"/>déterminer <mark name="39:41"/>un <mark name="42:46"/>seul </speak>
<speak><mark name="0:8"/>médaillé <mark name="9:11"/>de <mark name="12:18"/>bronze <mark name="19:24"/>lien.</speak>
<speak><mark name="0:1"/>. </speak>
<speak><mark name="0:2"/>la <mark name="3:12"/>troisième <mark name="13:18"/>place <mark name="19:22"/>fut <mark name="23:32"/>également <mark name="33:40"/>rajouté <mark name="41:46"/>entre <mark name="47:50"/>les <mark name="51:55"/>deux <mark name="56:61"/>demi-</speak>
<speak><mark name="0:6"/>double <mark name="7:12"/>mixte <mark name="13:18"/>porta <mark name="19:21"/>le <mark name="22:28"/>nombre <mark name="29:39"/>d'épreuves <mark name="40:41"/>à <mark name="42:46"/>cinq <mark name="47:49"/>et <mark name="50:52"/>un <mark name="53:58"/>match <mark name="59:63"/>pour </speak>
<speak><mark name="0:11"/>finaliste). <mark name="12:16"/>Lors <mark name="17:19"/>de <mark name="20:31"/>l'olympiade <mark name="32:41"/>suivante, </speak>
<speak><mark name="0:7"/>Atlanta <mark name="8:13"/>lien.</speak>
<speak><mark name="0:4"/>1996 <mark name="5:10"/>lien.</speak>
<speak><mark name="0:1"/>, <mark name="2:9"/>l'ajout <mark name="10:12"/>du </speak>
<speak><mark name="0:6"/>chaque <mark name="7:15"/>épreuve, <mark name="16:24"/>incluant <mark name="25:29"/>deux <mark name="30:39"/>médailles <mark name="40:42"/>de <mark name="43:49"/>bronze <mark name="50:54"/>(une <mark name="55:58"/>par <mark name="59:64"/>demi-</speak>
<speak><mark name="0:6"/>hommes <mark name="7:9"/>et <mark name="10:16"/>double <mark name="17:23"/>dames. <mark name="24:30"/>Quatre <mark name="31:40"/>médailles <mark name="41:47"/>furent <mark name="48:58"/>attribuées <mark name="59:63"/>dans </speak>
<speak><mark name="0:7"/>étaient <mark name="8:18"/>organisées <mark name="19:20"/>: <mark name="21:27"/>simple <mark name="28:35"/>hommes, <mark name="36:42"/>simple <mark name="43:49"/>dames, <mark name="50:56"/>double </speak>
<speak><mark name="0:4"/>plus </speak>
<speak><mark name="0:2"/>Le <mark name="3:12"/>badminton <mark name="13:16"/>fut <mark name="17:24"/>présent <mark name="25:29"/>pour <mark name="30:32"/>la <mark name="33:41"/>première <mark name="42:46"/>fois <mark name="47:49"/>en <mark name="50:54"/>tant <mark name="55:58"/>que <mark name="59:64"/>sport <mark name="65:73"/>officiel <mark name="74:77"/>aux <mark name="78:82"/>Jeux <mark name="83:93"/>olympiques <mark name="94:99"/>d'été <mark name="100:102"/>de </speak>
<speak><mark name="0:9"/>Barcelone</speak>
<speak><mark name="0:4"/>lien</speak>
<speak><mark name="0:1"/>. </speak>
<speak> <mark name="1:7"/>Quatre <mark name="8:16"/>épreuves <mark name="17:18"/>y <mark name="19:26"/>étaient <mark name="27:37"/>organisées <mark name="38:39"/>: <mark name="40:46"/>simple <mark name="47:54"/>hommes, <mark name="55:61"/>simple <mark name="62:68"/>dames, <mark name="69:75"/>double <mark name="76:82"/>hommes <mark name="83:85"/>et <mark name="86:92"/>double <mark name="93:99"/>dames. </speak>
<speak> <mark name="1:7"/>Quatre <mark name="8:17"/>médailles <mark name="18:24"/>furent <mark name="25:35"/>attribuées <mark name="36:40"/>dans <mark name="41:47"/>chaque <mark name="48:56"/>épreuve, <mark name="57:65"/>incluant <mark name="66:70"/>deux <mark name="71:80"/>médailles <mark name="81:83"/>de <mark name="84:90"/>bronze <mark name="91:95"/>(une <mark name="96:99"/>par <mark name="100:116"/>demi-finaliste). </speak>
<speak> <mark name="1:5"/>Lors <mark name="6:8"/>de <mark name="9:20"/>l'olympiade <mark name="21:30"/>suivante, </speak>
<speak><mark name="0:7"/>Atlanta</speak>
<speak><mark name="0:4"/>lien</speak>
<speak>espace</speak>
<speak><mark name="0:4"/>1996</speak>
<speak><mark name="0:4"/>lien</speak>
<speak><mark name="0:1"/>, <mark name="2:9"/>l'ajout <mark name="10:12"/>du <mark name="13:19"/>double <mark name="20:25"/>mixte <mark name="26:31"/>porta <mark name="32:34"/>le <mark name="35:41"/>nombre <mark name="42:52"/>d'épreuves <mark name="53:54"/>à <mark name="55:59"/>cinq <mark name="60:62"/>et <mark name="63:65"/>un <mark name="66:71"/>match <mark name="72:76"/>pour <mark name="77:79"/>la <mark name="80:89"/>troisième <mark name="90:95"/>place <mark name="96:99"/>fut <mark name="100:109"/>également <mark name="110:117"/>rajouté <mark name="118:123"/>entre <mark name="124:127"/>les <mark name="128:132"/>deux <mark name="133:148"/>demi-finalistes <mark name="149:157"/>perdants <mark name="158:162"/>afin <mark name="163:165"/>de <mark name="166:176"/>déterminer <mark name="177:179"/>un <mark name="180:184"/>seul </speak>
<speak><mark name="0:8"/>médaillé <mark name="9:11"/>de <mark name="12:18"/>bronze</speak>
<speak><mark name="0:4"/>lien</speak>
<speak><mark name="0:7"/>Lecteur <mark name="8:15"/>d’écran <mark name="16:26"/>désactivé.</speak>

Calling this

while read -r line; do echo "Speaking: $line"; python3 say.py "$line" || break; done < input.ssml

Note that somehow I reproduce this a lot more easily in 0.10 than 0.11, no real idea why. With 0.10 it basically always fails on the second sentence (I'd guess because by then spd is already set up and running, so snappier?), whereas on 0.11 it's a lot more random, and often works with or without it.

Comment on lines +827 to +839
self._callback_handler.begin_add_callback()
try:
self._conn.send_command('SPEAK')
result = self._conn.send_data(text)
if callback:
msg_id = int(result[2][0])
except:
if callback:
self._callback_handler.cancel_add_callback()
raise
else:
if callback:
self._callback_handler.end_add_callback(msg_id, callback, event_types)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit convoluted and I would have loved providing a simpler API, maybe like so (100% untested):

    def add_callback(self, callback=None, event_types=None):
        """
        with handler.add_callback(callback, event_types) as request:
            connection.send_command('SPEAK')
            result = connection.send_data(data)
            request.set_message_id(int(result[2][0]))
        """
        class AddCallbackRequest:
            def __init__(self, handler, callback=None, event_types=None):
                self._handler = handler
                self._msg_id = None
                self._callback = callback
                self._event_types = event_types

            def set_message_id(self, msg_id):
                self._msg_id = msg_id

            def __enter__(self):
                self.handler.begin_add_callback()
                return self

            def __exit__(self, exc_type, exc_value, traceback):
                if exc_type:
                    self.handler.cancel_add_callback()
                else:
                    assert self._msg_id is not None
                    self.handler.end_add_callback(self._msg_id, self._callback, self._event_types)
                return False

        return AddCallbackRequest(self, callback, event_types)

but unfortunately as this (sole) user has a conditional callback it's not really usable. Meh.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be fine with something like this:

        if callback:
            with handler.add_callback(callback, event_types) as request:
                self._conn.send_command('SPEAK')
                result = self._conn.send_data(text)
                request.set_message_id(int(result[2][0]))
        else:
            self._conn.send_command('SPEAK')
            result = self._conn.send_data(text)

@sthibaul
Copy link
Collaborator

I'd wager there probably isn't if it's gone known and unfixed for so long

Probably just nobody had a look and it wasn't hurting people enough for anybody to take up the task.

collected events are dispatched directly from within speak(), and not from the communication thread

I'm quite afraid of this. There is the deadlock issue indeed, but also events could then be handled mixed up, if some events come while end_add_callback eventually processes old events. That can produce really odd results.

I'm thinking that we may just not need the _CallbackHandler class at all. It's there to fix concurrency of several threads adding various callbacks, but if we do this all from the communication thread all threading issues disappear.

I'd then much rather see the communication thread always do the processing, which I believe is not that complex: on begin_add_callback, we can make _communication pile up callbacks. When _communication notices the response to the command, it can add the callback with the resulting msg_id, and unlock the processing of the pile of callbacks.

This means we're moving the concurrency management from _CallbackHandler into begin_add_callback, to be managed by the communication thread alone, but that looks much more robust to me.

@cwendling
Copy link
Contributor Author

but also events could then be handled mixed up, if some events come while end_add_callback eventually processes old events. That can produce really odd results.

I'm not quite sure how could this happen? _CallbackHandler.__call__() holds the lock, so dos end_add_callback(), so this should still be sequential, shouldn't it?
The only thing that I see could currently mix up ordering is if we can get several callbacks for one message ID and callbacks for other messages IDs interleaved on the wire. If so, indeed __call__() would dispatch the callback for the known ID although it's holding others. If this is possible and undesirable, it'd be easy enough to just hold the known IDs off as well so long as we have pending adds.

I'm thinking that we may just not need the _CallbackHandler class at all. It's there to fix concurrency of several threads adding various callbacks, but if we do this all from the communication thread all threading issues disappear.

I'd then much rather see the communication thread always do the processing, which I believe is not that complex: on begin_add_callback, we can make _communication pile up callbacks. When _communication notices the response to the command, it can add the callback with the resulting msg_id, and unlock the processing of the pile of callbacks.

This means we're moving the concurrency management from _CallbackHandler into begin_add_callback, to be managed by the communication thread alone, but that looks much more robust to me.

Well, this is not trivial, because if we do not want to delay callbacks too much, we need not to wait for the next message to be received before releasing possibly held off callbacks. And this means making the whole _communication thread wait on two different things: the message IO one one side, and a callback semaphore (or alike) on the other. This can be done, but then requires to perform non-blocking IOs and whatnot, making the communication thread a whole lot more complex.

Another option which would make the code simpler (yet sound a tad crazy) would be to have an additional thread only responsible of managing callbacks, decoupling IO and callbacks. I'm not sure this is sound, so I'd rather have your opinion here.

Or maybe I'm completely missing something?

@sthibaul
Copy link
Collaborator

but also events could then be handled mixed up, if some events come while end_add_callback eventually processes old events. That can produce really odd results.
I'm not quite sure how could this happen? _CallbackHandler.call() holds the lock, so dos end_add_callback(), so this should still be sequential, shouldn't it?

The execution will be synchronized, yes, but can happen in whatever order depending on the locking order, I don't think we want that, and rather see callbacks always called in the event order.

if we do not want to delay callbacks too much,

Do we actually care? The command is non-blocking, so we'll only have the round-trip with the server, which should be fast enough.

would be to have an additional thread only responsible of managing callbacks, decoupling IO and callbacks

There is a saying that adding a thread most often only adds more headaches than solves things :)

The problem is that quite often we do need to have proper ordering between requests from the application and events, so that events don't get handled before/after requests that have some interactions with them. Having a single loop that processes everything makes it way simpler to achieve it.

@cwendling
Copy link
Contributor Author

The execution will be synchronized, yes, but can happen in whatever order depending on the locking order, I don't think we want that, and rather see callbacks always called in the event order.

Well, if we hold off all callbacks while an addition is in the works, that wouldn't happen: either __call__() gets called first and queues its event, or end_add_callback() gets first and dispatches its events before __call__() can acquire the lock.
I agree, currently __call__() can dispatch known callbacks even if there are mending additions, which could, in theory at least, mix them up. But if we queue everything when an addition is in progress, there's no such issue.

diff --git a/src/api/python/speechd/client.py b/src/api/python/speechd/client.py
index 3743af87..97ab1ff4 100644
--- a/src/api/python/speechd/client.py
+++ b/src/api/python/speechd/client.py
@@ -439,18 +439,19 @@ class _CallbackHandler(object):
             return
         self._lock.acquire()
         try:
-            try:
-                callback, event_types = self._callbacks[msg_id]
-            except KeyError:
-                # if we don't have a handler for that message but we have
-                # pending requests, queue the message for possible later use
-                if self._pending_adds:
-                    if msg_id not in self._pending_events:
-                        self._pending_events[msg_id] = []
-                    self._pending_events[msg_id].append((type, kwargs))
+            # if we have pending requests, queue the message for possible later use
+            if self._pending_adds:
+                if msg_id not in self._pending_events:
+                    self._pending_events[msg_id] = []
+                self._pending_events[msg_id].append((type, kwargs))
             else:
-                if self._handle_event(callback, event_types, type, kwargs):
-                    del self._callbacks[msg_id]
+                try:
+                    callback, event_types = self._callbacks[msg_id]
+                except KeyError:
+                    pass
+                else:
+                    if self._handle_event(callback, event_types, type, kwargs):
+                        del self._callbacks[msg_id]
         finally:
             self._lock.release()
 

Do we actually care? The command is non-blocking, so we'll only have the round-trip with the server, which should be fast enough.

But what if all events have been received by the time the caller issues end_add_callback()? We'd need to dispatch them "manually". I mean, imagine this:

        self._handler.being_add_callback()
        # here we start piling up callbacks...
        self._conn.send_command('SPEAK')
        result = self._conn.send_data(text)
        time.sleep(60)  # or any arbitrary delay happening in this thread
        self._handler.end_add_callback(int(result[2][0]), callback, event_types)
        # and now, how are the queued callbacks dispatched?
        # point is, everything is piled up and we won't get any more activity in
        # the communication thread until we issue another command

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants