diff --git a/docs/source/usage.rst b/docs/source/usage.rst index 019b887..8e8f31d 100644 --- a/docs/source/usage.rst +++ b/docs/source/usage.rst @@ -272,9 +272,9 @@ There are as well dedicated methods for convenient usage of RWS. Subscriptions +++++++++++++ -There is a subscriptions functional implemented. When initiated, blocks thread and processes new events with a user-passed -callback function. Pay attention that this callback may only accept one argument - the event data. Up to now, the only supported -events are ``NewRecord``, ``NewLaunch``, ``Transfer``, ``TopicChanged`` and ``NewDevices``. +There is a subscriptions functional implemented. When initiated, processes new events with a user-passed +callback function. Pay attention that this callback may only accept one argument - the event data. Up to now, the supported +events are ``NewRecord``, ``NewLaunch``, ``Transfer``, ``TopicChanged``, ``NewDevices``, ``NewLiability`` and ``NewReport``. .. code-block:: python @@ -284,9 +284,15 @@ events are ``NewRecord``, ``NewLaunch``, ``Transfer``, ``TopicChanged`` and ``Ne print(data) account = Account() - subscriber = Subscriber(account, SubEvent.NewRecord, subscription_handler=callback) + subscriber = Subscriber(account, SubEvent.MultiEvent, subscription_handler=callback) + + + + subscriber.cancel() + +One may also pass a list of addresses or one address as a parameter to filter trigger situation. Another option is to set +``pass_event_id`` to get block number and event ID as a second ``callback`` parameter. -One may also pass a list of addresses or one address as a parameter to filter trigger situations. There is a way to subscribe to multiple events by using side package ``aenum``. .. code-block:: python @@ -294,7 +300,7 @@ There is a way to subscribe to multiple events by using side package ``aenum``. from aenum import extend_enum extend_enum(SubEvent, "MultiEvent", f"{SubEvent.NewRecord.value, SubEvent.NewLaunch.value}") - subscriber = Subscriber(acc, SubEvent.MultiEvent, subscription_handler=callback) + subscriber = Subscriber(acc, SubEvent.MultiEvent, subscription_handler=callback, addr=, pass_event_id=True) IO ++ diff --git a/pyproject.toml b/pyproject.toml index dc7bfbb..3c849e4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "robonomics-interface" -version = "1.3.6" +version = "1.4.0" description = "Robonomics wrapper over https://github.com/polkascan/py-substrate-interface created to facilitate programming with Robonomics" authors = ["Pavel Tarasov "] license = "Apache-2.0" @@ -46,7 +46,7 @@ robonomics_interface = 'robonomicsinterface.robonomics_interface_io:cli' [tool.poetry.dependencies] python = ">=3.8, <4.0" -substrate-interface = ">=1.3.2, <2.0" +substrate-interface = ">=1.4.0, <2.0" click = "^8.0.4" [tool.poetry.dev-dependencies] diff --git a/robonomicsinterface/classes/chain_utils.py b/robonomicsinterface/classes/chain_utils.py index 4f533de..ee43b1d 100644 --- a/robonomicsinterface/classes/chain_utils.py +++ b/robonomicsinterface/classes/chain_utils.py @@ -48,6 +48,7 @@ def get_block_number(self, block_hash: str) -> int: return self.interface.get_block_number(block_hash) + @check_socket_opened def get_block_hash(self, block_number: int) -> str: """ Get block hash by its number. diff --git a/robonomicsinterface/classes/subscriptions.py b/robonomicsinterface/classes/subscriptions.py index 93455a6..02cbb7e 100644 --- a/robonomicsinterface/classes/subscriptions.py +++ b/robonomicsinterface/classes/subscriptions.py @@ -1,7 +1,8 @@ +import threading import typing as tp -from functools import partial from enum import Enum +from functools import partial from logging import getLogger from websocket import WebSocketConnectionClosedException @@ -12,6 +13,12 @@ class SubEvent(Enum): + """ + This is an ``Enum`` class to hold possible events traced by ``Subscriber`` class. May be extended with + ``aenum.extend_enum``. + + """ + NewRecord = "NewRecord" NewLaunch = "NewLaunch" Transfer = "Transfer" @@ -23,7 +30,7 @@ class SubEvent(Enum): class Subscriber: """ - Class intended for use in cases when needed to subscribe on chainstate updates/events. **Blocks current thread**! + Class intended for use in cases when needed to subscribe on chainstate updates/events. """ def __init__( @@ -38,12 +45,10 @@ def __init__( Initiates an instance for further use and starts a subscription for a selected action. :param account: Account dataclass with ``seed``, ``remote_ws`` address and node ``type_registry``. - :param subscribed_event: Event in substrate chain to be awaited. Choose from [``NewRecord``, ``NewLaunch``, - ``Transfer``, ``TopicChanged``, ``NewDevices``, ``NewLiability``, ``NewReport``]. This parameter should be a - ``SubEvent`` class attribute. This also requires importing this class. - :param subscription_handler: Callback function that processes the updates of the storage. - THIS FUNCTION IS MEANT TO ACCEPT ONLY ONE ARGUMENT (THE NEW EVENT DESCRIPTION TUPLE) by default. - It will receive event ID as a second parameter if ``pass_event_id`` is True. + :param subscribed_event: Event in substrate chain to be awaited. Choose from SubEvent class. + :param subscription_handler: Callback function that processes the updates of the storage. This function is meant + to accept only one parameter by default (the new event description). It will receive + ``(block_num, event_id)`` as a second parameter if ``pass_event_id`` is set to ``True``. :param pass_event_id: The ``subscription_handler`` will receive event ID as a second parameter if ``pass_event_id`` is True. Format is ``{block_number}-{event_idx}``. :param addr: ss58 type 32 address(-es) of an account(-s) which is(are) meant to be event target. If ``None``, @@ -51,13 +56,21 @@ def __init__( """ - self._custom_functions: ServiceFunctions = ServiceFunctions(account) - self._event: SubEvent = subscribed_event - self._callback: callable = subscription_handler - self._target_address: tp.Optional[tp.Union[tp.List[str], str]] = addr + if "(" in subscribed_event.value: + self._subscribed_event: list = ( + subscribed_event.value.replace("(", "").replace(")", "").replace("'", "").split(", ") + ) + else: + self._subscribed_event: list = [subscribed_event.value] + self._subscription_handler: callable = subscription_handler self._pass_event_id: bool = pass_event_id + self._addr: tp.Optional[tp.Union[tp.List[str], str]] = addr + + self._custom_functions: ServiceFunctions = ServiceFunctions(account) + self._cancel_flag: bool = False - self._subscribe_event() + self._subscription: threading.Thread = threading.Thread(target=self._subscribe_event) + self._subscription.start() def _subscribe_event(self) -> None: """ @@ -66,13 +79,13 @@ def _subscribe_event(self) -> None: """ - logger.info(f"Subscribing to event {self._event.value} for target addresses {self._target_address}") + logger.info(f"Subscribing to event {self._subscribed_event} for target addresses {self._addr}") try: self._custom_functions.subscribe_block_headers(self._event_callback) except WebSocketConnectionClosedException: self._subscribe_event() - def _event_callback(self, index_obj: tp.Any, update_nr: int, subscription_id: int) -> None: + def _event_callback(self, index_obj: tp.Any, update_nr: int, subscription_id: int) -> tp.Optional[bool]: """ Function, processing updates in event list storage. On update filters events to a desired account and passes the event description to the user-provided ``callback`` method. @@ -85,28 +98,43 @@ def _event_callback(self, index_obj: tp.Any, update_nr: int, subscription_id: in if update_nr == 0: return None + if self._cancel_flag: + return True chain_events: list = self._custom_functions.chainstate_query("System", "Events") - for events in chain_events: - if events["event_id"] in self._event.value: - should_callback: bool = (self._target_address is None) or ( - events["event"]["attributes"][ - 0 - if self._event - in [ - SubEvent.NewRecord, - SubEvent.TopicChanged, - SubEvent.NewDevices, - ] - else 1 - ] - in self._target_address - ) - if not should_callback: + for event in chain_events: + + if event["event_id"] in self._subscribed_event: + if self._addr and not self._target_address_in_event(event): continue - callback = partial(self._callback, events["event"]["attributes"]) + + callback = partial(self._subscription_handler, event["attributes"]) if self._pass_event_id: - block_num: int = index_obj["header"]["number"] - event_id: str = "{}-{}".format(block_num, events["extrinsic_idx"]) - callback = partial(callback, event_id) + callback = partial(callback, f"{index_obj['header']['number']}-{event['extrinsic_idx']}") callback() + + def _target_address_in_event(self, event) -> bool: + """ + Return whether call callback function or not. + + :param event: Occurred chain event. + + :return: Whether call callback function or not. + + """ + + if isinstance(event["attributes"], dict): + event["attributes"] = list(event["attributes"].values()) + + if event["event_id"] in [SubEvent.NewRecord.value, SubEvent.TopicChanged.value, SubEvent.NewDevices.value]: + return str(event["attributes"][0]) in self._addr + else: + return str(event["attributes"][1]) in self._addr + + def cancel(self) -> None: + """ + Cancel subscription and join its thread. + + """ + + self._cancel_flag = True