Skip to content

bpo-40390: Implement channel_send_wait for subinterpreters #19715

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions Lib/test/test__xxsubinterpreters.py
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,97 @@ def test_run_string_arg_resolved(self):
self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')

def test_channel_send_wait_same_interpreter(self):
cid = interpreters.channel_create()

received = interpreters.channel_send_wait(cid, b"send", timeout=0)
self.assertFalse(received)

obj = interpreters.channel_recv(cid)
self.assertEqual(obj, b"send")

def test_channel_send_wait_different_interpreters(self):
cid = interpreters.channel_create()
interp = interpreters.create()
_run_output(interp, dedent(f"""
import _xxsubinterpreters as _interpreters
import time
import math

start = time.time()
rc = _interpreters.channel_send_wait({cid}, b"send", timeout=1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be worth making it clear (e.g. in a comment) that we expect this to time out (since it is happening in the same thread where we expect recv() to be called later).

end = time.time()

assert not rc
assert math.floor(end-start) == 1
"""))

obj = interpreters.channel_recv(cid)
self.assertEqual(obj, b"send")

def test_channel_send_wait_different_threads_and_interpreters(self):
cid = interpreters.channel_create()
interp = interpreters.create()

thread_exc = None
def run():
try:
out = _run_output(interp, dedent(f"""
import _xxsubinterpreters as _interpreters
import time

rc = _interpreters.channel_send_wait({cid}, b"send")
assert rc
"""))
except Exception as e:
nonlocal thread_exc
thread_exc = e
t = threading.Thread(target=run)
t.start()
time.sleep(0.5)

obj = interpreters.channel_recv(cid)
self.assertEqual(obj, b"send")
t.join()
assert thread_exc is None, f"{thread_exc}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert thread_exc is None, f"{thread_exc}"
self.assertIsNone(thread_exc, f"{thread_exc}")


def test_channel_send_wait_no_timeout(self):
cid = interpreters.channel_create()
interp = interpreters.create()

thread_exc = None
def run():
try:
out = _run_output(interp, dedent(f"""
import _xxsubinterpreters as _interpreters
import time

rc = _interpreters.channel_send_wait({cid}, b"send", timeout=10)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test function says "no_timeout". Did you mean "with_timeout"? "timeout_not_hit"?

assert rc
"""))
except Exception as e:
nonlocal thread_exc
thread_exc = e
t = threading.Thread(target=run)
t.start()
time.sleep(0.5)

obj = interpreters.channel_recv(cid)
self.assertEqual(obj, b"send")
t.join()
assert thread_exc is None, f"{thread_exc}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert thread_exc is None, f"{thread_exc}"
self.assertIsNone(thread_exc, f"{thread_exc}")


def test_invalid_channel_send_wait(self):
does_not_exist_cid = 1000
closed_cid = interpreters.channel_create()
interpreters.channel_close(closed_cid)

with self.assertRaises(interpreters.ChannelNotFoundError):
interpreters.channel_send_wait(does_not_exist_cid, b"error")

with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_send_wait(closed_cid, b"error")

# close

def test_close_single_user(self):
Expand Down Expand Up @@ -1519,6 +1610,30 @@ def test_close_used_multiple_times_by_single_user(self):
with self.assertRaises(interpreters.ChannelClosedError):
interpreters.channel_recv(cid)

def test_close_while_sender_waiting(self):
cid = interpreters.channel_create()
interp = interpreters.create()

thread_exc = None
def run():
try:
out = _run_output(interp, dedent(f"""
import _xxsubinterpreters as _interpreters

rc = _interpreters.channel_send_wait({cid}, b"send")
assert not rc
"""))
except Exception as e:
nonlocal thread_exc
thread_exc = e

t = threading.Thread(target=run)
t.start()
time.sleep(0.1)
interpreters.channel_close(cid, force=True)
t.join()
assert thread_exc is None, f"{thread_exc}"


class ChannelReleaseTests(TestBase):

Expand Down
Loading