Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor rpc_request method to allow use in threadsafe queue #409

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions examples/thread_safe_substrate_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright 2021 Vincent Texier <[email protected]>
#
# This software is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import time
from queue import Queue
from threading import Thread

from substrateinterface import SubstrateInterface


class ThreadSafe(Thread):

queue: Queue = Queue()

def __init__(self, *args, **kwargs):
"""
Init a SubstrateInterface client adapter instance as a thread

:param args: Positional arguments
:param kwargs: Keywords arguments
"""
super().__init__(*args, **kwargs)

def run(self):
"""
Started asynchronously with Thread.start()

:return:
"""
while True:
# print("loop...")
call, method, args, result_handler, result = self.queue.get()
result_ = dict()
# print(call, method, args, result_handler, result)
if call == "--close--":
logging.debug("Close queue thread on substrate_interface")
break

try:
# logging.debug(f"threadsafe call to rpc method {method}")
result_ = call(method, args, result_handler)
except Exception as exception:
logging.error(method)
logging.error(args)
# logging.exception(exception)
result.put(exception)
# print(call.__name__, " put result ", result_)
result.put(result_)
# print("reloop...")

logging.debug("SubstrateInterface connection closed and thread terminated.")

def close(self):
"""
Close connection

:return:
"""
# Closing the connection
self.queue.put(("--close--", None, None, None, None))


class ThreadSafeSubstrateInterface(SubstrateInterface):
"""
Override substrate_interface client class with a queue to be thread safe

"""

def __init__(self, *args, **kwargs):
"""
Init a SubstrateInterface client adapter instance as a thread

:param args: Positional arguments
:param kwargs: Keywords arguments
"""
# create and start thread before calling parent init (which makes a rpc_request!)
self.thread = ThreadSafe()
self.thread.start()

super().__init__(*args, **kwargs)

def rpc_request(self, method, params, result_handler=None) -> dict:
"""
Override rpc_request method to use threadsafe queue

:param method: Name of the RPC method
:param params: Params of the RPC method
:param result_handler: Optional variable to receive results, default to None
:return:
"""
result: Queue = Queue()
self.thread.queue.put(
(super().rpc_request, method, params, result_handler, result)
)
# print(self.thread.queue.get())
# print('done calling %s' % method)
return_ = result.get()
if isinstance(return_, Exception):
raise return_
return return_

def close(self):
logging.debug("Close RPC connection thread")
self.thread.close()


if __name__ == '__main__':
# start Substrate instance as a thread...
substrate = ThreadSafeSubstrateInterface(
url="ws://127.0.0.1:9944"
)
while True:
substrate.get_block()
number = substrate.get_block_number(substrate.block_hash)
print(number, substrate.block_hash, substrate.version, substrate.runtime_version)
time.sleep(6)
160 changes: 101 additions & 59 deletions substrateinterface/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,31 @@ def rpc_request(self, method, params, result_handler=None):
-------
a dict with the parsed result of the request.
"""
if self.websocket:
json_body = self.rpc_request_websocket(method, params, result_handler)
else:
if result_handler:
raise ConfigurationError("Result handlers only available for websockets (ws://) connections")

json_body = self.rpc_request_http(method, params)

return json_body

def rpc_request_websocket(self, method, params, result_handler=None):
"""
Method that handles the actual RPC request to the Substrate node. The other implemented functions eventually
use this method to perform the request.

Parameters
----------
result_handler: Callback function that processes the result received from the node
method: method of the JSONRPC request
params: a list containing the parameters of the JSONRPC request

Returns
-------
a dict with the parsed result of the request.
"""
request_id = self.request_id
self.request_id += 1

Expand All @@ -267,83 +291,101 @@ def rpc_request(self, method, params, result_handler=None):

self.debug_message('RPC request #{}: "{}"'.format(request_id, method))

if self.websocket:
try:
self.websocket.send(json.dumps(payload))
except WebSocketConnectionClosedException:
if self.config.get('auto_reconnect') and self.url:
# Try to reconnect websocket and retry rpc_request
self.debug_message("Connection Closed; Trying to reconnecting...")
self.connect_websocket()

return self.rpc_request(method=method, params=params, result_handler=result_handler)
else:
# websocket connection is externally created, re-raise exception
raise
try:
self.websocket.send(json.dumps(payload))
except WebSocketConnectionClosedException:
if self.config.get('auto_reconnect') and self.url:
# Try to reconnect websocket and retry rpc_request
self.debug_message("Connection Closed; Trying to reconnecting...")
self.connect_websocket()

return self.rpc_request(method=method, params=params, result_handler=result_handler)
else:
# websocket connection is externally created, re-raise exception
raise

update_nr = 0
json_body = None
subscription_id = None
update_nr = 0
json_body = None
subscription_id = None

while json_body is None:
# Search for subscriptions
for message, remove_message in list_remove_iter(self.__rpc_message_queue):
while json_body is None:
# Search for subscriptions
for message, remove_message in list_remove_iter(self.__rpc_message_queue):

# Check if result message is matching request ID
if 'id' in message and message['id'] == request_id:
# Check if result message is matching request ID
if 'id' in message and message['id'] == request_id:
remove_message()

remove_message()
# Check if response has error
if 'error' in message:
raise SubstrateRequestException(message['error'])

# Check if response has error
if 'error' in message:
raise SubstrateRequestException(message['error'])
# If result handler is set, pass result through and loop until handler return value is set
if callable(result_handler):

# If result handler is set, pass result through and loop until handler return value is set
if callable(result_handler):
# Set subscription ID and only listen to messages containing this ID
subscription_id = message['result']
self.debug_message(f"Websocket subscription [{subscription_id}] created")

# Set subscription ID and only listen to messages containing this ID
subscription_id = message['result']
self.debug_message(f"Websocket subscription [{subscription_id}] created")
else:
json_body = message

else:
json_body = message
# Process subscription updates
for message, remove_message in list_remove_iter(self.__rpc_message_queue):
# Check if message is meant for this subscription
if 'params' in message and message['params']['subscription'] == subscription_id:

# Process subscription updates
for message, remove_message in list_remove_iter(self.__rpc_message_queue):
# Check if message is meant for this subscription
if 'params' in message and message['params']['subscription'] == subscription_id:
remove_message()

remove_message()
self.debug_message(f"Websocket result [{subscription_id} #{update_nr}]: {message}")

self.debug_message(f"Websocket result [{subscription_id} #{update_nr}]: {message}")
# Call result_handler with message for processing
callback_result = result_handler(message, update_nr, subscription_id)
if callback_result is not None:
json_body = callback_result

# Call result_handler with message for processing
callback_result = result_handler(message, update_nr, subscription_id)
if callback_result is not None:
json_body = callback_result
update_nr += 1

update_nr += 1
# Read one more message to queue
if json_body is None:
self.__rpc_message_queue.append(json.loads(self.websocket.recv()))

# Read one more message to queue
if json_body is None:
self.__rpc_message_queue.append(json.loads(self.websocket.recv()))
return json_body

else:
def rpc_request_http(self, method, params):
"""
Method that handles the actual RPC request to the Substrate node with HTTP.

if result_handler:
raise ConfigurationError("Result handlers only available for websockets (ws://) connections")
Parameters
----------
method: method of the JSONRPC request
params: a list containing the parameters of the JSONRPC request

Returns
-------
a dict with the parsed result of the request.
"""
request_id = self.request_id
self.request_id += 1

payload = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": request_id
}

response = self.session.request("POST", self.url, data=json.dumps(payload), headers=self.default_headers)
response = self.session.request("POST", self.url, data=json.dumps(payload), headers=self.default_headers)

if response.status_code != 200:
raise SubstrateRequestException(
"RPC request failed with HTTP status code {}".format(response.status_code))
if response.status_code != 200:
raise SubstrateRequestException(
"RPC request failed with HTTP status code {}".format(response.status_code))

json_body = response.json()
json_body = response.json()

# Check if response has error
if 'error' in json_body:
raise SubstrateRequestException(json_body['error'])
# Check if response has error
if 'error' in json_body:
raise SubstrateRequestException(json_body['error'])

return json_body

Expand Down Expand Up @@ -1783,14 +1825,14 @@ def result_handler(message, update_nr, subscription_id):
message_result = {k.lower(): v for k, v in message['params']['result'].items()}

if 'finalized' in message_result and wait_for_finalization:
self.rpc_request('author_unwatchExtrinsic', [subscription_id])
self.rpc_request_websocket('author_unwatchExtrinsic', [subscription_id])
return {
'block_hash': message_result['finalized'],
'extrinsic_hash': '0x{}'.format(extrinsic.extrinsic_hash.hex()),
'finalized': True
}
elif 'inblock' in message_result and wait_for_inclusion and not wait_for_finalization:
self.rpc_request('author_unwatchExtrinsic', [subscription_id])
self.rpc_request_websocket('author_unwatchExtrinsic', [subscription_id])
return {
'block_hash': message_result['inblock'],
'extrinsic_hash': '0x{}'.format(extrinsic.extrinsic_hash.hex()),
Expand Down
Loading