Skip to content

Commit

Permalink
Run async func in thread in a different loop
Browse files Browse the repository at this point in the history
  • Loading branch information
Ananto30 committed May 15, 2024
1 parent 8178ec8 commit b594741
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
2 changes: 1 addition & 1 deletion tests/concurrency/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
client = ZeroClient("localhost", 5559)


func = partial(client.call, "sleep")
func = partial(client.call, "sleep_async")


def get_and_print(msg):
Expand Down
4 changes: 3 additions & 1 deletion zero/client_server/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from zero.codegen.codegen import CodeGen
from zero.encoder.protocols import Encoder
from zero.error import SERVER_PROCESSING_ERROR
from zero.utils.async_to_sync import async_to_sync
from zero.zero_mq.factory import get_worker


Expand Down Expand Up @@ -80,7 +81,8 @@ def handle_msg(self, rpc, msg):
# TODO: is this a bottleneck
if inspect.iscoroutinefunction(func):
# this is blocking
ret = self._loop.run_until_complete(func(msg) if msg else func())
# ret = self._loop.run_until_complete(func(msg) if msg else func())
ret = async_to_sync(func)(msg) if msg else async_to_sync(func)()
else:
ret = func(msg) if msg else func()

Expand Down
19 changes: 19 additions & 0 deletions zero/utils/async_to_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import asyncio
import threading
from functools import wraps

_loop = asyncio.new_event_loop()

_thrd = threading.Thread(target=_loop.run_forever, name="Async Runner", daemon=True)


def async_to_sync(func):
@wraps(func)
def run(*args, **kwargs):
if not _thrd.is_alive():
_thrd.start()

future = asyncio.run_coroutine_threadsafe(func(*args, **kwargs), _loop)
return future.result()

return run

0 comments on commit b594741

Please sign in to comment.