Skip to content

Commit

Permalink
Refactor rpc_request method to allow the use of the library with a th…
Browse files Browse the repository at this point in the history
…readsafe queue

Allow all calls to rpc_request() to be queued and be threadsafe.
Redirect the recursive call from the result_handler to the websocket sub-function.
Add thread_safe_substrate_interface.py as an example.
  • Loading branch information
vtexier committed Jan 8, 2025
1 parent 57b665a commit d808016
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 59 deletions.
128 changes: 128 additions & 0 deletions examples/thread_safe_substrate_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright 2021 Vincent Texier <[email protected]>
#
# This software is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import time
from queue import Queue
from threading import Thread

from substrateinterface import SubstrateInterface


class ThreadSafe(Thread):

queue: Queue = Queue()

def __init__(self, *args, **kwargs):
"""
Init a SubstrateInterface client adapter instance as a thread
:param args: Positional arguments
:param kwargs: Keywords arguments
"""
super().__init__(*args, **kwargs)

def run(self):
"""
Started asynchronously with Thread.start()
:return:
"""
while True:
# print("loop...")
call, method, args, result_handler, result = self.queue.get()
result_ = dict()
# print(call, method, args, result_handler, result)
if call == "--close--":
logging.debug("Close queue thread on substrate_interface")
break

try:
# logging.debug(f"threadsafe call to rpc method {method}")
result_ = call(method, args, result_handler)
except Exception as exception:
logging.error(method)
logging.error(args)
# logging.exception(exception)
result.put(exception)
# print(call.__name__, " put result ", result_)
result.put(result_)
# print("reloop...")

logging.debug("SubstrateInterface connection closed and thread terminated.")

def close(self):
"""
Close connection
:return:
"""
# Closing the connection
self.queue.put(("--close--", None, None, None, None))


class ThreadSafeSubstrateInterface(SubstrateInterface):
"""
Override substrate_interface client class with a queue to be thread safe
"""

def __init__(self, *args, **kwargs):
"""
Init a SubstrateInterface client adapter instance as a thread
:param args: Positional arguments
:param kwargs: Keywords arguments
"""
# create and start thread before calling parent init (which makes a rpc_request!)
self.thread = ThreadSafe()
self.thread.start()

super().__init__(*args, **kwargs)

def rpc_request(self, method, params, result_handler=None) -> dict:
"""
Override rpc_request method to use threadsafe queue
:param method: Name of the RPC method
:param params: Params of the RPC method
:param result_handler: Optional variable to receive results, default to None
:return:
"""
result: Queue = Queue()
self.thread.queue.put(
(super().rpc_request, method, params, result_handler, result)
)
# print(self.thread.queue.get())
# print('done calling %s' % method)
return_ = result.get()
if isinstance(return_, Exception):
raise return_
return return_

def close(self):
logging.debug("Close RPC connection thread")
self.thread.close()


if __name__ == '__main__':
# start Substrate instance as a thread...
substrate = ThreadSafeSubstrateInterface(
url="ws://127.0.0.1:9944"
)
while True:
substrate.get_block()
number = substrate.get_block_number(substrate.block_hash)
print(number, substrate.block_hash, substrate.version, substrate.runtime_version)
time.sleep(6)
160 changes: 101 additions & 59 deletions substrateinterface/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,31 @@ def rpc_request(self, method, params, result_handler=None):
-------
a dict with the parsed result of the request.
"""
if self.websocket:
json_body = self.rpc_request_websocket(method, params, result_handler)
else:
if result_handler:
raise ConfigurationError("Result handlers only available for websockets (ws://) connections")

json_body = self.rpc_request_http(method, params)

return json_body

def rpc_request_websocket(self, method, params, result_handler=None):
"""
Method that handles the actual RPC request to the Substrate node. The other implemented functions eventually
use this method to perform the request.
Parameters
----------
result_handler: Callback function that processes the result received from the node
method: method of the JSONRPC request
params: a list containing the parameters of the JSONRPC request
Returns
-------
a dict with the parsed result of the request.
"""
request_id = self.request_id
self.request_id += 1

Expand All @@ -267,83 +291,101 @@ def rpc_request(self, method, params, result_handler=None):

self.debug_message('RPC request #{}: "{}"'.format(request_id, method))

if self.websocket:
try:
self.websocket.send(json.dumps(payload))
except WebSocketConnectionClosedException:
if self.config.get('auto_reconnect') and self.url:
# Try to reconnect websocket and retry rpc_request
self.debug_message("Connection Closed; Trying to reconnecting...")
self.connect_websocket()

return self.rpc_request(method=method, params=params, result_handler=result_handler)
else:
# websocket connection is externally created, re-raise exception
raise
try:
self.websocket.send(json.dumps(payload))
except WebSocketConnectionClosedException:
if self.config.get('auto_reconnect') and self.url:
# Try to reconnect websocket and retry rpc_request
self.debug_message("Connection Closed; Trying to reconnecting...")
self.connect_websocket()

return self.rpc_request(method=method, params=params, result_handler=result_handler)
else:
# websocket connection is externally created, re-raise exception
raise

update_nr = 0
json_body = None
subscription_id = None
update_nr = 0
json_body = None
subscription_id = None

while json_body is None:
# Search for subscriptions
for message, remove_message in list_remove_iter(self.__rpc_message_queue):
while json_body is None:
# Search for subscriptions
for message, remove_message in list_remove_iter(self.__rpc_message_queue):

# Check if result message is matching request ID
if 'id' in message and message['id'] == request_id:
# Check if result message is matching request ID
if 'id' in message and message['id'] == request_id:
remove_message()

remove_message()
# Check if response has error
if 'error' in message:
raise SubstrateRequestException(message['error'])

# Check if response has error
if 'error' in message:
raise SubstrateRequestException(message['error'])
# If result handler is set, pass result through and loop until handler return value is set
if callable(result_handler):

# If result handler is set, pass result through and loop until handler return value is set
if callable(result_handler):
# Set subscription ID and only listen to messages containing this ID
subscription_id = message['result']
self.debug_message(f"Websocket subscription [{subscription_id}] created")

# Set subscription ID and only listen to messages containing this ID
subscription_id = message['result']
self.debug_message(f"Websocket subscription [{subscription_id}] created")
else:
json_body = message

else:
json_body = message
# Process subscription updates
for message, remove_message in list_remove_iter(self.__rpc_message_queue):
# Check if message is meant for this subscription
if 'params' in message and message['params']['subscription'] == subscription_id:

# Process subscription updates
for message, remove_message in list_remove_iter(self.__rpc_message_queue):
# Check if message is meant for this subscription
if 'params' in message and message['params']['subscription'] == subscription_id:
remove_message()

remove_message()
self.debug_message(f"Websocket result [{subscription_id} #{update_nr}]: {message}")

self.debug_message(f"Websocket result [{subscription_id} #{update_nr}]: {message}")
# Call result_handler with message for processing
callback_result = result_handler(message, update_nr, subscription_id)
if callback_result is not None:
json_body = callback_result

# Call result_handler with message for processing
callback_result = result_handler(message, update_nr, subscription_id)
if callback_result is not None:
json_body = callback_result
update_nr += 1

update_nr += 1
# Read one more message to queue
if json_body is None:
self.__rpc_message_queue.append(json.loads(self.websocket.recv()))

# Read one more message to queue
if json_body is None:
self.__rpc_message_queue.append(json.loads(self.websocket.recv()))
return json_body

else:
def rpc_request_http(self, method, params):
"""
Method that handles the actual RPC request to the Substrate node with HTTP.
if result_handler:
raise ConfigurationError("Result handlers only available for websockets (ws://) connections")
Parameters
----------
method: method of the JSONRPC request
params: a list containing the parameters of the JSONRPC request
Returns
-------
a dict with the parsed result of the request.
"""
request_id = self.request_id
self.request_id += 1

payload = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": request_id
}

response = self.session.request("POST", self.url, data=json.dumps(payload), headers=self.default_headers)
response = self.session.request("POST", self.url, data=json.dumps(payload), headers=self.default_headers)

if response.status_code != 200:
raise SubstrateRequestException(
"RPC request failed with HTTP status code {}".format(response.status_code))
if response.status_code != 200:
raise SubstrateRequestException(
"RPC request failed with HTTP status code {}".format(response.status_code))

json_body = response.json()
json_body = response.json()

# Check if response has error
if 'error' in json_body:
raise SubstrateRequestException(json_body['error'])
# Check if response has error
if 'error' in json_body:
raise SubstrateRequestException(json_body['error'])

return json_body

Expand Down Expand Up @@ -1783,14 +1825,14 @@ def result_handler(message, update_nr, subscription_id):
message_result = {k.lower(): v for k, v in message['params']['result'].items()}

if 'finalized' in message_result and wait_for_finalization:
self.rpc_request('author_unwatchExtrinsic', [subscription_id])
self.rpc_request_websocket('author_unwatchExtrinsic', [subscription_id])
return {
'block_hash': message_result['finalized'],
'extrinsic_hash': '0x{}'.format(extrinsic.extrinsic_hash.hex()),
'finalized': True
}
elif 'inblock' in message_result and wait_for_inclusion and not wait_for_finalization:
self.rpc_request('author_unwatchExtrinsic', [subscription_id])
self.rpc_request_websocket('author_unwatchExtrinsic', [subscription_id])
return {
'block_hash': message_result['inblock'],
'extrinsic_hash': '0x{}'.format(extrinsic.extrinsic_hash.hex()),
Expand Down

0 comments on commit d808016

Please sign in to comment.