Skip to content

Commit

Permalink
Handle broken mem chan on Actor._push_result()
Browse files Browse the repository at this point in the history
When backpressure is used and a feeder mem chan breaks during msg
delivery (usually because the IPC allocating task already terminated)
instead of raising we simply warn as we do for the non-backpressure
case.

Also, add a proper `Actor.is_arbiter` test inside `._invoke()` to avoid
doing an arbiter-registry lookup if the current actor **is** the
registrar.
  • Loading branch information
goodboy committed Nov 10, 2022
1 parent d671316 commit 9be2130
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions tractor/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,12 @@ async def _push_result(

if ctx._backpressure:
log.warning(text)
await send_chan.send(msg)
try:
await send_chan.send(msg)
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{chan} is already closed")
else:
try:
raise StreamOverrun(text) from None
Expand Down Expand Up @@ -1374,8 +1379,9 @@ async def async_main(
actor.lifetime_stack.close()

# Unregister actor from the arbiter
if registered_with_arbiter and (
actor._arb_addr is not None
if (
registered_with_arbiter
and not actor.is_arbiter
):
failed = False
with trio.move_on_after(0.5) as cs:
Expand Down

0 comments on commit 9be2130

Please sign in to comment.