Skip to content

Commit

Permalink
add option to allow exception propagation (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
ecpeterson authored Mar 27, 2019
1 parent ef64614 commit ccc42a6
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 21 deletions.
2 changes: 1 addition & 1 deletion VERSION.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.4.0
2.4.1
75 changes: 61 additions & 14 deletions rpcq/_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,26 @@ class Server:
"""
Server that accepts JSON RPC calls through a socket.
"""
def __init__(self, rpc_spec: RPCSpec = None, announce_timing: bool = False):
def __init__(self, rpc_spec: RPCSpec = None, announce_timing: bool = False,
serialize_exceptions: bool = True):
"""
Create a server that will be linked to a socket
:param rpc_spec: JSON RPC spec
:param announce_timing:
:param serialize_exceptions: If set to True, this Server will catch all exceptions occurring
internally to it and, when possible, communicate them to the interrogating Client. If
set to False, this Server will re-raise any exceptions it encounters (including, but not
limited to, those which might occur through method calls to rpc_spec) for Server's
local owner to handle.
IMPORTANT NOTE: When set to False, this *almost definitely* means an unrecoverable
crash, and the Server should then be _shutdown().
"""
self.announce_timing = announce_timing
self.serialize_exceptions = serialize_exceptions

self.rpc_spec = rpc_spec if rpc_spec else RPCSpec()
self.rpc_spec = rpc_spec if rpc_spec else RPCSpec(serialize_exceptions=serialize_exceptions)
self._exit_handlers = []

self._socket = None
Expand Down Expand Up @@ -74,17 +85,49 @@ async def run_async(self, endpoint: str):
"""
self._connect(endpoint)

while True:
try:
# empty_frame may either be:
# 1. a single null frame if the client is a REQ socket
# 2. an empty list (ie. no frames) if the client is a DEALER socket
identity, *empty_frame, msg = await self._socket.recv_multipart()
request = from_msgpack(msg)
# spawn an initial listen task
listen_task = asyncio.ensure_future(self._socket.recv_multipart())
task_list = [listen_task]

asyncio.ensure_future(self._process_request(identity, empty_frame, request))
except Exception:
_log.exception('Exception thrown in Server run loop')
while True:
dones, pendings = await asyncio.wait(task_list, return_when=asyncio.FIRST_COMPLETED)

# grab one "done" task to handle
task_list, done_list = list(pendings), list(dones)
done = done_list.pop()
task_list += done_list

if done == listen_task:
try:
# empty_frame may either be:
# 1. a single null frame if the client is a REQ socket
# 2. an empty list (ie. no frames) if the client is a DEALER socket
identity, *empty_frame, msg = done.result()
request = from_msgpack(msg)

# spawn a processing task
task_list.append(asyncio.ensure_future(
self._process_request(identity, empty_frame, request)))
except Exception as e:
if self.serialize_exceptions:
_log.exception('Exception thrown in Server run loop during request '
'reception: {}'.format(str(e)))
else:
raise e
finally:
# spawn a new listen task
listen_task = asyncio.ensure_future(self._socket.recv_multipart())
task_list.append(listen_task)
else:
# if there's been an exception during processing, consider reraising it
try:
done.result()
except Exception as e:
if self.serialize_exceptions:
_log.exception('Exception thrown in Server run loop during request '
'dispatch: {}'.format(str(e)))
else:
raise e

def run(self, endpoint: str, loop: AbstractEventLoop = None):
"""
Expand Down Expand Up @@ -151,5 +194,9 @@ async def _process_request(self, identity: bytes, empty_frame: list, request: RP

_log.debug("Sending client %s reply: %s", identity, reply)
await self._socket.send_multipart([identity, *empty_frame, to_msgpack(reply)])
except Exception:
_log.exception('Exception thrown in _process_request')
except Exception as e:
if self.serialize_exceptions:
_log.exception('Exception thrown in _process_request')
else:
raise e

20 changes: 14 additions & 6 deletions rpcq/_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class RPCSpec(object):
"""
Class for keeping track of class methods that are exposed to the JSON RPC interface
"""
def __init__(self, *, provide_tracebacks: bool = True):
def __init__(self, *, provide_tracebacks: bool = True, serialize_exceptions: bool = True):
"""
Create a JsonRpcSpec object.
Expand Down Expand Up @@ -61,9 +61,14 @@ def add(obj, *args):
implementations will have their tracebacks forwarded to the calling client as part of
the generated RPCError reply objject. If set to False, the generated RPCError reply will
omit this information (but the traceback will still get written to the logfile).
:param serialize_exceptions: If set to True, unhandled exceptions which occur during RPC
call implementations will be serialized into RPCError messages (which the Server
instance will then probably send to the corresponding Client). If set to False, the
exception is re-raised and left for the local caller to handle further.
"""
self._json_rpc_methods = {}
self.provide_tracebacks = provide_tracebacks
self.serialize_exceptions = serialize_exceptions

def add_handler(self, f):
"""
Expand Down Expand Up @@ -114,11 +119,14 @@ async def run_handler(self, request: RPCRequest) -> Union[RPCReply, RPCError]:
result = await result

except Exception as e:
_traceback = traceback.format_exc()
_log.error(_traceback)
if self.provide_tracebacks:
return rpc_error(request.id, "{}\n{}".format(str(e), _traceback))
if self.serialize_exceptions:
_traceback = traceback.format_exc()
_log.error(_traceback)
if self.provide_tracebacks:
return rpc_error(request.id, "{}\n{}".format(str(e), _traceback))
else:
return rpc_error(request.id, str(e))
else:
return rpc_error(request.id, str(e))
raise e

return rpc_reply(request.id, result)

0 comments on commit ccc42a6

Please sign in to comment.