|
1 | 1 | from __future__ import annotations
|
2 | 2 |
|
| 3 | +import socket |
| 4 | +from collections.abc import Sequence |
| 5 | + |
3 | 6 | import pytest
|
4 | 7 | from _pytest.logging import LogCaptureFixture
|
5 | 8 | from _pytest.pytester import Pytester
|
6 | 9 |
|
7 | 10 | from anyio import get_all_backends
|
| 11 | +from anyio.pytest_plugin import FreePortFactory |
8 | 12 |
|
9 |
| -pytestmark = pytest.mark.filterwarnings( |
10 |
| - "ignore:The TerminalReporter.writer attribute is deprecated" |
11 |
| - ":pytest.PytestDeprecationWarning:" |
12 |
| -) |
| 13 | +pytestmark = [ |
| 14 | + pytest.mark.filterwarnings( |
| 15 | + "ignore:The TerminalReporter.writer attribute is deprecated" |
| 16 | + ":pytest.PytestDeprecationWarning:" |
| 17 | + ), |
| 18 | + pytest.mark.anyio, |
| 19 | +] |
13 | 20 |
|
14 | 21 | pytest_args = "-v", "-p", "anyio", "-p", "no:asyncio", "-p", "no:trio"
|
15 | 22 |
|
@@ -561,3 +568,62 @@ async def test_params(fixt):
|
561 | 568 |
|
562 | 569 | result = testdir.runpytest(*pytest_args)
|
563 | 570 | result.assert_outcomes(passed=len(get_all_backends()) * 2)
|
| 571 | + |
| 572 | + |
| 573 | +class TestFreePortFactory: |
| 574 | + @pytest.fixture(scope="class") |
| 575 | + def families(self) -> Sequence[tuple[socket.AddressFamily, str]]: |
| 576 | + from .test_sockets import has_ipv6 |
| 577 | + |
| 578 | + families: list[tuple[socket.AddressFamily, str]] = [ |
| 579 | + (socket.AF_INET, "127.0.0.1") |
| 580 | + ] |
| 581 | + if has_ipv6: |
| 582 | + families.append((socket.AF_INET6, "::1")) |
| 583 | + |
| 584 | + return families |
| 585 | + |
| 586 | + async def test_tcp_factory( |
| 587 | + self, |
| 588 | + families: Sequence[tuple[socket.AddressFamily, str]], |
| 589 | + free_tcp_port_factory: FreePortFactory, |
| 590 | + ) -> None: |
| 591 | + generated_ports = {free_tcp_port_factory() for _ in range(5)} |
| 592 | + assert all(isinstance(port, int) for port in generated_ports) |
| 593 | + assert len(generated_ports) == 5 |
| 594 | + for port in generated_ports: |
| 595 | + for family, addr in families: |
| 596 | + with socket.socket(family, socket.SOCK_STREAM) as sock: |
| 597 | + try: |
| 598 | + sock.bind((addr, port)) |
| 599 | + except OSError: |
| 600 | + pass |
| 601 | + |
| 602 | + async def test_udp_factory( |
| 603 | + self, |
| 604 | + families: Sequence[tuple[socket.AddressFamily, str]], |
| 605 | + free_udp_port_factory: FreePortFactory, |
| 606 | + ) -> None: |
| 607 | + generated_ports = {free_udp_port_factory() for _ in range(5)} |
| 608 | + assert all(isinstance(port, int) for port in generated_ports) |
| 609 | + assert len(generated_ports) == 5 |
| 610 | + for port in generated_ports: |
| 611 | + for family, addr in families: |
| 612 | + with socket.socket(family, socket.SOCK_DGRAM) as sock: |
| 613 | + sock.bind((addr, port)) |
| 614 | + |
| 615 | + async def test_free_tcp_port( |
| 616 | + self, families: Sequence[tuple[socket.AddressFamily, str]], free_tcp_port: int |
| 617 | + ) -> None: |
| 618 | + assert isinstance(free_tcp_port, int) |
| 619 | + for family, addr in families: |
| 620 | + with socket.socket(family, socket.SOCK_STREAM) as sock: |
| 621 | + sock.bind((addr, free_tcp_port)) |
| 622 | + |
| 623 | + async def test_free_udp_port( |
| 624 | + self, families: Sequence[tuple[socket.AddressFamily, str]], free_udp_port: int |
| 625 | + ) -> None: |
| 626 | + assert isinstance(free_udp_port, int) |
| 627 | + for family, addr in families: |
| 628 | + with socket.socket(family, socket.SOCK_DGRAM) as sock: |
| 629 | + sock.bind((addr, free_udp_port)) |
0 commit comments