Skip to content

Singleton provider fails every second pytest test if an event loop is involved #576

Open
@chbndrhnns

Description

@chbndrhnns

There seems to be a strange issue when using a Singleton provider whose object is used from async code and needs access to the event loop: every second test fails.

I was able to narrow it down to the snippet below. The production code is really short. The test that fails consistently is test_singleton_scope.

There are two fixes I found:

  • Calling reset_singletons() after each test
  • Using a function scope for the container fixture

To be honest, I don't understand why this is necessary and how the Singleton can cause this event-loop-related error.

Is there anything we could do about this?

import httpx
import pytest
from dependency_injector import containers, providers

# Production code

class HttpxClient:
    """Small wrapper that creates one ``httpx.AsyncClient`` and reuses it."""
    def __init__(self):
        # Created once per HttpxClient instance; nothing in this snippet
        # ever closes the client.
        self._client = httpx.AsyncClient()

    async def get(self):
        """Send a GET to a fixed jsonplaceholder URL and return the result."""
        return await self._client.get("https://jsonplaceholder.typicode.com/todos/1")


class Container(containers.DeclarativeContainer):
    """Declarative container exposing ``HttpxClient`` via a Singleton provider."""
    # NOTE(review): a Singleton provider is expected to hand back the same
    # HttpxClient on every call until the singleton is reset — confirm
    # against the dependency-injector documentation.
    client = providers.Singleton(HttpxClient)

# Test code

@pytest.fixture(autouse=True)
def anyio_backend():
    """Autouse fixture that returns the backend name ``"asyncio"``.

    Presumably consumed by the anyio pytest plugin to choose the backend
    for the async tests below — confirm against the anyio testing docs.
    """
    return "asyncio"


@pytest.fixture(scope="module")
def container():
    """Return a new ``Container``.

    With ``scope="module"`` pytest builds this fixture once per test module,
    so every test in the module receives the same container instance.
    """
    return Container()


@pytest.fixture
def reset_singletons(container):
    """Run the requesting test inside ``container.reset_singletons()``.

    The test body executes at the ``yield``, i.e. while the ``with`` block
    is open.
    """
    # NOTE(review): presumably the singletons are reset when the context is
    # entered and/or exited — confirm the exact timing in dependency-injector.
    with container.reset_singletons():
        yield


@pytest.mark.parametrize("iteration", range(4))
async def test_singleton_scope(iteration, container):
    """Request via the container's singleton client, with no reset in between.

    ``iteration`` is unused in the body; the parametrization only repeats
    the test four times. This is the test the report describes as failing.
    """
    assert await container.client().get()


@pytest.mark.parametrize("iteration", range(4))
async def test_singleton_scope__with_reset(iteration, container, reset_singletons):
    """Same request as ``test_singleton_scope``, plus the ``reset_singletons`` fixture.

    ``iteration`` is unused in the body; the parametrization only repeats
    the test four times.
    """
    assert await container.client().get()

@pytest.mark.parametrize("iteration", range(4))
async def test_other_scope(iteration, container):
    """Same request, with ``client`` overridden by a ``ContextLocalSingleton``.

    ``iteration`` is unused in the body; the parametrization only repeats
    the test four times.
    """
    # The override is active only inside this ``with`` block.
    with container.client.override(
        providers.ContextLocalSingleton(HttpxClient)):
        assert await container.client().get()

Running this with pytest leads to a strange error message:

Click to toggle error message
FAILED                      [ 16%]
test_scope_httpx.py:33 (test_singleton_scope[1])
self = TLSStream(transport_stream=<anyio._backends._asyncio.SocketStream object at 0x107044250>, standard_compatible=False, _...t at 0x10707d130>, _read_bio=<_ssl.MemoryBIO object at 0x105c979f0>, _write_bio=<_ssl.MemoryBIO object at 0x106ff9770>)
func = <bound method SSLObject.read of <ssl.SSLObject object at 0x10707d130>>
args = (65536,)

    async def _call_sslobject_method(
        self, func: Callable[..., T_Retval], *args: object
    ) -> T_Retval:
        while True:
            try:
>               result = func(*args)

.venv/lib/python3.9/site-packages/anyio/streams/tls.py:108: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ssl.SSLObject object at 0x10707d130>, len = 65536, buffer = None

    def read(self, len=1024, buffer=None):
        """Read up to 'len' bytes from the SSL object and return them.
    
        If 'buffer' is provided, read into this buffer and return the number of
        bytes read.
        """
        if buffer is not None:
            v = self._sslobj.read(len, buffer)
        else:
>           v = self._sslobj.read(len)
E           ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2633)

../../../.pyenv/versions/3.9.9/lib/python3.9/ssl.py:888: SSLWantReadError

During handling of the above exception, another exception occurred:

self = <AsyncHTTP11Connection ['https://jsonplaceholder.typicode.com:443', CLOSED, Request Count: 2]>
request = <Request [b'GET']>

    async def handle_async_request(self, request: Request) -> Response:
        if not self.can_handle_request(request.url.origin):
            raise RuntimeError(
                f"Attempted to send request to {request.url.origin} on connection "
                f"to {self._origin}"
            )
    
        async with self._state_lock:
            if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
                self._request_count += 1
                self._state = HTTPConnectionState.ACTIVE
                self._expire_at = None
            else:
                raise ConnectionNotAvailable()
    
        try:
            kwargs = {"request": request}
            async with Trace("http11.send_request_headers", request, kwargs) as trace:
                await self._send_request_headers(**kwargs)
            async with Trace("http11.send_request_body", request, kwargs) as trace:
                await self._send_request_body(**kwargs)
            async with Trace(
                "http11.receive_response_headers", request, kwargs
            ) as trace:
                (
                    http_version,
                    status,
                    reason_phrase,
                    headers,
>               ) = await self._receive_response_headers(**kwargs)

.venv/lib/python3.9/site-packages/httpcore/_async/http11.py:81: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <AsyncHTTP11Connection ['https://jsonplaceholder.typicode.com:443', CLOSED, Request Count: 2]>
request = <Request [b'GET']>

    async def _receive_response_headers(
        self, request: Request
    ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]]]:
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("read", None)
    
        while True:
>           event = await self._receive_event(timeout=timeout)

.venv/lib/python3.9/site-packages/httpcore/_async/http11.py:143: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <AsyncHTTP11Connection ['https://jsonplaceholder.typicode.com:443', CLOSED, Request Count: 2]>
timeout = 5.0

    async def _receive_event(self, timeout: float = None) -> H11Event:
        while True:
            with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
                event = self._h11_state.next_event()
    
            if event is h11.NEED_DATA:
>               data = await self._network_stream.read(
                    self.READ_NUM_BYTES, timeout=timeout
                )

.venv/lib/python3.9/site-packages/httpcore/_async/http11.py:172: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <httpcore.backends.asyncio.AsyncIOStream object at 0x107049130>
max_bytes = 65536, timeout = 5.0

    async def read(self, max_bytes: int, timeout: float = None) -> bytes:
        exc_map = {
            TimeoutError: ReadTimeout,
            anyio.BrokenResourceError: ReadError,
        }
        with map_exceptions(exc_map):
            with anyio.fail_after(timeout):
                try:
>                   return await self._stream.receive(max_bytes=max_bytes)

.venv/lib/python3.9/site-packages/httpcore/backends/asyncio.py:31: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = TLSStream(transport_stream=<anyio._backends._asyncio.SocketStream object at 0x107044250>, standard_compatible=False, _...t at 0x10707d130>, _read_bio=<_ssl.MemoryBIO object at 0x105c979f0>, _write_bio=<_ssl.MemoryBIO object at 0x106ff9770>)
max_bytes = 65536

    async def receive(self, max_bytes: int = 65536) -> bytes:
>       data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)

.venv/lib/python3.9/site-packages/anyio/streams/tls.py:171: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = TLSStream(transport_stream=<anyio._backends._asyncio.SocketStream object at 0x107044250>, standard_compatible=False, _...t at 0x10707d130>, _read_bio=<_ssl.MemoryBIO object at 0x105c979f0>, _write_bio=<_ssl.MemoryBIO object at 0x106ff9770>)
func = <bound method SSLObject.read of <ssl.SSLObject object at 0x10707d130>>
args = (65536,)

    async def _call_sslobject_method(
        self, func: Callable[..., T_Retval], *args: object
    ) -> T_Retval:
        while True:
            try:
                result = func(*args)
            except ssl.SSLWantReadError:
                try:
                    # Flush any pending writes first
                    if self._write_bio.pending:
                        await self.transport_stream.send(self._write_bio.read())
    
>                   data = await self.transport_stream.receive()

.venv/lib/python3.9/site-packages/anyio/streams/tls.py:115: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <anyio._backends._asyncio.SocketStream object at 0x107044250>
max_bytes = 65536

    async def receive(self, max_bytes: int = 65536) -> bytes:
        with self._receive_guard:
            await checkpoint()
    
            if not self._protocol.read_event.is_set() and not self._transport.is_closing():
>               self._transport.resume_reading()

.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:1104: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_SelectorSocketTransport closing fd=17>

    def resume_reading(self):
        if self._closing or not self._paused:
            return
        self._paused = False
>       self._add_reader(self._sock_fd, self._read_ready)

../../../.pyenv/versions/3.9.9/lib/python3.9/asyncio/selector_events.py:808: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_SelectorSocketTransport closing fd=17>, fd = 17
callback = <bound method _SelectorSocketTransport._read_ready of <_SelectorSocketTransport closing fd=17>>
args = ()

    def _add_reader(self, fd, callback, *args):
        if self._closing:
            return
    
>       self._loop._add_reader(fd, callback, *args)

../../../.pyenv/versions/3.9.9/lib/python3.9/asyncio/selector_events.py:754: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_UnixSelectorEventLoop running=False closed=True debug=False>, fd = 17
callback = <bound method _SelectorSocketTransport._read_ready of <_SelectorSocketTransport closing fd=17>>
args = ()

    def _add_reader(self, fd, callback, *args):
>       self._check_closed()

../../../.pyenv/versions/3.9.9/lib/python3.9/asyncio/selector_events.py:258: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

../../../.pyenv/versions/3.9.9/lib/python3.9/asyncio/base_events.py:510: RuntimeError

During handling of the above exception, another exception occurred:

iteration = 1
container = <dependency_injector.containers.DynamicContainer object at 0x106fe8d60>

    @pytest.mark.parametrize("iteration", range(4))
    async def test_singleton_scope(iteration, container):
>       assert await container.client().get()

test_scope_httpx.py:36: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
test_scope_httpx.py:11: in get
    return await self._client.get("https://jsonplaceholder.typicode.com/todos/1")
.venv/lib/python3.9/site-packages/httpx/_client.py:1729: in get
    return await self.request(
.venv/lib/python3.9/site-packages/httpx/_client.py:1506: in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
.venv/lib/python3.9/site-packages/httpx/_client.py:1593: in send
    response = await self._send_handling_auth(
.venv/lib/python3.9/site-packages/httpx/_client.py:1621: in _send_handling_auth
    response = await self._send_handling_redirects(
.venv/lib/python3.9/site-packages/httpx/_client.py:1658: in _send_handling_redirects
    response = await self._send_single_request(request)
.venv/lib/python3.9/site-packages/httpx/_client.py:1695: in _send_single_request
    response = await transport.handle_async_request(request)
.venv/lib/python3.9/site-packages/httpx/_transports/default.py:353: in handle_async_request
    resp = await self._pool.handle_async_request(req)
.venv/lib/python3.9/site-packages/httpcore/_async/connection_pool.py:253: in handle_async_request
    raise exc
.venv/lib/python3.9/site-packages/httpcore/_async/connection_pool.py:237: in handle_async_request
    response = await connection.handle_async_request(request)
.venv/lib/python3.9/site-packages/httpcore/_async/connection.py:90: in handle_async_request
    return await self._connection.handle_async_request(request)
.venv/lib/python3.9/site-packages/httpcore/_async/http11.py:101: in handle_async_request
    await self._response_closed()
.venv/lib/python3.9/site-packages/httpcore/_async/http11.py:204: in _response_closed
    await self.aclose()
.venv/lib/python3.9/site-packages/httpcore/_async/http11.py:212: in aclose
    await self._network_stream.aclose()
.venv/lib/python3.9/site-packages/httpcore/backends/asyncio.py:48: in aclose
    await self._stream.aclose()
.venv/lib/python3.9/site-packages/anyio/streams/tls.py:168: in aclose
    await self.transport_stream.aclose()
.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:1163: in aclose
    self._transport.close()
../../../.pyenv/versions/3.9.9/lib/python3.9/asyncio/selector_events.py:700: in close
    self._loop.call_soon(self._call_connection_lost, None)
../../../.pyenv/versions/3.9.9/lib/python3.9/asyncio/base_events.py:746: in call_soon
    self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

../../../.pyenv/versions/3.9.9/lib/python3.9/asyncio/base_events.py:510: RuntimeError

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions