From 74dafce01ae215a19ab2213634f4e8a930695f10 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 5 May 2019 08:57:17 -0400 Subject: [PATCH 1/7] Make close and write optionally async --- Lib/asyncio/streams.py | 28 ++++++++++++++++++--------- Lib/test/test_asyncio/test_streams.py | 6 +++--- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 33fc303a6ffcfc..464006b9a99fcf 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -318,6 +318,8 @@ def __init__(self, transport, protocol, reader, loop): assert reader is None or isinstance(reader, StreamReader) self._reader = reader self._loop = loop + self._complete_fut = self._loop.create_future() + self._complete_fut.set_result(None) def __repr__(self): info = [self.__class__.__name__, f'transport={self._transport!r}'] @@ -331,6 +333,21 @@ def transport(self): def write(self, data): self._transport.write(data) + if self._reader is not None: + # this branch will be simplified after merging reader with writer + exc = self._reader.exception() + if exc is not None: + fut = self._loop.create_future() + fut.set_exception(exc) + return fut + if not self._transport.is_closing(): + if self._protocol._connection_lost: + fut = self._loop.create_future() + fut.set_exception(ConnectionResetError('Connection lost')) + return fut + if not self._protocol._paused: + return self._complete_fut + return self._loop.create_task(self.drain()) def writelines(self, data): self._transport.writelines(data) @@ -343,6 +360,7 @@ def can_write_eof(self): def close(self): self._transport.close() + return self._protocol._closed def is_closing(self): return self._transport.is_closing() @@ -372,17 +390,9 @@ async def drain(self): # write(...); await drain() # in a loop would never call connection_lost(), so it # would not see an error when the socket is closed. - await sleep(0, loop=self._loop) + await self._protocol._closed await self._protocol._drain_helper() - async def aclose(self): - self.close() - await self.wait_closed() - - async def awrite(self, data): - self.write(data) - await self.drain() - class StreamReader: diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index 630f91dbf4780f..f1867b1045f50f 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -973,7 +973,7 @@ def test_async_writer_api(self): asyncio.open_connection(*httpd.address, loop=self.loop)) - f = wr.awrite(b'GET / HTTP/1.0\r\n\r\n') + f = wr.write(b'GET / HTTP/1.0\r\n\r\n') self.loop.run_until_complete(f) f = rd.readline() data = self.loop.run_until_complete(f) @@ -981,7 +981,7 @@ def test_async_writer_api(self): f = rd.read() data = self.loop.run_until_complete(f) self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - f = wr.aclose() + f = wr.close() self.loop.run_until_complete(f) self.assertEqual(messages, []) @@ -996,7 +996,7 @@ def test_eof_feed_when_closing_writer(self): asyncio.open_connection(*httpd.address, loop=self.loop)) - f = wr.aclose() + f = wr.close() self.loop.run_until_complete(f) assert rd.at_eof() f = rd.read() From 8e25ae539c5f6dd8c7991f7c5d7cdfe84683c0f8 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 5 May 2019 09:59:41 -0400 Subject: [PATCH 2/7] Fix waiting in StreamWriter.drain for closing SSL transport --- Lib/asyncio/streams.py | 12 ++++------ Lib/test/test_asyncio/test_streams.py | 23 +++++++++++++++++++ .../2019-05-05-09-45-44.bpo-36801.XrlFFs.rst | 1 + 3 files changed, 29 insertions(+), 7 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2019-05-05-09-45-44.bpo-36801.XrlFFs.rst diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 33fc303a6ffcfc..16b3b0aa04c804 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -366,13 +366,11 @@ async def drain(self): if exc is not None: raise exc if self._transport.is_closing(): - # Yield to the event loop so connection_lost() may be - # called. Without this, _drain_helper() would return - # immediately, and code that calls - # write(...); await drain() - # in a loop would never call connection_lost(), so it - # would not see an error when the socket is closed. - await sleep(0, loop=self._loop) + # Wait for protocol.connection_lost() call + # Raise connection closing error if any, + # ConnectionResetError otherwise + await self._protocol._closed + raise ConnectionResetError('Connection lost') await self._protocol._drain_helper() async def aclose(self): diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index 630f91dbf4780f..e57868cb4721e6 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -109,6 +109,29 @@ def test_open_unix_connection_no_loop_ssl(self): self._basetest_open_connection_no_loop_ssl(conn_fut) + @unittest.skipIf(ssl is None, 'No ssl module') + def test_drain_on_closed_writer_ssl(self): + + async def inner(httpd): + reader, writer = await asyncio.open_connection( + *httpd.address, + ssl=test_utils.dummy_ssl_context()) + + messages = [] + self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) + writer.write(b'GET / HTTP/1.0\r\n\r\n') + data = await reader.read() + self.assertTrue(data.endswith(b'\r\n\r\nTest message')) + + writer.close() + with self.assertRaises(ConnectionResetError): + await writer.drain() + + self.assertEqual(messages, []) + + with test_utils.run_test_server(use_ssl=True) as httpd: + self.loop.run_until_complete(inner(httpd)) + def _basetest_open_connection_error(self, open_connection_fut): messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) diff --git a/Misc/NEWS.d/next/Library/2019-05-05-09-45-44.bpo-36801.XrlFFs.rst b/Misc/NEWS.d/next/Library/2019-05-05-09-45-44.bpo-36801.XrlFFs.rst new file mode 100644 index 00000000000000..43e51fe5ca94d5 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-05-05-09-45-44.bpo-36801.XrlFFs.rst @@ -0,0 +1 @@ +Properly handle SSL connection closing in asyncio StreamWriter.drain() call. From c50084a964239eb95fed932c5c923e99a0c7c57e Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 5 May 2019 10:27:30 -0400 Subject: [PATCH 3/7] Support await by stream.write(), stream.writelines() and stream.close() calls --- Doc/library/asyncio-stream.rst | 64 ++++++++++--------- Lib/asyncio/streams.py | 15 +++-- Lib/test/test_asyncio/test_streams.py | 40 ++++++++---- .../2019-05-05-10-12-23.bpo-36802.HYMc8P.rst | 2 + 4 files changed, 77 insertions(+), 44 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index e686a6a1c4cd32..6e2fc8a3c0341f 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -22,13 +22,13 @@ streams:: '127.0.0.1', 8888) print(f'Send: {message!r}') - await writer.awrite(message.encode()) + await writer.write(message.encode()) data = await reader.read(100) print(f'Received: {data.decode()!r}') print('Close the connection') - await writer.aclose() + await writer.close() asyncio.run(tcp_echo_client('Hello World!')) @@ -226,23 +226,48 @@ StreamWriter directly; use :func:`open_connection` and :func:`start_server` instead. - .. coroutinemethod:: awrite(data) + .. method:: write(data) Write *data* to the stream. - The method respects flow control, execution is paused if the write - buffer reaches the high watermark. + If the method is called using ``await stream.write(data)`` syntax it + respects flow control, execution is paused if the write buffer reaches the high + watermark *(recommended API)*. + + If the method is called using *sync* ``stream.write(data)`` form it *is not* a + subject to flow control. Such call should + be followed by :meth:`drain`. + + .. versionchanged:: 3.8 + Support ``await stream.write(...)`` syntax. + + .. method:: writelines(data) + + Write a list (or any iterable) of bytes to the stream. - .. versionadded:: 3.8 + If the method is called using ``await stream.writelines(lines)`` syntax it + respects flow control, execution is paused if the write buffer reaches the high + watermark *(recommended API)*. - .. coroutinemethod:: aclose() + If the method is called using *sync* ``stream.writelines(lines)`` form it *is not* + subject to flow control. Such calls should be followed by :meth:`drain`. + + .. versionchanged:: 3.8 + Support ``await stream.writelines()`` syntax. + + .. method:: close() Close the stream. - Wait until all closing actions are complete, e.g. SSL shutdown for - secure sockets. + If the method is called using ``await stream.close()`` syntax it waits until all + closing actions are complete, e.g. SSL shutdown for secure sockets *(recommended + API)*. - .. versionadded:: 3.8 + If the method is called using *sync* ``stream.close()`` form it *does not* wait + for actual socket closing. The call should be followed by :meth:`wait_closed`. + + .. versionchanged:: 3.8 + Support ``await stream.close()`` syntax. .. method:: can_write_eof() @@ -263,21 +288,6 @@ StreamWriter Access optional transport information; see :meth:`BaseTransport.get_extra_info` for details. - .. method:: write(data) - - Write *data* to the stream. - - This method is not subject to flow control. Calls to ``write()`` should - be followed by :meth:`drain`. The :meth:`awrite` method is a - recommended alternative the applies flow control automatically. - - .. method:: writelines(data) - - Write a list (or any iterable) of bytes to the stream. - - This method is not subject to flow control. Calls to ``writelines()`` - should be followed by :meth:`drain`. - .. coroutinemethod:: drain() Wait until it is appropriate to resume writing to the stream. @@ -293,10 +303,6 @@ StreamWriter be resumed. When there is nothing to wait for, the :meth:`drain` returns immediately. - .. method:: close() - - Close the stream. - .. method:: is_closing() Return ``True`` if the stream is closed or in the process of diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 464006b9a99fcf..c6da71ebc08b57 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -333,6 +333,16 @@ def transport(self): def write(self, data): self._transport.write(data) + return self._fast_drain() + + def writelines(self, data): + self._transport.writelines(data) + return self._fast_drain() + + def _fast_drain(self): + # The helper tries to use fast-path to return already existing complete future + # object if underlying transport is not paused and actual waiting for writing + # resume is not needed if self._reader is not None: # this branch will be simplified after merging reader with writer exc = self._reader.exception() @@ -349,9 +359,6 @@ def write(self, data): return self._complete_fut return self._loop.create_task(self.drain()) - def writelines(self, data): - self._transport.writelines(data) - def write_eof(self): return self._transport.write_eof() @@ -390,7 +397,7 @@ async def drain(self): # write(...); await drain() # in a loop would never call connection_lost(), so it # would not see an error when the socket is closed. - await self._protocol._closed + await sleep(0, loop=self._loop) await self._protocol._drain_helper() diff --git a/Lib/test/test_asyncio/test_streams.py b/Lib/test/test_asyncio/test_streams.py index f1867b1045f50f..212070552c29d1 100644 --- a/Lib/test/test_asyncio/test_streams.py +++ b/Lib/test/test_asyncio/test_streams.py @@ -965,24 +965,42 @@ def test_del_stream_before_connection_made(self): messages[0]['message']) def test_async_writer_api(self): + async def inner(httpd): + rd, wr = await asyncio.open_connection(*httpd.address) + + await wr.write(b'GET / HTTP/1.0\r\n\r\n') + data = await rd.readline() + self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') + data = await rd.read() + self.assertTrue(data.endswith(b'\r\n\r\nTest message')) + await wr.close() + messages = [] self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) with test_utils.run_test_server() as httpd: - rd, wr = self.loop.run_until_complete( - asyncio.open_connection(*httpd.address, - loop=self.loop)) + self.loop.run_until_complete(inner(httpd)) - f = wr.write(b'GET / HTTP/1.0\r\n\r\n') - self.loop.run_until_complete(f) - f = rd.readline() - data = self.loop.run_until_complete(f) + self.assertEqual(messages, []) + + def test_async_writer_api(self): + async def inner(httpd): + rd, wr = await asyncio.open_connection(*httpd.address) + + await wr.write(b'GET / HTTP/1.0\r\n\r\n') + data = await rd.readline() self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') - f = rd.read() - data = self.loop.run_until_complete(f) + data = await rd.read() self.assertTrue(data.endswith(b'\r\n\r\nTest message')) - f = wr.close() - self.loop.run_until_complete(f) + wr.close() + with self.assertRaises(ConnectionResetError): + await wr.write(b'data') + + messages = [] + self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) + + with test_utils.run_test_server() as httpd: + self.loop.run_until_complete(inner(httpd)) self.assertEqual(messages, []) diff --git a/Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst b/Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst new file mode 100644 index 00000000000000..f59863b7b7a837 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-05-05-10-12-23.bpo-36802.HYMc8P.rst @@ -0,0 +1,2 @@ +Provide both sync and async calls for StreamWriter.write() and +StreamWriter.close() From b6e550235b15a187e5aaaaae9e032084d93c7766 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 5 May 2019 14:01:28 -0400 Subject: [PATCH 4/7] Add proto._get_close_waiter(stream) to get closing future for corresponding stream --- Lib/asyncio/streams.py | 11 +++++++++-- Lib/asyncio/subprocess.py | 9 +++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 16b3b0aa04c804..03c75a9f043998 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -179,6 +179,9 @@ async def _drain_helper(self): self._drain_waiter = waiter await waiter + def _get_close_waiter(self, stream): + raise NotImplementedError + class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): """Helper class to adapt between Protocol and StreamReader. @@ -293,6 +296,9 @@ def eof_received(self): return False return True + def _get_close_waiter(self, stream): + return self._closed + def __del__(self): # Prevent reports about unhandled exceptions. # Better than self._closed._log_traceback = False hack @@ -348,7 +354,7 @@ def is_closing(self): return self._transport.is_closing() async def wait_closed(self): - await self._protocol._closed + await self._protocol._get_close_waiter(self) def get_extra_info(self, name, default=None): return self._transport.get_extra_info(name, default) @@ -369,7 +375,8 @@ async def drain(self): # Wait for protocol.connection_lost() call # Raise connection closing error if any, # ConnectionResetError otherwise - await self._protocol._closed + fut = self._protocol._get_close_waiter(self) + await fut raise ConnectionResetError('Connection lost') await self._protocol._drain_helper() diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index 90fc00de8339fb..ecacc1a66dbf4e 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -25,6 +25,7 @@ def __init__(self, limit, loop): self._transport = None self._process_exited = False self._pipe_fds = [] + self._stdin_closed = self._loop.create_future() def __repr__(self): info = [self.__class__.__name__] @@ -76,6 +77,10 @@ def pipe_connection_lost(self, fd, exc): if pipe is not None: pipe.close() self.connection_lost(exc) + if exc is None: + self._stdin_closed.set_result(None) + else: + self._stdin_closed.set_exception(exc) return if fd == 1: reader = self.stdout @@ -102,6 +107,10 @@ def _maybe_close_transport(self): self._transport.close() self._transport = None + def _get_close_waiter(self, stream): + if stream is self.stdin: + return self._stdin_closed + class Process: def __init__(self, transport, protocol, loop): From e2fed9fc2b11dea30a10395f7d686569e9191ea4 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 5 May 2019 15:04:17 -0400 Subject: [PATCH 5/7] Fix subprocess tests --- Lib/asyncio/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index d6afabff1161dd..4a68f191c20002 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -373,7 +373,7 @@ def can_write_eof(self): def close(self): self._transport.close() - return self._protocol._closed + return self._protocol._get_close_waiter(self) def is_closing(self): return self._transport.is_closing() From d3e46858bb239a653eabd1176263e3e7f963e38a Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Tue, 7 May 2019 15:59:50 -0400 Subject: [PATCH 6/7] Update documentation for changed methods --- Doc/library/asyncio-stream.rst | 60 +++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/Doc/library/asyncio-stream.rst b/Doc/library/asyncio-stream.rst index 6e2fc8a3c0341f..e735b81f234d26 100644 --- a/Doc/library/asyncio-stream.rst +++ b/Doc/library/asyncio-stream.rst @@ -228,43 +228,65 @@ StreamWriter .. method:: write(data) - Write *data* to the stream. + The method attempts to write the *data* to the underlying socket immediately. + If that fails, the data is queued in an internal write buffer until it can be + sent. - If the method is called using ``await stream.write(data)`` syntax it - respects flow control, execution is paused if the write buffer reaches the high - watermark *(recommended API)*. + Starting with Python 3.8, it is possible to directly await on the `write()` + method:: - If the method is called using *sync* ``stream.write(data)`` form it *is not* a - subject to flow control. Such call should - be followed by :meth:`drain`. + await stream.write(data) + + The ``await`` pauses the current coroutine until the data is written to the + socket. + + Below is an equivalent code that works with Python <= 3.7:: + + stream.write(data) + await stream.drain() .. versionchanged:: 3.8 Support ``await stream.write(...)`` syntax. .. method:: writelines(data) - Write a list (or any iterable) of bytes to the stream. + The method writes a list (or any iterable) of bytes to the underlying socket + immediately. + If that fails, the data is queued in an internal write buffer until it can be + sent. + + Starting with Python 3.8, it is possible to directly await on the `write()` + method:: + + await stream.writelines(lines) - If the method is called using ``await stream.writelines(lines)`` syntax it - respects flow control, execution is paused if the write buffer reaches the high - watermark *(recommended API)*. + The ``await`` pauses the current coroutine until the data is written to the + socket. - If the method is called using *sync* ``stream.writelines(lines)`` form it *is not* - subject to flow control. Such calls should be followed by :meth:`drain`. + Below is an equivalent code that works with Python <= 3.7:: + + stream.writelines(lines) + await stream.drain() .. versionchanged:: 3.8 Support ``await stream.writelines()`` syntax. .. method:: close() - Close the stream. + The method closes the stream and the underlying socket. + + Starting with Python 3.8, it is possible to directly await on the `close()` + method:: + + await stream.close() + + The ``await`` pauses the current coroutine until the stream and the underlying + socket are closed (and SSL shutdown is performed for a secure connection). - If the method is called using ``await stream.close()`` syntax it waits until all - closing actions are complete, e.g. SSL shutdown for secure sockets *(recommended - API)*. + Below is an equivalent code that works with Python <= 3.7:: - If the method is called using *sync* ``stream.close()`` form it *does not* wait - for actual socket closing. The call should be followed by :meth:`wait_closed`. + stream.close() + await stream.wait_closed() .. versionchanged:: 3.8 Support ``await stream.close()`` syntax. From 6255d2b5af8c5f7530caa94f19d293ce99178c83 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Tue, 7 May 2019 16:51:44 -0400 Subject: [PATCH 7/7] Add a comment to emphasize fast path for stream draining --- Lib/asyncio/streams.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py index 4a68f191c20002..e7f2ccd0903082 100644 --- a/Lib/asyncio/streams.py +++ b/Lib/asyncio/streams.py @@ -362,6 +362,8 @@ def _fast_drain(self): fut.set_exception(ConnectionResetError('Connection lost')) return fut if not self._protocol._paused: + # fast path, the stream is not paused + # no need to wait for resume signal return self._complete_fut return self._loop.create_task(self.drain())