Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make pause/resume_reading idepotent and no-op for closed transports #528

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,22 +160,30 @@ def __init__(self, loop, sock, protocol, waiter=None,

def pause_reading(self):
if self._closing:
raise RuntimeError('Cannot pause_reading() when closing')
if self._paused:
raise RuntimeError('Already paused')
self._paused = True
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
return
elif self._paused:
if self._loop.get_debug():
logger.debug(
"%r: ignoring pause_reading() call; already paused",
self)
else:
self._paused = True
if self._loop.get_debug():
logger.debug("%r pauses reading", self)

def resume_reading(self):
if not self._paused:
raise RuntimeError('Not paused')
self._paused = False
if self._closing:
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand why you want to remove the exception from pause_reading. But rising an error on trying to resume_reading on a closed transport seems reasonable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think resume_reading is more important. from my experience pause_reading is getting called from protocol, and protocol has access to transport object, but resume_reading usually happen from higher level, from coroutine side. and coroutine usually doesn't have access to transport or protocol directly, it just writes to some kind of data stream.

self._loop.call_soon(self._loop_reading, self._read_fut)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
elif not self._paused:
if self._loop.get_debug():
logger.debug(
"%r: ignoring resume_reading() call; already reading",
self)
else:
self._paused = False
self._loop.call_soon(self._loop_reading, self._read_fut)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)

def _loop_reading(self, fut=None):
if self._paused:
Expand Down
68 changes: 42 additions & 26 deletions Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,23 +699,31 @@ def __init__(self, loop, sock, protocol, waiter=None,

def pause_reading(self):
if self._closing:
raise RuntimeError('Cannot pause_reading() when closing')
if self._paused:
raise RuntimeError('Already paused')
self._paused = True
self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
return
elif self._paused:
if self._loop.get_debug():
logger.debug(
"%r: ignoring pause_reading() call; already paused",
self)
else:
self._paused = True
self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)

def resume_reading(self):
if not self._paused:
raise RuntimeError('Not paused')
self._paused = False
if self._closing:
return
self._loop._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
elif not self._paused:
if self._loop.get_debug():
logger.debug(
"%r: ignoring resume_reading() call; already reading",
self)
else:
self._paused = False
self._loop._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)

def _read_ready(self):
if self._conn_lost:
Expand Down Expand Up @@ -932,23 +940,31 @@ def pause_reading(self):
# call resume_reading() again, and things will flow again.

if self._closing:
raise RuntimeError('Cannot pause_reading() when closing')
if self._paused:
raise RuntimeError('Already paused')
self._paused = True
self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
return
elif self._paused:
if self._loop.get_debug():
logger.debug(
"%r: ignoring pause_reading() call; already paused",
self)
else:
self._paused = True
self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)

def resume_reading(self):
if not self._paused:
raise RuntimeError('Not paused')
self._paused = False
if self._closing:
return
self._loop._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
elif not self._paused:
if self._loop.get_debug():
logger.debug(
"%r: ignoring resume_reading() call; already reading",
self)
else:
self._paused = False
self._loop._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)

def _read_ready(self):
if self._conn_lost:
Expand Down
17 changes: 17 additions & 0 deletions Lib/test/test_asyncio/test_proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,14 +346,31 @@ def test_pause_resume_reading(self):
for i in range(10):
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data2')

# pause_reading is idepotent
tr.pause_reading()
tr.pause_reading()
self.assertTrue(tr._paused)

tr.resume_reading()
self.assertFalse(tr._paused)
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data3')
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data4')

# resume_reading is idepotent
tr.resume_reading()
tr.resume_reading()

tr.close()

# pause/resume reading is no-op on closed transport
tr.pause_reading()
self.assertFalse(tr._paused)
tr._paused = True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we need a public Transport.is_reading() method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd add Transport.paused property instead

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually prefer methods to properties: see Transport.is_closing() for instance. Anyways, do you use _paused attribute anywhere in aiohttp? Maybe we should create a separate issue to discuss this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aiohttp maintains paused state separately from asyncio transport. it would be useful to have public api for that.

in aiohttp paused state is maintained by different edges, protocol pauses transport
when it get enough data, but resume happen from other edge, when user reads-out data from incoming stream. that is the reason why I think pause/resume should be ideponent and no-op during closing stage.

tr.resume_reading()
self.assertTrue(tr._paused)

def pause_writing_transport(self, high):
tr = self.socket_transport()
Expand Down
37 changes: 33 additions & 4 deletions Lib/test/test_asyncio/test_selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,11 +831,25 @@ def test_pause_resume_reading(self):
tr.pause_reading()
self.assertTrue(tr._paused)
self.assertFalse(7 in self.loop.readers)

# pause_reading is idepotent
tr.pause_reading()

tr.resume_reading()
self.assertFalse(tr._paused)
self.loop.assert_reader(7, tr._read_ready)
with self.assertRaises(RuntimeError):
tr.resume_reading()

# resume_reading is idepotent
tr.resume_reading()
tr.resume_reading()

# pause/resume reading is no-op on closed transport
tr.close()
tr.pause_reading()
self.assertFalse(tr._paused)
tr._paused = True
tr.resume_reading()
self.assertTrue(tr._paused)

def test_read_ready(self):
transport = self.socket_transport()
Expand Down Expand Up @@ -1229,12 +1243,27 @@ def test_pause_resume_reading(self):
self.loop.assert_reader(1, tr._read_ready)
tr.pause_reading()
self.assertTrue(tr._paused)

# pause_reading is idepotent
tr.pause_reading()
tr.pause_reading()

self.assertFalse(1 in self.loop.readers)
tr.resume_reading()
self.assertFalse(tr._paused)
self.loop.assert_reader(1, tr._read_ready)
with self.assertRaises(RuntimeError):
tr.resume_reading()

# resume_reading is idepotent
tr.resume_reading()
tr.resume_reading()

# pause/resume reading is no-op on closed transport
tr.close()
tr.pause_reading()
self.assertFalse(tr._paused)
tr._paused = True
tr.resume_reading()
self.assertTrue(tr._paused)

def test_write(self):
transport = self._make_one()
Expand Down
3 changes: 3 additions & 0 deletions Misc/NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ Library
- bpo-29534: Fixed different behaviour of Decimal.from_float()
for _decimal and _pydecimal. Thanks Andrew Nester.

- bpo-29745: Make pause/resume_reading idepotent and no-op
for closed asyncio transports. Patch by Nikolay Kim.

- Issue #28556: Various updates to typing module: typing.Counter, typing.ChainMap,
improved ABC caching, etc. Original PRs by Jelle Zijlstra, Ivan Levkivskyi,
Manuel Krebber, and Łukasz Langa.
Expand Down