Skip to content

Commit

Permalink
Update client to use futures
Browse files Browse the repository at this point in the history
  • Loading branch information
Ananto30 committed Jun 30, 2024
1 parent 2f80987 commit d9fcd3e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 41 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
__pycache__/
.vscode
venv/
venv*
venv-pypy/
.pytest_cache
.idea
Expand Down
19 changes: 13 additions & 6 deletions tests/concurrency/single_req.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
from zero import ZeroClient
import asyncio
from zero import ZeroClient, AsyncZeroClient

client = ZeroClient("localhost", 5559)
async_client = AsyncZeroClient("localhost", 5559)

async def task(sleep_time):
res = await async_client.call("sleep", sleep_time)
assert res == f"slept for {sleep_time} msecs"
print(res)

async def main():
tasks = [task(200) for _ in range(1000)]
await asyncio.gather(*tasks)

if __name__ == "__main__":
for i in range(10):
res = client.call("sleep", 100)
if res != "slept for 100 msecs":
print(f"expected: slept for 100 msecs, got: {res}")
print(res)
asyncio.run(main())
50 changes: 15 additions & 35 deletions zero/zeromq_patterns/queue_device/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import logging
import sys
from asyncio import Event
from typing import Dict, Optional

import zmq
Expand Down Expand Up @@ -108,7 +107,7 @@ def __init__(self, default_timeout: int = 2000):
self.poller = zmqasync.Poller()
self.poller.register(self.socket, zmq.POLLIN)

self._resp_map: Dict[str, bytes] = {}
self._resp_futures: Dict[str, asyncio.Future] = {}

# self.peer1, self.peer2 = zpipe_async(self._context)

Expand All @@ -121,9 +120,6 @@ async def connect(self, address: str) -> None:

async def request(self, message: bytes, timeout: Optional[int] = None) -> bytes:
_timeout = self._default_timeout if timeout is None else timeout
expire_at = util.current_time_us() + (_timeout * 1000)

is_data = Event()

async def _poll_data():
# async has issue with poller, after 3-4 calls, it returns empty
Expand All @@ -137,48 +133,32 @@ async def _poll_data():

# the rest is response data
resp_data = resp[32:]
self._resp_map[resp_id] = resp_data

# pipe is a good way to notify the main event loop that there is a response
# but pipe is actually slower than sleep, because it is a zmq socket
# yes it uses inproc, but still slower than asyncio.sleep
# try:
# await self.peer1.send(b"")
# except zmqerr.Again:
# # if the pipe is full, just pass
# pass

is_data.set()
future = self._resp_futures.pop(resp_id, None)
if future and not future.done():
future.set_result(resp_data)

req_id = util.unique_id()
future = self._resp_futures[req_id] = asyncio.Future()
await self._send(req_id.encode() + message)

# poll can get response of a different call
# so we poll until we get the response of this call or timeout
await _poll_data()

while req_id not in self._resp_map:
if util.current_time_us() > expire_at:
raise TimeoutException(
f"Timeout while waiting for response at {self._address}"
)

# await asyncio.sleep(1e-6)
await asyncio.wait_for(is_data.wait(), timeout=_timeout)

# try:
# await self.peer2.recv()
# except zmqerr.Again:
# # if the pipe is empty, just pass
# pass

resp_data = self._resp_map.pop(req_id)
try:
resp_data = await asyncio.wait_for(future, _timeout)
except asyncio.TimeoutError:
raise TimeoutException(

Check warning on line 150 in zero/zeromq_patterns/queue_device/client.py

View check run for this annotation

Codecov / codecov/patch

zero/zeromq_patterns/queue_device/client.py#L149-L150

Added lines #L149 - L150 were not covered by tests
f"Timeout while waiting for response at {self._address}"
)
finally:
if req_id in self._resp_futures:
del self._resp_futures[req_id]

Check warning on line 155 in zero/zeromq_patterns/queue_device/client.py

View check run for this annotation

Codecov / codecov/patch

zero/zeromq_patterns/queue_device/client.py#L155

Added line #L155 was not covered by tests

return resp_data

def close(self) -> None:
self.socket.close()
self._resp_map.clear()
self._resp_futures.clear()

async def _send(self, message: bytes) -> None:
try:
Expand Down

0 comments on commit d9fcd3e

Please sign in to comment.