diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index d94fa587cd3a47..a35ff75ed9afd1 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -334,6 +334,9 @@ Running Tasks Concurrently cancellation of one submitted Task/Future to cause other Tasks/Futures to be cancelled. + .. deprecated-removed:: 3.8 3.10 + The *loop* parameter. + .. _asyncio_example_gather: Example:: @@ -411,6 +414,9 @@ Shielding From Cancellation except CancelledError: res = None + .. deprecated-removed:: 3.8 3.10 + The *loop* parameter. + Timeouts ======== @@ -568,6 +574,9 @@ Waiting Primitives Raises :exc:`asyncio.TimeoutError` if the timeout occurs before all Futures are done. + .. deprecated-removed:: 3.8 3.10 + The *loop* parameter. + Example:: for f in as_completed(aws): @@ -694,6 +703,9 @@ Task Object .. versionchanged:: 3.8 Added the ``name`` parameter. + .. deprecated-removed:: 3.8 3.10 + The *loop* parameter. + .. method:: cancel() Request the Task to be cancelled. diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 78e76003b3ac22..99722189b47863 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -116,6 +116,9 @@ def all_tasks(cls, loop=None): return _all_tasks_compat(loop) def __init__(self, coro, *, loop=None, name=None): + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) super().__init__(loop=loop) if self._source_traceback: del self._source_traceback[-1] @@ -544,7 +547,12 @@ def as_completed(fs, *, loop=None, timeout=None): """ if futures.isfuture(fs) or coroutines.iscoroutine(fs): raise TypeError(f"expect a list of futures, not {type(fs).__name__}") - loop = loop if loop is not None else events.get_event_loop() + if loop is None: + loop = events.get_event_loop() + else: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) todo = {ensure_future(f, loop=loop) for f in set(fs)} from .queues import Queue # Import here to avoid circular import problem. done = Queue(loop=loop) @@ -619,6 +627,10 @@ def ensure_future(coro_or_future, *, loop=None): If the argument is a Future, it is returned directly. """ + if loop: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) if coroutines.iscoroutine(coro_or_future): if loop is None: loop = events.get_event_loop() @@ -659,6 +671,9 @@ class _GatheringFuture(futures.Future): """ def __init__(self, children, *, loop=None): + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) super().__init__(loop=loop) self._children = children self._cancel_requested = False @@ -704,6 +719,10 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False): if not coros_or_futures: if loop is None: loop = events.get_event_loop() + else: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) outer = loop.create_future() outer.set_result([]) return outer @@ -813,6 +832,10 @@ def shield(arg, *, loop=None): except CancelledError: res = None """ + if loop: + warnings.warn("The loop argument is deprecated since Python 3.8, " + "and scheduled for removal in Python 3.10.", + DeprecationWarning, stacklevel=2) inner = ensure_future(arg, loop=loop) if inner.done(): # Shortcut. diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 02a97c60ac1a93..3ca786e3b2eb22 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -254,11 +254,12 @@ def test_call_later_negative_delays(self): def cb(arg): calls.append(arg) - self.loop._process_events = mock.Mock() - self.loop.call_later(-1, cb, 'a') - self.loop.call_later(-2, cb, 'b') - test_utils.run_briefly(self.loop) - self.assertEqual(calls, ['b', 'a']) + with self.assertWarns(DeprecationWarning): + self.loop._process_events = mock.Mock() + self.loop.call_later(-1, cb, 'a') + self.loop.call_later(-2, cb, 'b') + test_utils.run_briefly(self.loop) + self.assertEqual(calls, ['b', 'a']) def test_time_and_call_at(self): def cb(): @@ -302,7 +303,6 @@ def test_check_thread(self): def check_in_thread(loop, event, debug, create_loop, fut): # wait until the event loop is running event.wait() - try: if create_loop: loop2 = base_events.BaseEventLoop() @@ -465,15 +465,17 @@ def cb(): self.assertTrue(all([not x._cancelled for x in self.loop._scheduled])) def test_run_until_complete_type_error(self): - self.assertRaises(TypeError, - self.loop.run_until_complete, 'blah') + with self.assertWarns(DeprecationWarning): + self.assertRaises(TypeError, + self.loop.run_until_complete, 'blah') def test_run_until_complete_loop(self): - task = asyncio.Future(loop=self.loop) - other_loop = self.new_test_loop() - self.addCleanup(other_loop.close) - self.assertRaises(ValueError, - other_loop.run_until_complete, task) + with self.assertWarns(DeprecationWarning): + task = asyncio.Future(loop=self.loop) + other_loop = self.new_test_loop() + self.addCleanup(other_loop.close) + self.assertRaises(ValueError, + other_loop.run_until_complete, task) def test_run_until_complete_loop_orphan_future_close_loop(self): class ShowStopper(SystemExit): @@ -484,15 +486,15 @@ async def foo(delay): def throw(): raise ShowStopper + with self.assertWarns(DeprecationWarning): + self.loop._process_events = mock.Mock() + self.loop.call_soon(throw) + with self.assertRaises(ShowStopper): + self.loop.run_until_complete(foo(0.1)) - self.loop._process_events = mock.Mock() - self.loop.call_soon(throw) - with self.assertRaises(ShowStopper): - self.loop.run_until_complete(foo(0.1)) - - # This call fails if run_until_complete does not clean up - # done-callback for the previous future. - self.loop.run_until_complete(foo(0.2)) + # This call fails if run_until_complete does not clean up + # done-callback for the previous future. + self.loop.run_until_complete(foo(0.2)) def test_subprocess_exec_invalid_args(self): args = [sys.executable, '-c', 'pass'] @@ -576,26 +578,26 @@ def test_default_exc_handler_coro(self): async def zero_error_coro(): await asyncio.sleep(0.01) 1/0 - - # Test Future.__del__ - with mock.patch('asyncio.base_events.logger') as log: - fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop) - fut.add_done_callback(lambda *args: self.loop.stop()) - self.loop.run_forever() - fut = None # Trigger Future.__del__ or futures._TracebackLogger - support.gc_collect() - if PY34: - # Future.__del__ in Python 3.4 logs error with - # an actual exception context - log.error.assert_called_with( - test_utils.MockPattern('.*exception was never retrieved'), - exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) - else: - # futures._TracebackLogger logs only textual traceback - log.error.assert_called_with( - test_utils.MockPattern( - '.*exception was never retrieved.*ZeroDiv'), - exc_info=False) + with self.assertWarns(DeprecationWarning): + # Test Future.__del__ + with mock.patch('asyncio.base_events.logger') as log: + fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop) + fut.add_done_callback(lambda *args: self.loop.stop()) + self.loop.run_forever() + fut = None # Trigger Future.__del__ or futures._TracebackLogger + support.gc_collect() + if PY34: + # Future.__del__ in Python 3.4 logs error with + # an actual exception context + log.error.assert_called_with( + test_utils.MockPattern('.*exception was never retrieved'), + exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) + else: + # futures._TracebackLogger logs only textual traceback + log.error.assert_called_with( + test_utils.MockPattern( + '.*exception was never retrieved.*ZeroDiv'), + exc_info=False) def test_set_exc_handler_invalid(self): with self.assertRaisesRegex(TypeError, 'A callable object or None'): @@ -722,24 +724,24 @@ class MyTask(asyncio.Task): async def coro(): pass + with self.assertWarns(DeprecationWarning): + factory = lambda loop, coro: MyTask(coro, loop=loop) - factory = lambda loop, coro: MyTask(coro, loop=loop) + self.assertIsNone(self.loop.get_task_factory()) + self.loop.set_task_factory(factory) + self.assertIs(self.loop.get_task_factory(), factory) - self.assertIsNone(self.loop.get_task_factory()) - self.loop.set_task_factory(factory) - self.assertIs(self.loop.get_task_factory(), factory) + task = self.loop.create_task(coro()) + self.assertTrue(isinstance(task, MyTask)) + self.loop.run_until_complete(task) - task = self.loop.create_task(coro()) - self.assertTrue(isinstance(task, MyTask)) - self.loop.run_until_complete(task) - - self.loop.set_task_factory(None) - self.assertIsNone(self.loop.get_task_factory()) + self.loop.set_task_factory(None) + self.assertIsNone(self.loop.get_task_factory()) - task = self.loop.create_task(coro()) - self.assertTrue(isinstance(task, asyncio.Task)) - self.assertFalse(isinstance(task, MyTask)) - self.loop.run_until_complete(task) + task = self.loop.create_task(coro()) + self.assertTrue(isinstance(task, asyncio.Task)) + self.assertFalse(isinstance(task, MyTask)) + self.loop.run_until_complete(task) def test_env_var_debug(self): code = '\n'.join(( @@ -782,28 +784,30 @@ class EventLoop(base_events.BaseEventLoop): def create_task(self, coro): return MyTask(coro, loop=loop) - loop = EventLoop() - self.set_event_loop(loop) + with self.assertWarns(DeprecationWarning): + loop = EventLoop() + self.set_event_loop(loop) - coro = test() - task = asyncio.ensure_future(coro, loop=loop) - self.assertIsInstance(task, MyTask) + coro = test() + task = asyncio.ensure_future(coro, loop=loop) + self.assertIsInstance(task, MyTask) - # make warnings quiet - task._log_destroy_pending = False - coro.close() + # make warnings quiet + task._log_destroy_pending = False + coro.close() def test_create_named_task_with_default_factory(self): async def test(): pass - loop = asyncio.new_event_loop() - task = loop.create_task(test(), name='test_task') - try: - self.assertEqual(task.get_name(), 'test_task') - finally: - loop.run_until_complete(task) - loop.close() + with self.assertWarns(DeprecationWarning): + loop = asyncio.new_event_loop() + task = loop.create_task(test(), name='test_task') + try: + self.assertEqual(task.get_name(), 'test_task') + finally: + loop.run_until_complete(task) + loop.close() def test_create_named_task_with_custom_factory(self): def task_factory(loop, coro): @@ -811,15 +815,15 @@ def task_factory(loop, coro): async def test(): pass - - loop = asyncio.new_event_loop() - loop.set_task_factory(task_factory) - task = loop.create_task(test(), name='test_task') - try: - self.assertEqual(task.get_name(), 'test_task') - finally: - loop.run_until_complete(task) - loop.close() + with self.assertWarns(DeprecationWarning): + loop = asyncio.new_event_loop() + loop.set_task_factory(task_factory) + task = loop.create_task(test(), name='test_task') + try: + self.assertEqual(task.get_name(), 'test_task') + finally: + loop.run_until_complete(task) + loop.close() def test_run_forever_keyboard_interrupt(self): # Python issue #22601: ensure that the temporary task created by @@ -827,18 +831,19 @@ def test_run_forever_keyboard_interrupt(self): # a warning async def raise_keyboard_interrupt(): raise KeyboardInterrupt + + with self.assertWarns(DeprecationWarning): + self.loop._process_events = mock.Mock() + self.loop.call_exception_handler = mock.Mock() - self.loop._process_events = mock.Mock() - self.loop.call_exception_handler = mock.Mock() - - try: - self.loop.run_until_complete(raise_keyboard_interrupt()) - except KeyboardInterrupt: - pass - self.loop.close() - support.gc_collect() + try: + self.loop.run_until_complete(raise_keyboard_interrupt()) + except KeyboardInterrupt: + pass + self.loop.close() + support.gc_collect() - self.assertFalse(self.loop.call_exception_handler.called) + self.assertFalse(self.loop.call_exception_handler.called) def test_run_until_complete_baseexception(self): # Python issue #22429: run_until_complete() must not schedule a pending @@ -846,23 +851,24 @@ def test_run_until_complete_baseexception(self): async def raise_keyboard_interrupt(): raise KeyboardInterrupt - self.loop._process_events = mock.Mock() + with self.assertWarns(DeprecationWarning): + self.loop._process_events = mock.Mock() - try: - self.loop.run_until_complete(raise_keyboard_interrupt()) - except KeyboardInterrupt: - pass + try: + self.loop.run_until_complete(raise_keyboard_interrupt()) + except KeyboardInterrupt: + pass - def func(): - self.loop.stop() - func.called = True - func.called = False - try: - self.loop.call_soon(func) - self.loop.run_forever() - except KeyboardInterrupt: - pass - self.assertTrue(func.called) + def func(): + self.loop.stop() + func.called = True + func.called = False + try: + self.loop.call_soon(func) + self.loop.run_forever() + except KeyboardInterrupt: + pass + self.assertTrue(func.called) def test_single_selecter_event_callback_after_stopping(self): # Python issue #25593: A stopped event loop may cause event callbacks @@ -953,37 +959,39 @@ async def iter_one(): def test_asyncgen_finalization_by_gc(self): # Async generators should be finalized when garbage collected. - self.loop._process_events = mock.Mock() - self.loop._write_to_self = mock.Mock() - with support.disable_gc(): - status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) - while not status['stopped']: + with self.assertWarns(DeprecationWarning): + self.loop._process_events = mock.Mock() + self.loop._write_to_self = mock.Mock() + with support.disable_gc(): + status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) + while not status['stopped']: + test_utils.run_briefly(self.loop) + self.assertTrue(status['started']) + self.assertTrue(status['stopped']) + self.assertFalse(status['finalized']) + support.gc_collect() test_utils.run_briefly(self.loop) - self.assertTrue(status['started']) - self.assertTrue(status['stopped']) - self.assertFalse(status['finalized']) - support.gc_collect() - test_utils.run_briefly(self.loop) - self.assertTrue(status['finalized']) + self.assertTrue(status['finalized']) def test_asyncgen_finalization_by_gc_in_other_thread(self): # Python issue 34769: If garbage collector runs in another # thread, async generators will not finalize in debug # mode. - self.loop._process_events = mock.Mock() - self.loop._write_to_self = mock.Mock() - self.loop.set_debug(True) - with support.disable_gc(): - status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) - while not status['stopped']: + with self.assertWarns(DeprecationWarning): + self.loop._process_events = mock.Mock() + self.loop._write_to_self = mock.Mock() + self.loop.set_debug(True) + with support.disable_gc(): + status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) + while not status['stopped']: + test_utils.run_briefly(self.loop) + self.assertTrue(status['started']) + self.assertTrue(status['stopped']) + self.assertFalse(status['finalized']) + self.loop.run_until_complete( + self.loop.run_in_executor(None, support.gc_collect)) test_utils.run_briefly(self.loop) - self.assertTrue(status['started']) - self.assertTrue(status['stopped']) - self.assertFalse(status['finalized']) - self.loop.run_until_complete( - self.loop.run_in_executor(None, support.gc_collect)) - test_utils.run_briefly(self.loop) - self.assertTrue(status['finalized']) + self.assertTrue(status['finalized']) class MyProto(asyncio.Protocol): @@ -1059,34 +1067,35 @@ def test_getnameinfo(self, m_gai): @patch_socket def test_create_connection_multiple_errors(self, m_socket): + with self.assertWarns(DeprecationWarning): + class MyProto(asyncio.Protocol): + pass - class MyProto(asyncio.Protocol): - pass + async def getaddrinfo(*args, **kw): + return [(2, 1, 6, '', ('107.6.106.82', 80)), + (2, 1, 6, '', ('107.6.106.82', 80))] - async def getaddrinfo(*args, **kw): - return [(2, 1, 6, '', ('107.6.106.82', 80)), - (2, 1, 6, '', ('107.6.106.82', 80))] + def getaddrinfo_task(*args, **kwds): + return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) - def getaddrinfo_task(*args, **kwds): - return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) + idx = -1 + errors = ['err1', 'err2'] - idx = -1 - errors = ['err1', 'err2'] + def _socket(*args, **kw): + nonlocal idx, errors + idx += 1 + raise OSError(errors[idx]) - def _socket(*args, **kw): - nonlocal idx, errors - idx += 1 - raise OSError(errors[idx]) + m_socket.socket = _socket - m_socket.socket = _socket - self.loop.getaddrinfo = getaddrinfo_task + self.loop.getaddrinfo = getaddrinfo_task - coro = self.loop.create_connection(MyProto, 'example.com', 80) - with self.assertRaises(OSError) as cm: - self.loop.run_until_complete(coro) + coro = self.loop.create_connection(MyProto, 'example.com', 80) + with self.assertRaises(OSError) as cm: + self.loop.run_until_complete(coro) - self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2') + self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2') @patch_socket def test_create_connection_timeout(self, m_socket): @@ -1100,27 +1109,32 @@ def getaddrinfo(*args, **kw): ('127.0.0.1', 80)) fut.set_result([addr]) return fut - self.loop.getaddrinfo = getaddrinfo + + with self.assertWarns(DeprecationWarning): + self.loop.getaddrinfo = getaddrinfo - with mock.patch.object(self.loop, 'sock_connect', - side_effect=asyncio.TimeoutError): - coro = self.loop.create_connection(MyProto, '127.0.0.1', 80) - with self.assertRaises(asyncio.TimeoutError): - self.loop.run_until_complete(coro) - self.assertTrue(sock.close.called) + with mock.patch.object(self.loop, 'sock_connect', + side_effect=asyncio.TimeoutError): + coro = self.loop.create_connection(MyProto, '127.0.0.1', 80) + with self.assertRaises(asyncio.TimeoutError): + self.loop.run_until_complete(coro) + self.assertTrue(sock.close.called) def test_create_connection_host_port_sock(self): - coro = self.loop.create_connection( - MyProto, 'example.com', 80, sock=object()) - self.assertRaises(ValueError, self.loop.run_until_complete, coro) + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_connection( + MyProto, 'example.com', 80, sock=object()) + self.assertRaises(ValueError, self.loop.run_until_complete, coro) def test_create_connection_wrong_sock(self): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - with sock: - coro = self.loop.create_connection(MyProto, sock=sock) - with self.assertRaisesRegex(ValueError, - 'A Stream Socket was expected'): - self.loop.run_until_complete(coro) + + with self.assertWarns(DeprecationWarning): + with sock: + coro = self.loop.create_connection(MyProto, sock=sock) + with self.assertRaisesRegex(ValueError, + 'A Stream Socket was expected'): + self.loop.run_until_complete(coro) def test_create_server_wrong_sock(self): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -1179,8 +1193,9 @@ def test_create_datagram_endpoint_wrong_sock(self): self.loop.run_until_complete(coro) def test_create_connection_no_host_port_sock(self): - coro = self.loop.create_connection(MyProto) - self.assertRaises(ValueError, self.loop.run_until_complete, coro) + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_connection(MyProto) + self.assertRaises(ValueError, self.loop.run_until_complete, coro) def test_create_connection_no_getaddrinfo(self): async def getaddrinfo(*args, **kw): @@ -1190,9 +1205,10 @@ def getaddrinfo_task(*args, **kwds): return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) self.loop.getaddrinfo = getaddrinfo_task - coro = self.loop.create_connection(MyProto, 'example.com', 80) - self.assertRaises( - OSError, self.loop.run_until_complete, coro) + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_connection(MyProto, 'example.com', 80) + self.assertRaises( + OSError, self.loop.run_until_complete, coro) def test_create_connection_connect_err(self): async def getaddrinfo(*args, **kw): @@ -1201,13 +1217,15 @@ async def getaddrinfo(*args, **kw): def getaddrinfo_task(*args, **kwds): return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) - self.loop.getaddrinfo = getaddrinfo_task - self.loop.sock_connect = mock.Mock() - self.loop.sock_connect.side_effect = OSError + + with self.assertWarns(DeprecationWarning): + self.loop.getaddrinfo = getaddrinfo_task + self.loop.sock_connect = mock.Mock() + self.loop.sock_connect.side_effect = OSError - coro = self.loop.create_connection(MyProto, 'example.com', 80) - self.assertRaises( - OSError, self.loop.run_until_complete, coro) + coro = self.loop.create_connection(MyProto, 'example.com', 80) + self.assertRaises( + OSError, self.loop.run_until_complete, coro) def test_create_connection_multiple(self): async def getaddrinfo(*args, **kw): @@ -1217,14 +1235,15 @@ async def getaddrinfo(*args, **kw): def getaddrinfo_task(*args, **kwds): return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) - self.loop.getaddrinfo = getaddrinfo_task - self.loop.sock_connect = mock.Mock() - self.loop.sock_connect.side_effect = OSError + with self.assertWarns(DeprecationWarning): + self.loop.getaddrinfo = getaddrinfo_task + self.loop.sock_connect = mock.Mock() + self.loop.sock_connect.side_effect = OSError - coro = self.loop.create_connection( - MyProto, 'example.com', 80, family=socket.AF_INET) - with self.assertRaises(OSError): - self.loop.run_until_complete(coro) + coro = self.loop.create_connection( + MyProto, 'example.com', 80, family=socket.AF_INET) + with self.assertRaises(OSError): + self.loop.run_until_complete(coro) @patch_socket def test_create_connection_multiple_errors_local_addr(self, m_socket): @@ -1244,18 +1263,19 @@ async def getaddrinfo(*args, **kw): def getaddrinfo_task(*args, **kwds): return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) - self.loop.getaddrinfo = getaddrinfo_task - self.loop.sock_connect = mock.Mock() - self.loop.sock_connect.side_effect = OSError('Err2') - - coro = self.loop.create_connection( - MyProto, 'example.com', 80, family=socket.AF_INET, - local_addr=(None, 8080)) - with self.assertRaises(OSError) as cm: - self.loop.run_until_complete(coro) + with self.assertWarns(DeprecationWarning): + self.loop.getaddrinfo = getaddrinfo_task + self.loop.sock_connect = mock.Mock() + self.loop.sock_connect.side_effect = OSError('Err2') + + coro = self.loop.create_connection( + MyProto, 'example.com', 80, family=socket.AF_INET, + local_addr=(None, 8080)) + with self.assertRaises(OSError) as cm: + self.loop.run_until_complete(coro) - self.assertTrue(str(cm.exception).startswith('Multiple exceptions: ')) - self.assertTrue(m_socket.socket.return_value.close.called) + self.assertTrue(str(cm.exception).startswith('Multiple exceptions: ')) + self.assertTrue(m_socket.socket.return_value.close.called) def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton): # Test the fallback code, even if this system has inet_pton. @@ -1265,38 +1285,39 @@ def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton): m_socket.getaddrinfo = socket.getaddrinfo sock = m_socket.socket.return_value - self.loop._add_reader = mock.Mock() - self.loop._add_reader._is_coroutine = False - self.loop._add_writer = mock.Mock() - self.loop._add_writer._is_coroutine = False + with self.assertWarns(DeprecationWarning): + self.loop._add_reader = mock.Mock() + self.loop._add_reader._is_coroutine = False + self.loop._add_writer = mock.Mock() + self.loop._add_writer._is_coroutine = False - coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80) - t, p = self.loop.run_until_complete(coro) - try: - sock.connect.assert_called_with(('1.2.3.4', 80)) - _, kwargs = m_socket.socket.call_args - self.assertEqual(kwargs['family'], m_socket.AF_INET) - self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) - finally: - t.close() - test_utils.run_briefly(self.loop) # allow transport to close + coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80) + t, p = self.loop.run_until_complete(coro) + try: + sock.connect.assert_called_with(('1.2.3.4', 80)) + _, kwargs = m_socket.socket.call_args + self.assertEqual(kwargs['family'], m_socket.AF_INET) + self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) + finally: + t.close() + test_utils.run_briefly(self.loop) # allow transport to close - sock.family = socket.AF_INET6 - coro = self.loop.create_connection(asyncio.Protocol, '::1', 80) - t, p = self.loop.run_until_complete(coro) - try: - # Without inet_pton we use getaddrinfo, which transforms ('::1', 80) - # to ('::1', 80, 0, 0). The last 0s are flow info, scope id. - [address] = sock.connect.call_args[0] - host, port = address[:2] - self.assertRegex(host, r'::(0\.)*1') - self.assertEqual(port, 80) - _, kwargs = m_socket.socket.call_args - self.assertEqual(kwargs['family'], m_socket.AF_INET6) - self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) - finally: - t.close() - test_utils.run_briefly(self.loop) # allow transport to close + sock.family = socket.AF_INET6 + coro = self.loop.create_connection(asyncio.Protocol, '::1', 80) + t, p = self.loop.run_until_complete(coro) + try: + # Without inet_pton we use getaddrinfo, which transforms ('::1', 80) + # to ('::1', 80, 0, 0). The last 0s are flow info, scope id. + [address] = sock.connect.call_args[0] + host, port = address[:2] + self.assertRegex(host, r'::(0\.)*1') + self.assertEqual(port, 80) + _, kwargs = m_socket.socket.call_args + self.assertEqual(kwargs['family'], m_socket.AF_INET6) + self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) + finally: + t.close() + test_utils.run_briefly(self.loop) # allow transport to close @patch_socket def test_create_connection_ipv6_scope(self, m_socket): @@ -1309,16 +1330,18 @@ def test_create_connection_ipv6_scope(self, m_socket): self.loop._add_writer = mock.Mock() self.loop._add_writer._is_coroutine = False - coro = self.loop.create_connection(asyncio.Protocol, 'fe80::1%1', 80) - t, p = self.loop.run_until_complete(coro) - try: - sock.connect.assert_called_with(('fe80::1', 80, 0, 1)) - _, kwargs = m_socket.socket.call_args - self.assertEqual(kwargs['family'], m_socket.AF_INET6) - self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) - finally: - t.close() - test_utils.run_briefly(self.loop) # allow transport to close + + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_connection(asyncio.Protocol, 'fe80::1%1', 80) + t, p = self.loop.run_until_complete(coro) + try: + sock.connect.assert_called_with(('fe80::1', 80, 0, 1)) + _, kwargs = m_socket.socket.call_args + self.assertEqual(kwargs['family'], m_socket.AF_INET6) + self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) + finally: + t.close() + test_utils.run_briefly(self.loop) # allow transport to close @patch_socket def test_create_connection_ip_addr(self, m_socket): @@ -1337,27 +1360,28 @@ def test_create_connection_service_name(self, m_socket): self.loop._add_reader._is_coroutine = False self.loop._add_writer = mock.Mock() self.loop._add_writer._is_coroutine = False - - for service, port in ('http', 80), (b'http', 80): - coro = self.loop.create_connection(asyncio.Protocol, - '127.0.0.1', service) - - t, p = self.loop.run_until_complete(coro) - try: - sock.connect.assert_called_with(('127.0.0.1', port)) - _, kwargs = m_socket.socket.call_args - self.assertEqual(kwargs['family'], m_socket.AF_INET) - self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) - finally: - t.close() - test_utils.run_briefly(self.loop) # allow transport to close - - for service in 'nonsense', b'nonsense': - coro = self.loop.create_connection(asyncio.Protocol, - '127.0.0.1', service) - - with self.assertRaises(OSError): - self.loop.run_until_complete(coro) + + with self.assertWarns(DeprecationWarning): + for service, port in ('http', 80), (b'http', 80): + coro = self.loop.create_connection(asyncio.Protocol, + '127.0.0.1', service) + + t, p = self.loop.run_until_complete(coro) + try: + sock.connect.assert_called_with(('127.0.0.1', port)) + _, kwargs = m_socket.socket.call_args + self.assertEqual(kwargs['family'], m_socket.AF_INET) + self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) + finally: + t.close() + test_utils.run_briefly(self.loop) # allow transport to close + + for service in 'nonsense', b'nonsense': + coro = self.loop.create_connection(asyncio.Protocol, + '127.0.0.1', service) + + with self.assertRaises(OSError): + self.loop.run_until_complete(coro) def test_create_connection_no_local_addr(self): async def getaddrinfo(host, *args, **kw): @@ -1371,11 +1395,12 @@ def getaddrinfo_task(*args, **kwds): return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop) self.loop.getaddrinfo = getaddrinfo_task - coro = self.loop.create_connection( - MyProto, 'example.com', 80, family=socket.AF_INET, - local_addr=(None, 8080)) - self.assertRaises( - OSError, self.loop.run_until_complete, coro) + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_connection( + MyProto, 'example.com', 80, family=socket.AF_INET, + local_addr=(None, 8080)) + self.assertRaises( + OSError, self.loop.run_until_complete, coro) @patch_socket def test_create_connection_bluetooth(self, m_socket): @@ -1389,8 +1414,9 @@ def getaddrinfo(host, port, *args, **kw): m_socket.getaddrinfo = getaddrinfo sock = m_socket.socket() - coro = self.loop.sock_connect(sock, addr) - self.loop.run_until_complete(coro) + with self.assertWarns(DeprecationWarning): + coro = self.loop.sock_connect(sock, addr) + self.loop.run_until_complete(coro) def test_create_connection_ssl_server_hostname_default(self): self.loop.getaddrinfo = mock.Mock() @@ -1428,71 +1454,75 @@ def mock_make_ssl_transport(sock, protocol, sslcontext, waiter, handshake_timeout = object() # First try the default server_hostname. self.loop._make_ssl_transport.reset_mock() - coro = self.loop.create_connection( - MyProto, 'python.org', 80, ssl=True, - ssl_handshake_timeout=handshake_timeout) - transport, _ = self.loop.run_until_complete(coro) - transport.close() - self.loop._make_ssl_transport.assert_called_with( - ANY, ANY, ANY, ANY, - server_side=False, - server_hostname='python.org', - ssl_handshake_timeout=handshake_timeout) - # Next try an explicit server_hostname. - self.loop._make_ssl_transport.reset_mock() - coro = self.loop.create_connection( - MyProto, 'python.org', 80, ssl=True, - server_hostname='perl.com', - ssl_handshake_timeout=handshake_timeout) - transport, _ = self.loop.run_until_complete(coro) - transport.close() - self.loop._make_ssl_transport.assert_called_with( - ANY, ANY, ANY, ANY, - server_side=False, - server_hostname='perl.com', - ssl_handshake_timeout=handshake_timeout) - # Finally try an explicit empty server_hostname. - self.loop._make_ssl_transport.reset_mock() - coro = self.loop.create_connection( - MyProto, 'python.org', 80, ssl=True, - server_hostname='', + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_connection( + MyProto, 'python.org', 80, ssl=True, + ssl_handshake_timeout=handshake_timeout) + transport, _ = self.loop.run_until_complete(coro) + transport.close() + self.loop._make_ssl_transport.assert_called_with( + ANY, ANY, ANY, ANY, + server_side=False, + server_hostname='python.org', ssl_handshake_timeout=handshake_timeout) - transport, _ = self.loop.run_until_complete(coro) - transport.close() - self.loop._make_ssl_transport.assert_called_with( + # Next try an explicit server_hostname. + self.loop._make_ssl_transport.reset_mock() + coro = self.loop.create_connection( + MyProto, 'python.org', 80, ssl=True, + server_hostname='perl.com', + ssl_handshake_timeout=handshake_timeout) + transport, _ = self.loop.run_until_complete(coro) + transport.close() + self.loop._make_ssl_transport.assert_called_with( ANY, ANY, ANY, ANY, server_side=False, - server_hostname='', + server_hostname='perl.com', ssl_handshake_timeout=handshake_timeout) + # Finally try an explicit empty server_hostname. + self.loop._make_ssl_transport.reset_mock() + coro = self.loop.create_connection( + MyProto, 'python.org', 80, ssl=True, + server_hostname='', + ssl_handshake_timeout=handshake_timeout) + transport, _ = self.loop.run_until_complete(coro) + transport.close() + self.loop._make_ssl_transport.assert_called_with( + ANY, ANY, ANY, ANY, + server_side=False, + server_hostname='', + ssl_handshake_timeout=handshake_timeout) def test_create_connection_no_ssl_server_hostname_errors(self): # When not using ssl, server_hostname must be None. - coro = self.loop.create_connection(MyProto, 'python.org', 80, - server_hostname='') - self.assertRaises(ValueError, self.loop.run_until_complete, coro) - coro = self.loop.create_connection(MyProto, 'python.org', 80, - server_hostname='python.org') - self.assertRaises(ValueError, self.loop.run_until_complete, coro) + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_connection(MyProto, 'python.org', 80, + server_hostname='') + self.assertRaises(ValueError, self.loop.run_until_complete, coro) + coro = self.loop.create_connection(MyProto, 'python.org', 80, + server_hostname='python.org') + self.assertRaises(ValueError, self.loop.run_until_complete, coro) def test_create_connection_ssl_server_hostname_errors(self): # When using ssl, server_hostname may be None if host is non-empty. - coro = self.loop.create_connection(MyProto, '', 80, ssl=True) - self.assertRaises(ValueError, self.loop.run_until_complete, coro) - coro = self.loop.create_connection(MyProto, None, 80, ssl=True) - self.assertRaises(ValueError, self.loop.run_until_complete, coro) - sock = socket.socket() - coro = self.loop.create_connection(MyProto, None, None, - ssl=True, sock=sock) - self.addCleanup(sock.close) - self.assertRaises(ValueError, self.loop.run_until_complete, coro) + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_connection(MyProto, '', 80, ssl=True) + self.assertRaises(ValueError, self.loop.run_until_complete, coro) + coro = self.loop.create_connection(MyProto, None, 80, ssl=True) + self.assertRaises(ValueError, self.loop.run_until_complete, coro) + sock = socket.socket() + coro = self.loop.create_connection(MyProto, None, None, + ssl=True, sock=sock) + self.addCleanup(sock.close) + self.assertRaises(ValueError, self.loop.run_until_complete, coro) def test_create_connection_ssl_timeout_for_plain_socket(self): - coro = self.loop.create_connection( - MyProto, 'example.com', 80, ssl_handshake_timeout=1) - with self.assertRaisesRegex( - ValueError, - 'ssl_handshake_timeout is only meaningful with ssl'): - self.loop.run_until_complete(coro) + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_connection( + MyProto, 'example.com', 80, ssl_handshake_timeout=1) + with self.assertRaisesRegex( + ValueError, + 'ssl_handshake_timeout is only meaningful with ssl'): + self.loop.run_until_complete(coro) def test_create_server_empty_host(self): # if host is empty string use None instead @@ -1571,46 +1601,50 @@ def test_create_datagram_endpoint_no_addrinfo(self, m_socket): m_socket.getaddrinfo.return_value = [] m_socket.getaddrinfo._is_coroutine = False - coro = self.loop.create_datagram_endpoint( - MyDatagramProto, local_addr=('localhost', 0)) - self.assertRaises( - OSError, self.loop.run_until_complete, coro) + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_datagram_endpoint( + MyDatagramProto, local_addr=('localhost', 0)) + self.assertRaises( + OSError, self.loop.run_until_complete, coro) def test_create_datagram_endpoint_addr_error(self): - coro = self.loop.create_datagram_endpoint( - MyDatagramProto, local_addr='localhost') - self.assertRaises( - AssertionError, self.loop.run_until_complete, coro) - coro = self.loop.create_datagram_endpoint( - MyDatagramProto, local_addr=('localhost', 1, 2, 3)) - self.assertRaises( - AssertionError, self.loop.run_until_complete, coro) + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_datagram_endpoint( + MyDatagramProto, local_addr='localhost') + self.assertRaises( + AssertionError, self.loop.run_until_complete, coro) + coro = self.loop.create_datagram_endpoint( + MyDatagramProto, local_addr=('localhost', 1, 2, 3)) + self.assertRaises( + AssertionError, self.loop.run_until_complete, coro) def test_create_datagram_endpoint_connect_err(self): - self.loop.sock_connect = mock.Mock() - self.loop.sock_connect.side_effect = OSError + with self.assertWarns(DeprecationWarning): + self.loop.sock_connect = mock.Mock() + self.loop.sock_connect.side_effect = OSError - coro = self.loop.create_datagram_endpoint( - asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0)) - self.assertRaises( - OSError, self.loop.run_until_complete, coro) + coro = self.loop.create_datagram_endpoint( + asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0)) + self.assertRaises( + OSError, self.loop.run_until_complete, coro) def test_create_datagram_endpoint_allow_broadcast(self): - protocol = MyDatagramProto(create_future=True, loop=self.loop) - self.loop.sock_connect = sock_connect = mock.Mock() - sock_connect.return_value = [] + with self.assertWarns(DeprecationWarning): + protocol = MyDatagramProto(create_future=True, loop=self.loop) + self.loop.sock_connect = sock_connect = mock.Mock() + sock_connect.return_value = [] - coro = self.loop.create_datagram_endpoint( - lambda: protocol, - remote_addr=('127.0.0.1', 0), - allow_broadcast=True) + coro = self.loop.create_datagram_endpoint( + lambda: protocol, + remote_addr=('127.0.0.1', 0), + allow_broadcast=True) - transport, _ = self.loop.run_until_complete(coro) - self.assertFalse(sock_connect.called) + transport, _ = self.loop.run_until_complete(coro) + self.assertFalse(sock_connect.called) - transport.close() - self.loop.run_until_complete(protocol.done) - self.assertEqual('CLOSED', protocol.state) + transport.close() + self.loop.run_until_complete(protocol.done) + self.assertEqual('CLOSED', protocol.state) @patch_socket def test_create_datagram_endpoint_socket_err(self, m_socket): @@ -1629,27 +1663,30 @@ def test_create_datagram_endpoint_socket_err(self, m_socket): @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled') def test_create_datagram_endpoint_no_matching_family(self): - coro = self.loop.create_datagram_endpoint( - asyncio.DatagramProtocol, - remote_addr=('127.0.0.1', 0), local_addr=('::1', 0)) - self.assertRaises( - ValueError, self.loop.run_until_complete, coro) + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_datagram_endpoint( + asyncio.DatagramProtocol, + remote_addr=('127.0.0.1', 0), local_addr=('::1', 0)) + self.assertRaises( + ValueError, self.loop.run_until_complete, coro) @patch_socket def test_create_datagram_endpoint_setblk_err(self, m_socket): m_socket.socket.return_value.setblocking.side_effect = OSError - coro = self.loop.create_datagram_endpoint( - asyncio.DatagramProtocol, family=socket.AF_INET) - self.assertRaises( - OSError, self.loop.run_until_complete, coro) - self.assertTrue( - m_socket.socket.return_value.close.called) + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_datagram_endpoint( + asyncio.DatagramProtocol, family=socket.AF_INET) + self.assertRaises( + OSError, self.loop.run_until_complete, coro) + self.assertTrue( + m_socket.socket.return_value.close.called) def test_create_datagram_endpoint_noaddr_nofamily(self): - coro = self.loop.create_datagram_endpoint( - asyncio.DatagramProtocol) - self.assertRaises(ValueError, self.loop.run_until_complete, coro) + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_datagram_endpoint( + asyncio.DatagramProtocol) + self.assertRaises(ValueError, self.loop.run_until_complete, coro) @patch_socket def test_create_datagram_endpoint_cant_bind(self, m_socket): @@ -1660,83 +1697,88 @@ class Err(OSError): m_sock = m_socket.socket.return_value = mock.Mock() m_sock.bind.side_effect = Err - fut = self.loop.create_datagram_endpoint( - MyDatagramProto, - local_addr=('127.0.0.1', 0), family=socket.AF_INET) - self.assertRaises(Err, self.loop.run_until_complete, fut) - self.assertTrue(m_sock.close.called) + with self.assertWarns(DeprecationWarning): + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, + local_addr=('127.0.0.1', 0), family=socket.AF_INET) + self.assertRaises(Err, self.loop.run_until_complete, fut) + self.assertTrue(m_sock.close.called) def test_create_datagram_endpoint_sock(self): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('127.0.0.1', 0)) - fut = self.loop.create_datagram_endpoint( - lambda: MyDatagramProto(create_future=True, loop=self.loop), - sock=sock) - transport, protocol = self.loop.run_until_complete(fut) - transport.close() - self.loop.run_until_complete(protocol.done) - self.assertEqual('CLOSED', protocol.state) + with self.assertWarns(DeprecationWarning): + fut = self.loop.create_datagram_endpoint( + lambda: MyDatagramProto(create_future=True, loop=self.loop), + sock=sock) + transport, protocol = self.loop.run_until_complete(fut) + transport.close() + self.loop.run_until_complete(protocol.done) + self.assertEqual('CLOSED', protocol.state) @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') def test_create_datagram_endpoint_sock_unix(self): - fut = self.loop.create_datagram_endpoint( - lambda: MyDatagramProto(create_future=True, loop=self.loop), - family=socket.AF_UNIX) - transport, protocol = self.loop.run_until_complete(fut) - assert transport._sock.family == socket.AF_UNIX - transport.close() - self.loop.run_until_complete(protocol.done) - self.assertEqual('CLOSED', protocol.state) - - @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') - def test_create_datagram_endpoint_existing_sock_unix(self): - with test_utils.unix_socket_path() as path: - sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM) - sock.bind(path) - sock.close() - - coro = self.loop.create_datagram_endpoint( + with self.assertWarns(DeprecationWarning): + fut = self.loop.create_datagram_endpoint( lambda: MyDatagramProto(create_future=True, loop=self.loop), - path, family=socket.AF_UNIX) - transport, protocol = self.loop.run_until_complete(coro) + family=socket.AF_UNIX) + transport, protocol = self.loop.run_until_complete(fut) + assert transport._sock.family == socket.AF_UNIX transport.close() self.loop.run_until_complete(protocol.done) + self.assertEqual('CLOSED', protocol.state) + + @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') + def test_create_datagram_endpoint_existing_sock_unix(self): + with self.assertWarns(DeprecationWarning): + with test_utils.unix_socket_path() as path: + sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM) + sock.bind(path) + sock.close() + + coro = self.loop.create_datagram_endpoint( + lambda: MyDatagramProto(create_future=True, loop=self.loop), + path, family=socket.AF_UNIX) + transport, protocol = self.loop.run_until_complete(coro) + transport.close() + self.loop.run_until_complete(protocol.done) def test_create_datagram_endpoint_sock_sockopts(self): class FakeSock: type = socket.SOCK_DGRAM - fut = self.loop.create_datagram_endpoint( - MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock()) - self.assertRaises(ValueError, self.loop.run_until_complete, fut) + with self.assertWarns(DeprecationWarning): + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) - fut = self.loop.create_datagram_endpoint( - MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock()) - self.assertRaises(ValueError, self.loop.run_until_complete, fut) + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) - fut = self.loop.create_datagram_endpoint( - MyDatagramProto, family=1, sock=FakeSock()) - self.assertRaises(ValueError, self.loop.run_until_complete, fut) + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, family=1, sock=FakeSock()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) - fut = self.loop.create_datagram_endpoint( - MyDatagramProto, proto=1, sock=FakeSock()) - self.assertRaises(ValueError, self.loop.run_until_complete, fut) + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, proto=1, sock=FakeSock()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) - fut = self.loop.create_datagram_endpoint( - MyDatagramProto, flags=1, sock=FakeSock()) - self.assertRaises(ValueError, self.loop.run_until_complete, fut) + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, flags=1, sock=FakeSock()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) - fut = self.loop.create_datagram_endpoint( - MyDatagramProto, reuse_address=True, sock=FakeSock()) - self.assertRaises(ValueError, self.loop.run_until_complete, fut) + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, reuse_address=True, sock=FakeSock()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) - fut = self.loop.create_datagram_endpoint( - MyDatagramProto, reuse_port=True, sock=FakeSock()) - self.assertRaises(ValueError, self.loop.run_until_complete, fut) + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, reuse_port=True, sock=FakeSock()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) - fut = self.loop.create_datagram_endpoint( - MyDatagramProto, allow_broadcast=True, sock=FakeSock()) - self.assertRaises(ValueError, self.loop.run_until_complete, fut) + fut = self.loop.create_datagram_endpoint( + MyDatagramProto, allow_broadcast=True, sock=FakeSock()) + self.assertRaises(ValueError, self.loop.run_until_complete, fut) def test_create_datagram_endpoint_sockopts(self): # Socket options should not be applied unless asked for. @@ -1802,40 +1844,42 @@ def test_create_datagram_endpoint_nosoreuseport(self, m_socket): del m_socket.SO_REUSEPORT m_socket.socket.return_value = mock.Mock() - coro = self.loop.create_datagram_endpoint( - lambda: MyDatagramProto(loop=self.loop), - local_addr=('127.0.0.1', 0), - reuse_address=False, - reuse_port=True) + with self.assertWarns(DeprecationWarning): + coro = self.loop.create_datagram_endpoint( + lambda: MyDatagramProto(loop=self.loop), + local_addr=('127.0.0.1', 0), + reuse_address=False, + reuse_port=True) - self.assertRaises(ValueError, self.loop.run_until_complete, coro) + self.assertRaises(ValueError, self.loop.run_until_complete, coro) @patch_socket def test_create_datagram_endpoint_ip_addr(self, m_socket): def getaddrinfo(*args, **kw): self.fail('should not have called getaddrinfo') - m_socket.getaddrinfo = getaddrinfo - m_socket.socket.return_value.bind = bind = mock.Mock() - self.loop._add_reader = mock.Mock() - self.loop._add_reader._is_coroutine = False + with self.assertWarns(DeprecationWarning): + m_socket.getaddrinfo = getaddrinfo + m_socket.socket.return_value.bind = bind = mock.Mock() + self.loop._add_reader = mock.Mock() + self.loop._add_reader._is_coroutine = False - reuseport_supported = hasattr(socket, 'SO_REUSEPORT') - coro = self.loop.create_datagram_endpoint( - lambda: MyDatagramProto(loop=self.loop), - local_addr=('1.2.3.4', 0), - reuse_address=False, - reuse_port=reuseport_supported) + reuseport_supported = hasattr(socket, 'SO_REUSEPORT') + coro = self.loop.create_datagram_endpoint( + lambda: MyDatagramProto(loop=self.loop), + local_addr=('1.2.3.4', 0), + reuse_address=False, + reuse_port=reuseport_supported) - t, p = self.loop.run_until_complete(coro) - try: - bind.assert_called_with(('1.2.3.4', 0)) - m_socket.socket.assert_called_with(family=m_socket.AF_INET, - proto=m_socket.IPPROTO_UDP, - type=m_socket.SOCK_DGRAM) - finally: - t.close() - test_utils.run_briefly(self.loop) # allow transport to close + t, p = self.loop.run_until_complete(coro) + try: + bind.assert_called_with(('1.2.3.4', 0)) + m_socket.socket.assert_called_with(family=m_socket.AF_INET, + proto=m_socket.IPPROTO_UDP, + type=m_socket.SOCK_DGRAM) + finally: + t.close() + test_utils.run_briefly(self.loop) # allow transport to close def test_accept_connection_retry(self): sock = mock.Mock()