Skip to content

Commit

Permalink
test_cluster: fix deadlock in NodeProxy
Browse files Browse the repository at this point in the history
`NodeProxy` acts as a proxy server that connects to Valkey. In its
handler, it creates two pipes:
	1. Connecting valkey reader to own writer
	2. Connecting own reader to valkey writer

Then, it `await`s on these two pipes via `asyncio.gather`.
`asyncio.gather` itself returns a future that can be canceled, and
originally it was assumingly thought that canceling `self.task` in
`NodeProxy` would cancel the pipes, too. That was not the case. This
behaviour caused warnings in Python 3.9. On Python 3.12, it caused a
deadlock.

In order to fix it, the pipes future should be assigned to a variable so
that it can be canceled explicitly upon `aclose()`. Additionally, the
`NodeProxy`'s handler should handle a situation when pipes future was
canceled and close the `Writer` side in order for the loop to end
gracefully.

This commit introduces `self.pipes` in `NodeProxy` and handles the
cancellation properly therefore fixing the deadlock.

Signed-off-by: Mikhail Koviazin <[email protected]>
  • Loading branch information
mkmkme committed Jul 11, 2024
1 parent a480f64 commit e001c88
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion tests/test_asyncio/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def __init__(self, addr, valkey_addr):
self.send_event = asyncio.Event()
self.server = None
self.task = None
self.pipes = None
self.n_connections = 0

async def start(self):
Expand All @@ -79,12 +80,18 @@ async def handle(self, reader, writer):
self.n_connections += 1
pipe1 = asyncio.create_task(self.pipe(reader, valkey_writer))
pipe2 = asyncio.create_task(self.pipe(valkey_reader, writer))
await asyncio.gather(pipe1, pipe2)
self.pipes = asyncio.gather(pipe1, pipe2)
await self.pipes
except asyncio.CancelledError:
writer.close()
finally:
valkey_writer.close()

async def aclose(self):
self.task.cancel()
# self.pipes can be None if handle was never called
if self.pipes is not None:
self.pipes.cancel()
try:
await self.task
except asyncio.CancelledError:
Expand Down

0 comments on commit e001c88

Please sign in to comment.