diff --git a/tests/concurrency/sync.py b/tests/concurrency/sync.py index aa31403..eec12dc 100644 --- a/tests/concurrency/sync.py +++ b/tests/concurrency/sync.py @@ -8,7 +8,7 @@ client = ZeroClient("localhost", 5559) -func = partial(client.call, "sleep") +func = partial(client.call, "sleep_async") def get_and_print(msg): diff --git a/zero/client_server/worker.py b/zero/client_server/worker.py index 6e485df..e8d8bf2 100644 --- a/zero/client_server/worker.py +++ b/zero/client_server/worker.py @@ -8,6 +8,7 @@ from zero.codegen.codegen import CodeGen from zero.encoder.protocols import Encoder from zero.error import SERVER_PROCESSING_ERROR +from zero.utils.async_to_sync import async_to_sync from zero.zero_mq.factory import get_worker @@ -80,7 +81,8 @@ def handle_msg(self, rpc, msg): # TODO: is this a bottleneck if inspect.iscoroutinefunction(func): # this is blocking - ret = self._loop.run_until_complete(func(msg) if msg else func()) + # ret = self._loop.run_until_complete(func(msg) if msg else func()) + ret = async_to_sync(func)(msg) if msg else async_to_sync(func)() else: ret = func(msg) if msg else func() diff --git a/zero/utils/async_to_sync.py b/zero/utils/async_to_sync.py new file mode 100644 index 0000000..91d3a56 --- /dev/null +++ b/zero/utils/async_to_sync.py @@ -0,0 +1,19 @@ +import asyncio +import threading +from functools import wraps + +_loop = asyncio.new_event_loop() + +_thrd = threading.Thread(target=_loop.run_forever, name="Async Runner", daemon=True) + + +def async_to_sync(func): + @wraps(func) + def run(*args, **kwargs): + if not _thrd.is_alive(): + _thrd.start() + + future = asyncio.run_coroutine_threadsafe(func(*args, **kwargs), _loop) + return future.result() + + return run