Skip to content

Commit

Permalink
1.4.0 subscriber no more blocks thread
Browse files Browse the repository at this point in the history
  • Loading branch information
PaTara43 committed Dec 26, 2022
1 parent 437bd02 commit b2a27e1
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 43 deletions.
18 changes: 12 additions & 6 deletions docs/source/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,9 @@ There are as well dedicated methods for convenient usage of RWS.
Subscriptions
+++++++++++++

There is a subscriptions functional implemented. When initiated, blocks thread and processes new events with a user-passed
callback function. Pay attention that this callback may only accept one argument - the event data. Up to now, the only supported
events are ``NewRecord``, ``NewLaunch``, ``Transfer``, ``TopicChanged`` and ``NewDevices``.
There is a subscriptions functional implemented. When initiated, processes new events with a user-passed
callback function. Pay attention that this callback may only accept one argument - the event data. Up to now, the supported
events are ``NewRecord``, ``NewLaunch``, ``Transfer``, ``TopicChanged``, ``NewDevices``, ``NewLiability`` and ``NewReport``.

.. code-block:: python
Expand All @@ -284,17 +284,23 @@ events are ``NewRecord``, ``NewLaunch``, ``Transfer``, ``TopicChanged`` and ``Ne
print(data)
account = Account()
subscriber = Subscriber(account, SubEvent.NewRecord, subscription_handler=callback)
subscriber = Subscriber(account, SubEvent.MultiEvent, subscription_handler=callback)
<do stuff>
subscriber.cancel()
One may also pass a list of addresses or one address as a parameter to filter trigger situation. Another option is to set
``pass_event_id`` to get block number and event ID as a second ``callback`` parameter.

One may also pass a list of addresses or one address as a parameter to filter trigger situations.
There is a way to subscribe to multiple events by using side package ``aenum``.

.. code-block:: python
from aenum import extend_enum
extend_enum(SubEvent, "MultiEvent", f"{SubEvent.NewRecord.value, SubEvent.NewLaunch.value}")
subscriber = Subscriber(acc, SubEvent.MultiEvent, subscription_handler=callback)
subscriber = Subscriber(acc, SubEvent.MultiEvent, subscription_handler=callback, addr=<ss58_addr>, pass_event_id=True)
IO
++
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "robonomics-interface"
version = "1.3.7"
version = "1.4.0"
description = "Robonomics wrapper over https://github.com/polkascan/py-substrate-interface created to facilitate programming with Robonomics"
authors = ["Pavel Tarasov <[email protected]>"]
license = "Apache-2.0"
Expand Down Expand Up @@ -46,7 +46,7 @@ robonomics_interface = 'robonomicsinterface.robonomics_interface_io:cli'

[tool.poetry.dependencies]
python = ">=3.8, <4.0"
substrate-interface = ">=1.3.2, <2.0"
substrate-interface = ">=1.4.0, <2.0"
click = "^8.0.4"

[tool.poetry.dev-dependencies]
Expand Down
98 changes: 63 additions & 35 deletions robonomicsinterface/classes/subscriptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import threading
import typing as tp
from functools import partial

from enum import Enum
from functools import partial
from logging import getLogger
from websocket import WebSocketConnectionClosedException

Expand All @@ -12,6 +13,12 @@


class SubEvent(Enum):
"""
This is an ``Enum`` class to hold possible events traced by ``Subscriber`` class. May be extended with
``aenum.extend_enum``.
"""

NewRecord = "NewRecord"
NewLaunch = "NewLaunch"
Transfer = "Transfer"
Expand All @@ -23,7 +30,7 @@ class SubEvent(Enum):

class Subscriber:
"""
Class intended for use in cases when needed to subscribe on chainstate updates/events. **Blocks current thread**!
Class intended for use in cases when needed to subscribe on chainstate updates/events.
"""

def __init__(
Expand All @@ -38,26 +45,32 @@ def __init__(
Initiates an instance for further use and starts a subscription for a selected action.
:param account: Account dataclass with ``seed``, ``remote_ws`` address and node ``type_registry``.
:param subscribed_event: Event in substrate chain to be awaited. Choose from [``NewRecord``, ``NewLaunch``,
``Transfer``, ``TopicChanged``, ``NewDevices``, ``NewLiability``, ``NewReport``]. This parameter should be a
``SubEvent`` class attribute. This also requires importing this class.
:param subscription_handler: Callback function that processes the updates of the storage.
THIS FUNCTION IS MEANT TO ACCEPT ONLY ONE ARGUMENT (THE NEW EVENT DESCRIPTION TUPLE) by default.
It will receive event ID as a second parameter if ``pass_event_id`` is True.
:param subscribed_event: Event in substrate chain to be awaited. Choose from SubEvent class.
:param subscription_handler: Callback function that processes the updates of the storage. This function is meant
to accept only one parameter by default (the new event description). It will receive
``(block_num, event_id)`` as a second parameter if ``pass_event_id`` is set to ``True``.
:param pass_event_id: The ``subscription_handler`` will receive event ID as a second parameter
if ``pass_event_id`` is True. Format is ``{block_number}-{event_idx}``.
:param addr: ss58 type 32 address(-es) of an account(-s) which is(are) meant to be event target. If ``None``,
will subscribe to all such events never-mind target address(-es).
"""

self._custom_functions: ServiceFunctions = ServiceFunctions(account)
self._event: SubEvent = subscribed_event
self._callback: callable = subscription_handler
self._target_address: tp.Optional[tp.Union[tp.List[str], str]] = addr
if "(" in subscribed_event.value:
self._subscribed_event: list = (
subscribed_event.value.replace("(", "").replace(")", "").replace("'", "").split(", ")
)
else:
self._subscribed_event: list = [subscribed_event.value]
self._subscription_handler: callable = subscription_handler
self._pass_event_id: bool = pass_event_id
self._addr: tp.Optional[tp.Union[tp.List[str], str]] = addr

self._custom_functions: ServiceFunctions = ServiceFunctions(account)
self._cancel_flag: bool = False

self._subscribe_event()
self._subscription: threading.Thread = threading.Thread(target=self._subscribe_event)
self._subscription.start()

def _subscribe_event(self) -> None:
"""
Expand All @@ -66,13 +79,13 @@ def _subscribe_event(self) -> None:
"""

logger.info(f"Subscribing to event {self._event.value} for target addresses {self._target_address}")
logger.info(f"Subscribing to event {self._subscribed_event} for target addresses {self._addr}")
try:
self._custom_functions.subscribe_block_headers(self._event_callback)
except WebSocketConnectionClosedException:
self._subscribe_event()

def _event_callback(self, index_obj: tp.Any, update_nr: int, subscription_id: int) -> None:
def _event_callback(self, index_obj: tp.Any, update_nr: int, subscription_id: int) -> tp.Optional[bool]:
"""
Function, processing updates in event list storage. On update filters events to a desired account
and passes the event description to the user-provided ``callback`` method.
Expand All @@ -85,28 +98,43 @@ def _event_callback(self, index_obj: tp.Any, update_nr: int, subscription_id: in

if update_nr == 0:
return None
if self._cancel_flag:
return True

chain_events: list = self._custom_functions.chainstate_query("System", "Events")
for events in chain_events:
if events["event_id"] in self._event.value:
should_callback: bool = (self._target_address is None) or (
events["event"]["attributes"][
0
if self._event
in [
SubEvent.NewRecord,
SubEvent.TopicChanged,
SubEvent.NewDevices,
]
else 1
]
in self._target_address
)
if not should_callback:
for event in chain_events:

if event["event_id"] in self._subscribed_event:
if self._addr and not self._target_address_in_event(event):
continue
callback = partial(self._callback, events["event"]["attributes"])

callback = partial(self._subscription_handler, event["attributes"])
if self._pass_event_id:
block_num: int = index_obj["header"]["number"]
event_id: str = "{}-{}".format(block_num, events["extrinsic_idx"])
callback = partial(callback, event_id)
callback = partial(callback, f"{index_obj['header']['number']}-{event['extrinsic_idx']}")
callback()

def _target_address_in_event(self, event) -> bool:
"""
Return whether call callback function or not.
:param event: Occurred chain event.
:return: Whether call callback function or not.
"""

if isinstance(event["attributes"], dict):
event["attributes"] = list(event["attributes"].values())

if event["event_id"] in [SubEvent.NewRecord.value, SubEvent.TopicChanged.value, SubEvent.NewDevices.value]:
return str(event["attributes"][0]) in self._addr
else:
return str(event["attributes"][1]) in self._addr

def cancel(self) -> None:
"""
Cancel subscription and join its thread.
"""

self._cancel_flag = True

0 comments on commit b2a27e1

Please sign in to comment.