Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

python: Fix possible missing events after SPEAK command #781

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 102 additions & 18 deletions src/api/python/speechd/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,12 +385,53 @@ def set_callback(self, callback):
self._callback = callback

class _CallbackHandler(object):
"""Internal object which handles callbacks."""
"""Internal object which handles callbacks.

As we allow separate callbacks for each message, we need to record the
message ID associated with each callback. Unfortunately, we can only know
the message ID after the request that will generate events has been sent,
and we could already be getting events right away -- before we have a
chance to actually register the callback with the correct message ID.
To work around this issue, a caller will call 'begin_add_callback()' before
issuing the SSIP call that will start generating events, and, when it has
the proper message ID, call 'end_add_callback()' with that message ID and
information about the callback to actually call. This allows us to queue
all events before we actually know whether they match an interesting
message ID, and dispatch them manually if we actually received any before
the 'end_add_callback()' call. We also discard any that were not used if
there are no other add pending.

The caller will start an add request by issuing a 'begin_add_callback()'
call. A matching call to either 'cancel_add_callback()' (e.g. if an error
occurred sending the SSIP message to watch for) or 'end_add_callback()'
MUST be issued afterward.

Typical use will look like so:
handler.begin_add_callback()
try:
connection.send_command('SPEAK')
result = connection.send_data(data)
msg_id = int(result[2][0])
except:
handler.cancel_add_callback()
raise
else:
handler.end_add_callback(msg_id, callback, event_types)
"""

def __init__(self, client_id):
self._client_id = client_id
self._callbacks = {}
self._lock = threading.Lock()
self._pending_adds = 0
self._pending_events = {}

def _handle_event(self, callback, event_types, type, kwargs):
""" Calls 'callback(type, **kwargs)' if event_types matches 'type'.
Returns whether that event was the final event of its sequence. """
if event_types is None or type in event_types:
callback(type, **kwargs)
return type in (CallbackType.END, CallbackType.CANCEL)

def __call__(self, msg_id, client_id, type, **kwargs):
if client_id != self._client_id:
Expand All @@ -401,21 +442,58 @@ def __call__(self, msg_id, client_id, type, **kwargs):
try:
callback, event_types = self._callbacks[msg_id]
except KeyError:
pass
# if we don't have a handler for that message but we have
# pending requests, queue the message for possible later use
if self._pending_adds:
if msg_id not in self._pending_events:
self._pending_events[msg_id] = []
self._pending_events[msg_id].append((type, kwargs))
else:
if event_types is None or type in event_types:
callback(type, **kwargs)
if type in (CallbackType.END, CallbackType.CANCEL):
if self._handle_event(callback, event_types, type, kwargs):
del self._callbacks[msg_id]
finally:
self._lock.release()

def add_callback(self, msg_id, callback, event_types):
self._lock.acquire()
try:
self._callbacks[msg_id] = (callback, event_types)
finally:
self._lock.release()
def begin_add_callback(self):
with self._lock:
self._pending_adds += 1

def _add_request_finished(self):
""" cleans up when an add request has been dealt with.
@warning 'self._lock' MUST be held by the caller. """
assert self._pending_adds > 0
self._pending_adds -= 1

# if there are no more pending callbacks, drop all possibly saved
# pending events, as they were actually not interesting
if self._pending_adds <= 0:
self._pending_events = {}

def cancel_add_callback(self):
with self._lock:
self._add_request_finished()

def end_add_callback(self, msg_id, callback, event_types):
with self._lock:
# dispatch any pending events that occurred before the callback
# actually got added (i.e. between the begin_add_callback() call
# and here). Note that this means the callbacks will be run from
# the current thread, but given the usual restrictions it has it's
# likely to be OK.
skip_registration = False
try:
if msg_id in self._pending_events:
for type, kwargs in self._pending_events[msg_id]:
if self._handle_event(callback, event_types, type, kwargs):
skip_registration = True
del self._pending_events[msg_id]
finally:
self._add_request_finished()

# if not all events have already been seen, register the callback
# normally for pending events
if not skip_registration:
self._callbacks[msg_id] = (callback, event_types)

class Scope(object):
"""An enumeration of valid SSIP command scopes.
Expand Down Expand Up @@ -745,14 +823,20 @@ def speak(self, text, callback=None, event_types=None):
message is queued on the server and the method returns immediately.

"""
self._conn.send_command('SPEAK')
result = self._conn.send_data(text)
if callback:
msg_id = int(result[2][0])
# TODO: Here we risk, that the callback arrives earlier, than we
# add the item to `self._callback_handler'. Such a situation will
# lead to the callback being ignored.
self._callback_handler.add_callback(msg_id, callback, event_types)
self._callback_handler.begin_add_callback()
try:
self._conn.send_command('SPEAK')
result = self._conn.send_data(text)
if callback:
msg_id = int(result[2][0])
except:
if callback:
self._callback_handler.cancel_add_callback()
raise
else:
if callback:
self._callback_handler.end_add_callback(msg_id, callback, event_types)
Comment on lines +827 to +839
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit convoluted and I would have loved providing a simpler API, maybe like so (100% untested):

    def add_callback(self, callback=None, event_types=None):
        """
        with handler.add_callback(callback, event_types) as request:
            connection.send_command('SPEAK')
            result = connection.send_data(data)
            request.set_message_id(int(result[2][0]))
        """
        class AddCallbackRequest:
            def __init__(self, handler, callback=None, event_types=None):
                self._handler = handler
                self._msg_id = None
                self._callback = callback
                self._event_types = event_types

            def set_message_id(self, msg_id):
                self._msg_id = msg_id

            def __enter__(self):
                self.handler.begin_add_callback()
                return self

            def __exit__(self, exc_type, exc_value, traceback):
                if exc_type:
                    self.handler.cancel_add_callback()
                else:
                    assert self._msg_id is not None
                    self.handler.end_add_callback(self._msg_id, self._callback, self._event_types)
                return False

        return AddCallbackRequest(self, callback, event_types)

but unfortunately as this (sole) user has a conditional callback it's not really usable. Meh.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be fine with something like this:

        if callback:
            with handler.add_callback(callback, event_types) as request:
                self._conn.send_command('SPEAK')
                result = self._conn.send_data(text)
                request.set_message_id(int(result[2][0]))
        else:
            self._conn.send_command('SPEAK')
            result = self._conn.send_data(text)

return result

def char(self, char):
Expand Down