From 51b80c72251052ca49588c4bca6c1f5b7c0035c7 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 18 Sep 2023 16:05:17 -0600 Subject: [PATCH 01/11] Factor out PyThread_ParseTimeoutArg(). --- Include/internal/pycore_pythread.h | 6 ++++++ Modules/_queuemodule.c | 2 ++ Modules/_threadmodule.c | 11 +++++----- Python/thread.c | 34 ++++++++++++++++++++++++++++++ 4 files changed, 48 insertions(+), 5 deletions(-) diff --git a/Include/internal/pycore_pythread.h b/Include/internal/pycore_pythread.h index ffd7398eaeee5a..d31ffc78130534 100644 --- a/Include/internal/pycore_pythread.h +++ b/Include/internal/pycore_pythread.h @@ -89,6 +89,12 @@ extern int _PyThread_at_fork_reinit(PyThread_type_lock *lock); // unset: -1 seconds, in nanoseconds #define PyThread_UNSET_TIMEOUT ((_PyTime_t)(-1 * 1000 * 1000 * 1000)) +// Exported for the _xxinterpchannels module. +PyAPI_FUNC(int) PyThread_ParseTimeoutArg( + PyObject *arg, + int blocking, + PY_TIMEOUT_T *timeout); + /* Helper to acquire an interruptible lock with a timeout. If the lock acquire * is interrupted, signal handlers are run, and if they raise an exception, * PY_LOCK_INTR is returned. Otherwise, PY_LOCK_ACQUIRED or PY_LOCK_FAILURE diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c index b4bafb375c999d..81a06cdb79a4f2 100644 --- a/Modules/_queuemodule.c +++ b/Modules/_queuemodule.c @@ -214,6 +214,8 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls, PY_TIMEOUT_T microseconds; PyThreadState *tstate = PyThreadState_Get(); + // XXX Use PyThread_ParseTimeoutArg(). + if (block == 0) { /* Non-blocking */ microseconds = 0; diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 7620511dd1d6eb..4d453040503643 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -88,14 +88,15 @@ lock_acquire_parse_args(PyObject *args, PyObject *kwds, char *kwlist[] = {"blocking", "timeout", NULL}; int blocking = 1; PyObject *timeout_obj = NULL; - const _PyTime_t unset_timeout = _PyTime_FromSeconds(-1); - - *timeout = unset_timeout ; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "|pO:acquire", kwlist, &blocking, &timeout_obj)) return -1; + // XXX Use PyThread_ParseTimeoutArg(). + + const _PyTime_t unset_timeout = _PyTime_FromSeconds(-1); + *timeout = unset_timeout; + if (timeout_obj && _PyTime_FromSecondsObject(timeout, timeout_obj, _PyTime_ROUND_TIMEOUT) < 0) @@ -108,7 +109,7 @@ lock_acquire_parse_args(PyObject *args, PyObject *kwds, } if (*timeout < 0 && *timeout != unset_timeout) { PyErr_SetString(PyExc_ValueError, - "timeout value must be positive"); + "timeout value must be a non-negative number"); return -1; } if (!blocking) diff --git a/Python/thread.c b/Python/thread.c index 7185dd43d965b9..fefae8391617f7 100644 --- a/Python/thread.c +++ b/Python/thread.c @@ -93,6 +93,40 @@ PyThread_set_stacksize(size_t size) } +int +PyThread_ParseTimeoutArg(PyObject *arg, int blocking, PY_TIMEOUT_T *timeout_p) +{ + assert(_PyTime_FromSeconds(-1) == PyThread_UNSET_TIMEOUT); + if (arg == NULL || arg == Py_None) { + *timeout_p = blocking ? PyThread_UNSET_TIMEOUT : 0; + return 0; + } + if (!blocking) { + PyErr_SetString(PyExc_ValueError, + "can't specify a timeout for a non-blocking call"); + return -1; + } + + _PyTime_t timeout; + if (_PyTime_FromSecondsObject(&timeout, arg, _PyTime_ROUND_TIMEOUT) < 0) { + return -1; + } + if (timeout < 0) { + PyErr_SetString(PyExc_ValueError, + "timeout value must be a non-negative number"); + return -1; + } + + if (_PyTime_AsMicroseconds(timeout, + _PyTime_ROUND_TIMEOUT) > PY_TIMEOUT_MAX) { + PyErr_SetString(PyExc_OverflowError, + "timeout value is too large"); + return -1; + } + *timeout_p = timeout; + return 0; +} + PyLockStatus PyThread_acquire_lock_timed_with_retries(PyThread_type_lock lock, PY_TIMEOUT_T timeout) From 4dcf0271b8cb5e5be33d968c0186955619141f21 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 18 Sep 2023 13:57:43 -0600 Subject: [PATCH 02/11] Pass a timeout to wait_for_lock(). --- Modules/_xxinterpchannelsmodule.c | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index be53cbfc39b4dd..bc1ccc901d5917 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -242,9 +242,8 @@ add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared, } static int -wait_for_lock(PyThread_type_lock mutex) +wait_for_lock(PyThread_type_lock mutex, PY_TIMEOUT_T timeout) { - PY_TIMEOUT_T timeout = PyThread_UNSET_TIMEOUT; PyLockStatus res = PyThread_acquire_lock_timed_with_retries(mutex, timeout); if (res == PY_LOCK_INTR) { /* KeyboardInterrupt, etc. */ @@ -1883,7 +1882,8 @@ _channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting) } static int -_channel_send_wait(_channels *channels, int64_t cid, PyObject *obj) +_channel_send_wait(_channels *channels, int64_t cid, PyObject *obj, + PY_TIMEOUT_T timeout) { // We use a stack variable here, so we must ensure that &waiting // is not held by any channel item at the point this function exits. @@ -1901,7 +1901,7 @@ _channel_send_wait(_channels *channels, int64_t cid, PyObject *obj) } /* Wait until the object is received. */ - if (wait_for_lock(waiting.mutex) < 0) { + if (wait_for_lock(waiting.mutex, timeout) < 0) { assert(PyErr_Occurred()); _waiting_finish_releasing(&waiting); /* The send() call is failing now, so make sure the item @@ -2830,11 +2830,12 @@ channel_send(PyObject *self, PyObject *args, PyObject *kwds) return NULL; } cid = cid_data.cid; + PY_TIMEOUT_T timeout = PyThread_UNSET_TIMEOUT; /* Queue up the object. */ int err = 0; if (blocking) { - err = _channel_send_wait(&_globals.channels, cid, obj); + err = _channel_send_wait(&_globals.channels, cid, obj, timeout); } else { err = _channel_send(&_globals.channels, cid, obj, NULL); @@ -2869,6 +2870,7 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) return NULL; } cid = cid_data.cid; + PY_TIMEOUT_T timeout = PyThread_UNSET_TIMEOUT; PyObject *tempobj = PyMemoryView_FromObject(obj); if (tempobj == NULL) { @@ -2878,7 +2880,7 @@ channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) /* Queue up the object. */ int err = 0; if (blocking) { - err = _channel_send_wait(&_globals.channels, cid, tempobj); + err = _channel_send_wait(&_globals.channels, cid, tempobj, timeout); } else { err = _channel_send(&_globals.channels, cid, tempobj, NULL); From 21d85dd7af6908b65e909a1c0afd371d8d1c4da8 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 18 Sep 2023 16:12:12 -0600 Subject: [PATCH 03/11] Support a timeout arg. --- Modules/_xxinterpchannelsmodule.c | 33 +++++++++++++++++++------------ 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/Modules/_xxinterpchannelsmodule.c b/Modules/_xxinterpchannelsmodule.c index bc1ccc901d5917..2e2878d5c205cf 100644 --- a/Modules/_xxinterpchannelsmodule.c +++ b/Modules/_xxinterpchannelsmodule.c @@ -2816,21 +2816,24 @@ receive end."); static PyObject * channel_send(PyObject *self, PyObject *args, PyObject *kwds) { - // XXX Add a timeout arg. - static char *kwlist[] = {"cid", "obj", "blocking", NULL}; - int64_t cid; + static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL}; struct channel_id_converter_data cid_data = { .module = self, }; PyObject *obj; int blocking = 1; - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$p:channel_send", kwlist, + PyObject *timeout_obj = NULL; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$pO:channel_send", kwlist, channel_id_converter, &cid_data, &obj, - &blocking)) { + &blocking, &timeout_obj)) { + return NULL; + } + + int64_t cid = cid_data.cid; + PY_TIMEOUT_T timeout; + if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) { return NULL; } - cid = cid_data.cid; - PY_TIMEOUT_T timeout = PyThread_UNSET_TIMEOUT; /* Queue up the object. */ int err = 0; @@ -2856,21 +2859,25 @@ By default this waits for the object to be received."); static PyObject * channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"cid", "obj", "blocking", NULL}; - int64_t cid; + static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL}; struct channel_id_converter_data cid_data = { .module = self, }; PyObject *obj; int blocking = 1; + PyObject *timeout_obj = NULL; if (!PyArg_ParseTupleAndKeywords(args, kwds, - "O&O|$p:channel_send_buffer", kwlist, + "O&O|$pO:channel_send_buffer", kwlist, channel_id_converter, &cid_data, &obj, - &blocking)) { + &blocking, &timeout_obj)) { + return NULL; + } + + int64_t cid = cid_data.cid; + PY_TIMEOUT_T timeout; + if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) { return NULL; } - cid = cid_data.cid; - PY_TIMEOUT_T timeout = PyThread_UNSET_TIMEOUT; PyObject *tempobj = PyMemoryView_FromObject(obj); if (tempobj == NULL) { From f6df5ea4ee55954457245c9b31d375fd0317a226 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 18 Sep 2023 17:02:28 -0600 Subject: [PATCH 04/11] Add a timeout arg to SendChannel.send(). --- Lib/test/support/interpreters.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py index 9ba6862a9ee01a..2b503373ef181d 100644 --- a/Lib/test/support/interpreters.py +++ b/Lib/test/support/interpreters.py @@ -203,12 +203,12 @@ class SendChannel(_ChannelEnd): _end = 'send' - def send(self, obj): + def send(self, obj, timeout=None): """Send the object (i.e. its data) to the channel's receiving end. This blocks until the object is received. """ - _channels.send(self._id, obj, blocking=True) + _channels.send(self._id, obj, timeout=timeout) def send_nowait(self, obj): """Send the object to the channel's receiving end. From 1b1f9160f1665343b3229d24089ef55f6288abdb Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 18 Sep 2023 17:09:16 -0600 Subject: [PATCH 05/11] Add a timeout to RecvChannel.recv(). --- Lib/test/support/interpreters.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py index 2b503373ef181d..2474744ac3e6ab 100644 --- a/Lib/test/support/interpreters.py +++ b/Lib/test/support/interpreters.py @@ -170,15 +170,25 @@ class RecvChannel(_ChannelEnd): _end = 'recv' - def recv(self, *, _sentinel=object(), _delay=10 / 1000): # 10 milliseconds + def recv(self, timeout=None, *, + _sentinel=object(), + _delay=10 / 1000, # 10 milliseconds + ): """Return the next object from the channel. This blocks until an object has been sent, if none have been sent already. """ + if timeout is not None: + timeout = int(timeout) + if timeout < 0: + raise ValueError(f'timeout value must be non-negative') + end = time.now() + timeout obj = _channels.recv(self._id, _sentinel) while obj is _sentinel: time.sleep(_delay) + if timeout is not None and time.now() >= end: + raise TimeoutError obj = _channels.recv(self._id, _sentinel) return obj From 9131e5dc3f370aadda60fe3e4df524691bca5bfb Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 10 Oct 2023 02:58:31 -0600 Subject: [PATCH 06/11] Fix args. --- Lib/test/support/interpreters.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py index 2474744ac3e6ab..0f89cf53a40d20 100644 --- a/Lib/test/support/interpreters.py +++ b/Lib/test/support/interpreters.py @@ -218,7 +218,7 @@ def send(self, obj, timeout=None): This blocks until the object is received. """ - _channels.send(self._id, obj, timeout=timeout) + _channels.send(self._id, obj, timeout=timeout, blocking=True) def send_nowait(self, obj): """Send the object to the channel's receiving end. @@ -231,12 +231,12 @@ def send_nowait(self, obj): # See bpo-32604 and gh-19829. return _channels.send(self._id, obj, blocking=False) - def send_buffer(self, obj): + def send_buffer(self, obj, timeout=None): """Send the object's buffer to the channel's receiving end. This blocks until the object is received. """ - _channels.send_buffer(self._id, obj, blocking=True) + _channels.send_buffer(self._id, obj, timeout=timeout, blocking=True) def send_buffer_nowait(self, obj): """Send the object's buffer to the channel's receiving end. From b1c2b909318814815ea75815f9d93671abedc388 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 13 Oct 2023 05:49:17 -0600 Subject: [PATCH 07/11] Fix timeout in RecvChannel.recv(). --- Lib/test/support/interpreters.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/support/interpreters.py b/Lib/test/support/interpreters.py index 0f89cf53a40d20..f8f42c0e02479c 100644 --- a/Lib/test/support/interpreters.py +++ b/Lib/test/support/interpreters.py @@ -183,11 +183,11 @@ def recv(self, timeout=None, *, timeout = int(timeout) if timeout < 0: raise ValueError(f'timeout value must be non-negative') - end = time.now() + timeout + end = time.time() + timeout obj = _channels.recv(self._id, _sentinel) while obj is _sentinel: time.sleep(_delay) - if timeout is not None and time.now() >= end: + if timeout is not None and time.time() >= end: raise TimeoutError obj = _channels.recv(self._id, _sentinel) return obj From cdd8bfe65617da6d7d6fd906d15a5ebd03de6c57 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 10 Oct 2023 07:36:06 -0600 Subject: [PATCH 08/11] Add timeout tests. --- Lib/test/test__xxinterpchannels.py | 70 ++++++++++++++++++++++++++++++ Lib/test/test_interpreters.py | 5 +++ 2 files changed, 75 insertions(+) diff --git a/Lib/test/test__xxinterpchannels.py b/Lib/test/test__xxinterpchannels.py index 90a1224498fe6d..0967c3ec421c22 100644 --- a/Lib/test/test__xxinterpchannels.py +++ b/Lib/test/test__xxinterpchannels.py @@ -621,6 +621,76 @@ def test_run_string_arg_resolved(self): #------------------- # send/recv + def test_send_timeout(self): + obj = b'spam' + + with self.subTest('non-blocking with timeout'): + cid = channels.create() + with self.assertRaises(ValueError): + channels.send(cid, obj, blocking=False, timeout=0.1) + + with self.subTest('timeout hit'): + cid = channels.create() + with self.assertRaises(TimeoutError): + channels.send(cid, obj, blocking=True, timeout=0.1) + # XXX Verify the channel is empty now. + + with self.subTest('timeout not hit'): + cid = channels.create() + def f(): + recv_wait(cid) + t = threading.Thread(target=f) + t.start() + channels.send(cid, obj, blocking=True, timeout=10) + t.join() + + with self.subTest('channel closed while waiting'): + cid = channels.create() + def f(): + # sleep() isn't a great for this, but definitely simple. + time.sleep(1) + channels.close(cid, force=True) + t = threading.Thread(target=f) + t.start() + with self.assertRaises(channels.ChannelClosedError): + channels.send(cid, obj, blocking=True, timeout=2) + t.join() + + def test_send_buffer_timeout(self): + obj = bytearray(b'spam') + + with self.subTest('non-blocking with timeout'): + cid = channels.create() + with self.assertRaises(ValueError): + channels.send_buffer(cid, obj, blocking=False, timeout=0.1) + + with self.subTest('timeout hit'): + cid = channels.create() + with self.assertRaises(TimeoutError): + channels.send_buffer(cid, obj, blocking=True, timeout=0.1) + # XXX Verify the channel is empty now. + + with self.subTest('timeout not hit'): + cid = channels.create() + def f(): + recv_wait(cid) + t = threading.Thread(target=f) + t.start() + channels.send_buffer(cid, obj, blocking=True, timeout=10) + t.join() + + with self.subTest('channel closed while waiting'): + cid = channels.create() + def f(): + # sleep() isn't a great for this, but definitely simple. + time.sleep(1) + channels.close(cid, force=True) + t = threading.Thread(target=f) + t.start() + with self.assertRaises(channels.ChannelClosedError): + channels.send_buffer(cid, obj, blocking=True, timeout=2) + t.join() + def test_send_recv_main(self): cid = channels.create() orig = b'spam' diff --git a/Lib/test/test_interpreters.py b/Lib/test/test_interpreters.py index 0910b51bfe5dbd..d2d52ec9a7808f 100644 --- a/Lib/test/test_interpreters.py +++ b/Lib/test/test_interpreters.py @@ -1022,6 +1022,11 @@ def test_send_recv_nowait_different_interpreters(self): self.assertEqual(obj2, b'eggs') self.assertNotEqual(id(obj2), int(out)) + def test_recv_timeout(self): + r, _ = interpreters.create_channel() + with self.assertRaises(TimeoutError): + r.recv(timeout=1) + def test_recv_channel_does_not_exist(self): ch = interpreters.RecvChannel(1_000_000) with self.assertRaises(interpreters.ChannelNotFoundError): From 84dd5ac7d560c8faada49161c10b22b813e6672e Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 10 Oct 2023 07:38:45 -0600 Subject: [PATCH 09/11] Clear the item if timed out. --- Lib/test/test__xxinterpchannels.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Lib/test/test__xxinterpchannels.py b/Lib/test/test__xxinterpchannels.py index 0967c3ec421c22..c21c085331c649 100644 --- a/Lib/test/test__xxinterpchannels.py +++ b/Lib/test/test__xxinterpchannels.py @@ -633,7 +633,9 @@ def test_send_timeout(self): cid = channels.create() with self.assertRaises(TimeoutError): channels.send(cid, obj, blocking=True, timeout=0.1) - # XXX Verify the channel is empty now. + with self.assertRaises(channels.ChannelEmptyError): + received = channels.recv(cid) + print(repr(received)) with self.subTest('timeout not hit'): cid = channels.create() @@ -668,7 +670,9 @@ def test_send_buffer_timeout(self): cid = channels.create() with self.assertRaises(TimeoutError): channels.send_buffer(cid, obj, blocking=True, timeout=0.1) - # XXX Verify the channel is empty now. + with self.assertRaises(channels.ChannelEmptyError): + received = channels.recv(cid) + print(repr(received)) with self.subTest('timeout not hit'): cid = channels.create() From 7ebd50cbb4d1e990abc044d74dd87828002984d5 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 17 Oct 2023 15:04:37 -0600 Subject: [PATCH 10/11] Move the tests. --- Lib/test/test__xxinterpchannels.py | 148 ++++++++++++++--------------- 1 file changed, 74 insertions(+), 74 deletions(-) diff --git a/Lib/test/test__xxinterpchannels.py b/Lib/test/test__xxinterpchannels.py index c21c085331c649..991c4db4d39785 100644 --- a/Lib/test/test__xxinterpchannels.py +++ b/Lib/test/test__xxinterpchannels.py @@ -621,80 +621,6 @@ def test_run_string_arg_resolved(self): #------------------- # send/recv - def test_send_timeout(self): - obj = b'spam' - - with self.subTest('non-blocking with timeout'): - cid = channels.create() - with self.assertRaises(ValueError): - channels.send(cid, obj, blocking=False, timeout=0.1) - - with self.subTest('timeout hit'): - cid = channels.create() - with self.assertRaises(TimeoutError): - channels.send(cid, obj, blocking=True, timeout=0.1) - with self.assertRaises(channels.ChannelEmptyError): - received = channels.recv(cid) - print(repr(received)) - - with self.subTest('timeout not hit'): - cid = channels.create() - def f(): - recv_wait(cid) - t = threading.Thread(target=f) - t.start() - channels.send(cid, obj, blocking=True, timeout=10) - t.join() - - with self.subTest('channel closed while waiting'): - cid = channels.create() - def f(): - # sleep() isn't a great for this, but definitely simple. - time.sleep(1) - channels.close(cid, force=True) - t = threading.Thread(target=f) - t.start() - with self.assertRaises(channels.ChannelClosedError): - channels.send(cid, obj, blocking=True, timeout=2) - t.join() - - def test_send_buffer_timeout(self): - obj = bytearray(b'spam') - - with self.subTest('non-blocking with timeout'): - cid = channels.create() - with self.assertRaises(ValueError): - channels.send_buffer(cid, obj, blocking=False, timeout=0.1) - - with self.subTest('timeout hit'): - cid = channels.create() - with self.assertRaises(TimeoutError): - channels.send_buffer(cid, obj, blocking=True, timeout=0.1) - with self.assertRaises(channels.ChannelEmptyError): - received = channels.recv(cid) - print(repr(received)) - - with self.subTest('timeout not hit'): - cid = channels.create() - def f(): - recv_wait(cid) - t = threading.Thread(target=f) - t.start() - channels.send_buffer(cid, obj, blocking=True, timeout=10) - t.join() - - with self.subTest('channel closed while waiting'): - cid = channels.create() - def f(): - # sleep() isn't a great for this, but definitely simple. - time.sleep(1) - channels.close(cid, force=True) - t = threading.Thread(target=f) - t.start() - with self.assertRaises(channels.ChannelClosedError): - channels.send_buffer(cid, obj, blocking=True, timeout=2) - t.join() - def test_send_recv_main(self): cid = channels.create() orig = b'spam' @@ -976,6 +902,80 @@ def f(): channels.send_buffer(cid, obj, blocking=True) t.join() + def test_send_timeout(self): + obj = b'spam' + + with self.subTest('non-blocking with timeout'): + cid = channels.create() + with self.assertRaises(ValueError): + channels.send(cid, obj, blocking=False, timeout=0.1) + + with self.subTest('timeout hit'): + cid = channels.create() + with self.assertRaises(TimeoutError): + channels.send(cid, obj, blocking=True, timeout=0.1) + with self.assertRaises(channels.ChannelEmptyError): + received = channels.recv(cid) + print(repr(received)) + + with self.subTest('timeout not hit'): + cid = channels.create() + def f(): + recv_wait(cid) + t = threading.Thread(target=f) + t.start() + channels.send(cid, obj, blocking=True, timeout=10) + t.join() + + with self.subTest('channel closed while waiting'): + cid = channels.create() + def f(): + # sleep() isn't a great for this, but definitely simple. + time.sleep(1) + channels.close(cid, force=True) + t = threading.Thread(target=f) + t.start() + with self.assertRaises(channels.ChannelClosedError): + channels.send(cid, obj, blocking=True, timeout=2) + t.join() + + def test_send_buffer_timeout(self): + obj = bytearray(b'spam') + + with self.subTest('non-blocking with timeout'): + cid = channels.create() + with self.assertRaises(ValueError): + channels.send_buffer(cid, obj, blocking=False, timeout=0.1) + + with self.subTest('timeout hit'): + cid = channels.create() + with self.assertRaises(TimeoutError): + channels.send_buffer(cid, obj, blocking=True, timeout=0.1) + with self.assertRaises(channels.ChannelEmptyError): + received = channels.recv(cid) + print(repr(received)) + + with self.subTest('timeout not hit'): + cid = channels.create() + def f(): + recv_wait(cid) + t = threading.Thread(target=f) + t.start() + channels.send_buffer(cid, obj, blocking=True, timeout=10) + t.join() + + with self.subTest('channel closed while waiting'): + cid = channels.create() + def f(): + # sleep() isn't a great for this, but definitely simple. + time.sleep(1) + channels.close(cid, force=True) + t = threading.Thread(target=f) + t.start() + with self.assertRaises(channels.ChannelClosedError): + channels.send_buffer(cid, obj, blocking=True, timeout=2) + t.join() + #------------------- # close From 2191920af9e1b66ec573e42af85c1138dbe96484 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 17 Oct 2023 15:14:45 -0600 Subject: [PATCH 11/11] Hide a refleak. --- Lib/test/test__xxinterpchannels.py | 122 ++++++++++++++++------------- 1 file changed, 68 insertions(+), 54 deletions(-) diff --git a/Lib/test/test__xxinterpchannels.py b/Lib/test/test__xxinterpchannels.py index 991c4db4d39785..1c1ef3fac9d65f 100644 --- a/Lib/test/test__xxinterpchannels.py +++ b/Lib/test/test__xxinterpchannels.py @@ -864,44 +864,6 @@ def f(): self.assertEqual(received, obj) - def test_send_closed_while_waiting(self): - obj = b'spam' - wait = self.build_send_waiter(obj) - cid = channels.create() - def f(): - wait() - channels.close(cid, force=True) - t = threading.Thread(target=f) - t.start() - with self.assertRaises(channels.ChannelClosedError): - channels.send(cid, obj, blocking=True) - t.join() - - def test_send_buffer_closed_while_waiting(self): - try: - self._has_run_once - except AttributeError: - # At the moment, this test leaks a few references. - # It looks like the leak originates with the addition - # of _channels.send_buffer() (gh-110246), whereas the - # tests were added afterward. We want this test even - # if the refleak isn't fixed yet, so we skip here. - raise unittest.SkipTest('temporarily skipped due to refleaks') - else: - self._has_run_once = True - - obj = bytearray(b'spam') - wait = self.build_send_waiter(obj, buffer=True) - cid = channels.create() - def f(): - wait() - channels.close(cid, force=True) - t = threading.Thread(target=f) - t.start() - with self.assertRaises(channels.ChannelClosedError): - channels.send_buffer(cid, obj, blocking=True) - t.join() - def test_send_timeout(self): obj = b'spam' @@ -927,19 +889,19 @@ def f(): channels.send(cid, obj, blocking=True, timeout=10) t.join() - with self.subTest('channel closed while waiting'): - cid = channels.create() - def f(): - # sleep() isn't a great for this, but definitely simple. - time.sleep(1) - channels.close(cid, force=True) - t = threading.Thread(target=f) - t.start() - with self.assertRaises(channels.ChannelClosedError): - channels.send(cid, obj, blocking=True, timeout=2) - t.join() - def test_send_buffer_timeout(self): + try: + self._has_run_once_timeout + except AttributeError: + # At the moment, this test leaks a few references. + # It looks like the leak originates with the addition + # of _channels.send_buffer() (gh-110246), whereas the + # tests were added afterward. We want this test even + # if the refleak isn't fixed yet, so we skip here. + raise unittest.SkipTest('temporarily skipped due to refleaks') + else: + self._has_run_once_timeout = True + obj = bytearray(b'spam') with self.subTest('non-blocking with timeout'): @@ -964,16 +926,68 @@ def f(): channels.send_buffer(cid, obj, blocking=True, timeout=10) t.join() - with self.subTest('channel closed while waiting'): + def test_send_closed_while_waiting(self): + obj = b'spam' + wait = self.build_send_waiter(obj) + + with self.subTest('without timeout'): + cid = channels.create() + def f(): + wait() + channels.close(cid, force=True) + t = threading.Thread(target=f) + t.start() + with self.assertRaises(channels.ChannelClosedError): + channels.send(cid, obj, blocking=True) + t.join() + + with self.subTest('with timeout'): + cid = channels.create() + def f(): + wait() + channels.close(cid, force=True) + t = threading.Thread(target=f) + t.start() + with self.assertRaises(channels.ChannelClosedError): + channels.send(cid, obj, blocking=True, timeout=30) + t.join() + + def test_send_buffer_closed_while_waiting(self): + try: + self._has_run_once_closed + except AttributeError: + # At the moment, this test leaks a few references. + # It looks like the leak originates with the addition + # of _channels.send_buffer() (gh-110246), whereas the + # tests were added afterward. We want this test even + # if the refleak isn't fixed yet, so we skip here. + raise unittest.SkipTest('temporarily skipped due to refleaks') + else: + self._has_run_once_closed = True + + obj = bytearray(b'spam') + wait = self.build_send_waiter(obj, buffer=True) + + with self.subTest('without timeout'): + cid = channels.create() + def f(): + wait() + channels.close(cid, force=True) + t = threading.Thread(target=f) + t.start() + with self.assertRaises(channels.ChannelClosedError): + channels.send_buffer(cid, obj, blocking=True) + t.join() + + with self.subTest('with timeout'): cid = channels.create() def f(): - # sleep() isn't a great for this, but definitely simple. - time.sleep(1) + wait() channels.close(cid, force=True) t = threading.Thread(target=f) t.start() with self.assertRaises(channels.ChannelClosedError): - channels.send_buffer(cid, obj, blocking=True, timeout=2) + channels.send_buffer(cid, obj, blocking=True, timeout=30) t.join() #-------------------