From 1a23902748341b144632829b2df35e5d6e6a5e74 Mon Sep 17 00:00:00 2001 From: JarbasAI <33701864+JarbasAl@users.noreply.github.com> Date: Thu, 28 Dec 2023 18:13:02 +0000 Subject: [PATCH] Update messagebus.py --- ovos_utils/messagebus.py | 494 --------------------------------------- 1 file changed, 494 deletions(-) diff --git a/ovos_utils/messagebus.py b/ovos_utils/messagebus.py index d62f97a4..7195a62f 100644 --- a/ovos_utils/messagebus.py +++ b/ovos_utils/messagebus.py @@ -526,497 +526,3 @@ def decode_binary_message(message): binary_data = message.data["binary"] # decode hex string return bytearray.fromhex(binary_data) - - -def to_alnum(skill_id): - """Convert a skill id to only alphanumeric characters - - Non alpha-numeric characters are converted to "_" - - Args: - skill_id (str): identifier to be converted - Returns: - (str) String of letters - """ - return ''.join(c if c.isalnum() else '_' for c in str(skill_id)) - - -def unmunge_message(message, skill_id): - """Restore message keywords by removing the Letterified skill ID. - Args: - message (FakeMessage): Intent result message - skill_id (str): skill identifier - Returns: - Message without clear keywords - """ - if hasattr(message, "data") and isinstance(message.data, dict): - skill_id = to_alnum(skill_id) - for key in list(message.data.keys()): - if key.startswith(skill_id): - # replace the munged key with the real one - new_key = key[len(skill_id):] - message.data[new_key] = message.data.pop(key) - - return message - - -def get_handler_name(handler): - """Name (including class if available) of handler function. - - Args: - handler (function): Function to be named - - Returns: - string: handler name as string - """ - if '__self__' in dir(handler) and 'name' in dir(handler.__self__): - return handler.__self__.name + '.' + handler.__name__ - else: - return handler.__name__ - - -def create_wrapper(handler, skill_id, on_start, on_end, on_error): - """Create the default skill handler wrapper. - - This wrapper handles things like metrics, reporting handler start/stop - and errors. - handler (callable): method/function to call - skill_id: skill_id for associated skill - on_start (function): function to call before executing the handler - on_end (function): function to call after executing the handler - on_error (function): function to call for error reporting - """ - - def wrapper(message): - stopwatch = Stopwatch() - try: - message = unmunge_message(message, skill_id) - if on_start: - on_start(message) - - with stopwatch: - if len(signature(handler).parameters) == 0: - handler() - else: - handler(message) - - except Exception as e: - if on_error: - if len(signature(on_error).parameters) == 2: - on_error(e, message) - else: - on_error(e) - finally: - if on_end: - on_end(message) - - # Send timing metrics - context = message.context - if context and 'ident' in context: - try: - from mycroft.metrics import report_timing - report_timing(context['ident'], 'skill_handler', stopwatch, - {'handler': handler.__name__, - 'skill_id': skill_id}) - except ImportError: - pass - - return wrapper - - -def create_basic_wrapper(handler, on_error=None): - """Create the default skill handler wrapper. - - This wrapper handles things like metrics, reporting handler start/stop - and errors. - - Args: - handler (callable): method/function to call - on_error (function): function to call to report error. - - Returns: - Wrapped callable - """ - - def wrapper(message): - try: - if len(signature(handler).parameters) == 0: - handler() - else: - handler(message) - except Exception as e: - if on_error: - on_error(e) - - return wrapper - - -class EventContainer: - def __init__(self, bus=None): - log_deprecation("Import `ovos_utils.events.EventContainer", "0.1.0") - self.bus = bus - self.events = [] - - def set_bus(self, bus): - self.bus = bus - - def add(self, name, handler, once=False): - """Create event handler for executing intent or other event. - Args: - name (string): IntentParser name - handler (func): Method to call - once (bool, optional): Event handler will be removed after it has - been run once. - """ - - def once_wrapper(message): - # Remove registered one-time handler before invoking, - # allowing them to re-schedule themselves. - self.remove(name) - handler(message) - - if handler: - if once: - self.bus.once(name, once_wrapper) - self.events.append((name, once_wrapper)) - else: - self.bus.on(name, handler) - self.events.append((name, handler)) - - LOG.debug('Added event: {}'.format(name)) - - def remove(self, name): - """Removes an event from bus emitter and events list. - Args: - name (string): Name of Intent or Scheduler Event - Returns: - bool: True if found and removed, False if not found - """ - LOG.debug("Removing event {}".format(name)) - removed = False - for _name, _handler in list(self.events): - if name == _name: - try: - self.events.remove((_name, _handler)) - except ValueError: - LOG.error('Failed to remove event {}'.format(name)) - pass - removed = True - - # Because of function wrappers, the emitter doesn't always directly - # hold the _handler function, it sometimes holds something like - # 'wrapper(_handler)'. So a call like: - # self.bus.remove(_name, _handler) - # will not find it, leaving an event handler with that name left behind - # waiting to fire if it is ever re-installed and triggered. - # Remove all handlers with the given name, regardless of handler. - if removed: - self.bus.remove_all_listeners(name) - return removed - - def __iter__(self): - return iter(self.events) - - def clear(self): - """Unregister all registered handlers and clear the list of registered - events. - """ - for e, f in self.events: - self.bus.remove(e, f) - self.events = [] # Remove reference to wrappers - -class BusService: - """ - Provide some service over the messagebus for other components - - response = Message("face.recognition.reply") - service = BusService(response) - service.listen("face.recognition") - - while True: - data = do_computation() - service.update_response(data) # replaces response.data - - """ - - def __init__(self, message, trigger_messages=None, bus=None): - self.bus = bus or get_mycroft_bus() - self.response = message - trigger_messages = trigger_messages or [] - self.events = [] - for message_type in trigger_messages: - self.listen(message_type) - - def listen(self, message_type, callback=None): - if callback is None: - callback = self._respond - self.bus.on(message_type, callback) - self.events.append((message_type, callback)) - - def update_response(self, data=None): - if data is not None: - self.response.data = data - - def _respond(self, message): - self.bus.emit(message.reply(self.response.type, self.response.data)) - - def shutdown(self): - """ remove all listeners """ - for event, callback in self.events: - self.bus.remove(event, callback) - - -class BusFeedProvider: - """ - - Meant to be subclassed - - - class ClockService(BusFeedProvider): - def __init__(self, name="clock_transmitter", bus=None): - trigger_message = Message("time.request") - super().__init__(trigger_message, name, bus) - self.set_data_gatherer(self.handle_get_time) - - def handle_get_time(self, message): - self.update({"date": datetime.now()}) - - - clock_service = ClockService() - - """ - - def __init__(self, trigger_message, name=None, bus=None, config=None): - """ - initialize responder - - args: - name(str): name identifier for .conf settings - bus (WebsocketClient): mycroft messagebus websocket - """ - if not config: - log_deprecation(f"Expected a dict config and got None.", "0.1.0") - try: - from ovos_config.config import read_mycroft_config - config = read_mycroft_config() - except ImportError: - LOG.warning("ovos_config not available. Falling back to " - "default configuration") - config = dict() - self.trigger_message = trigger_message - self.name = name or self.__class__.__name__ - self.bus = bus or get_mycroft_bus() - self.callback = None - self.service = None - self._daemon = None - self.config = config.get(self.name, {}) - - def update(self, data): - """ - change the data of the response to be sent when queried - """ - if self.service is not None: - self.service.update_response(data) - - def set_data_gatherer(self, callback, default_data=None, daemonic=False, interval=90): - """ - prepare responder for sending, register answers - """ - - self.bus.remove_all_listeners(self.trigger_message) - if ".request" in self.trigger_message: - response_type = self.trigger_message.replace(".request", ".reply") - else: - response_type = self.trigger_message + ".reply" - - response = FakeMessage(response_type, default_data) - self.service = BusService(response, bus=self.bus) - self.callback = callback - self.bus.on(self.trigger_message, self._respond) - if daemonic: - self._daemon = create_loop(self._data_daemon, interval) - - def _data_daemon(self): - if self.callback is not None: - self.callback(self.trigger_message) - - def _respond(self, message): - """ - gather data and emit to bus - """ - try: - if self.callback: - self.callback(message) - except Exception as e: - LOG.error(e) - self.service.respond(message) - - def shutdown(self): - self.bus.remove_all_listeners(self.trigger_message) - if self._daemon: - self._daemon.join(0) - self._daemon = None - if self.service: - self.service.shutdown() - self.service = None - - -class BusQuery: - """ - retrieve data from some other component over the messagebus at any time - - message = Message("request.msg", {...}, {...}) - query = BusQuery(message) - response = query.send() - # do some more stuff - response = query.send() # reutilize the object - - """ - - def __init__(self, message, bus=None): - self.bus = bus or get_mycroft_bus() - self._waiting = False - self.response = FakeMessage(None, None, None) - self.query = message - self.valid_response_types = [] - - def add_response_type(self, response_type): - """ listen to a new response_type """ - if response_type not in self.valid_response_types: - self.valid_response_types.append(response_type) - self.bus.on(response_type, self._end_wait) - - def _end_wait(self, message): - self.response = message - self._waiting = False - - def _wait_response(self, timeout): - start = time.time() - elapsed = 0 - self._waiting = True - while self._waiting and elapsed < timeout: - elapsed = time.time() - start - time.sleep(0.1) - self._waiting = False - - def send(self, response_type=None, timeout=10): - self.response = FakeMessage(None, None, None) - if response_type is None: - response_type = self.query.type + ".reply" - self.add_response_type(response_type) - self.bus.emit(self.query) - self._wait_response(timeout) - return self.response - - def remove_listeners(self): - for event in self.valid_response_types: - self.bus.remove(event, self._end_wait) - - def shutdown(self): - """ remove all listeners """ - self.remove_listeners() - - -class BusFeedConsumer: - """ - this is meant to be subclassed - - class Clock(BusFeedConsumer): - def __init__(self, name="clock_receiver", timeout=3, bus=None): - request_message = Message("time.request") - super().__init__(request_message, name, timeout, bus) - - # blocking - clock = Clock() - date = clock.request()["date"] - - # async - clock = Clock(timeout=0) # non - blocking - clock.request(daemonic=True, # loop on background - frequency=1) # update result every second - - date = clock.result["date"] - - """ - - def __init__(self, query_message, name=None, timeout=5, bus=None, - config=None): - self.query_message = query_message - self.query_message.context["source"] = self.name - self.name = name or self.__class__.__name__ - self.bus = bus or get_mycroft_bus() - if not config: - log_deprecation(f"Expected a dict config and got None.", "0.1.0") - try: - from ovos_config.config import read_mycroft_config - config = read_mycroft_config() - except ImportError: - LOG.warning("Config not provided and ovos_config not available") - config = dict() - self.config = config.get(self.name, {}) - self.timeout = timeout - self.query = None - self.valid_responses = [] - self._daemon = None - - def request(self, response_messages=None, daemonic=False, interval=90): - """ - prepare query for sending, add several possible kinds of - response message automatically - "message_type.reply" , - "message_type.response", - "message_type.result" - """ - response_messages = response_messages or [] - - # generate valid reply message types - self.valid_responses = response_messages - if ".request" in self.query_message.type: - response = self.query_message.type.replace(".request", ".reply") - if response not in self.valid_responses: - self.valid_responses.append(response) - response = self.query_message.type.replace(".request", ".response") - if response not in self.valid_responses: - self.valid_responses.append(response) - response = self.query_message.type.replace(".request", ".result") - if response not in self.valid_responses: - self.valid_responses.append(response) - else: - response = self.query_message.type + ".reply" - if response not in self.valid_responses: - self.valid_responses.append(response) - response = self.query_message.type + ".response" - if response not in self.valid_responses: - self.valid_responses.append(response) - response = self.query_message.type + ".result" - if response not in self.valid_responses: - self.valid_responses.append(response) - - # update message context - self.query_message.context["valid_responses"] = self.valid_responses - - self._query() - if daemonic: - self._daemon = create_loop(self._request_daemon, interval) - return self.result - - def _request_daemon(self): - self.query.send(self.valid_responses[0], self.timeout) - - def _query(self): - self.query = BusQuery(self.query_message) - for message in self.valid_responses[1:]: - self.query.add_response_type(message) - self.query.send(self.valid_responses[0], self.timeout) - - @property - def result(self): - return self.query.response.data - - def shutdown(self): - """ remove all listeners """ - if self._daemon: - self._daemon.join(0) - self._daemon = None - if self.query: - self.query.shutdown()