
Asyncpg.pool creates more connections than its max_size #1107

Open
@no-to-mediocrity

Description

  • asyncpg version: 0.29.0
  • PostgreSQL version: "PostgreSQL 16.0 (Debian 16.0-1.pgdg120+1) on aarch64-unknown-linux-gnu, compiled by gcc (Debian 12.2.0-14) 12.2.0, 64-bit"
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce
    the issue with a local PostgreSQL install?: Nope, using a Dockerized container
  • Python version: 3.11.5
  • Platform: MacBook-Air Darwin Kernel Version 21.1.0: Wed Oct 13 17:33:24 PDT 2021; root:xnu-8019.41.5~1/RELEASE_ARM64_T8101 arm64
  • Do you use pgbouncer?: Nope
  • Did you install asyncpg with pip?: Yes
  • If you built asyncpg locally, which version of Cython did you use?: n/a
  • Can the issue be reproduced under both asyncio and
    uvloop?: Nope

I have a singleton Database entity that I use to write some primitive data. When I run the write function 1000 times concurrently using asyncio.gather(), the database reports more connections than the max_size of the asyncpg pool. For example, during one test there were 857 active database connections but only 62 active pool connections. No other clients or operations were running during the test. When I do the same thing under uvloop, it crashes with ConnectionResetError: [Errno 54] Connection reset by peer if I try to run more tasks than the size of the pool.

Is this a normal pool behavior?

I use the code below (the write function is simplified, though):

The database code:

  import os

  import asyncpg


  class Database:
      _instance = None
      _pool = None
      # Connection settings are read from the environment.
      db_params = {
          'host': os.getenv('DATABASE_HOST'),
          'port': os.getenv('DATABASE_PORT'),
          'database': os.getenv('DATABASE_NAME'),
          'user': os.getenv('DATABASE_USER'),
          'password': os.getenv('DATABASE_PASSWORD')
      }

      def __new__(cls, *args, **kwargs):
          # Singleton: always hand back the same instance.
          if cls._instance is None:
              cls._instance = super().__new__(cls)
          return cls._instance

      @classmethod
      async def get_pool(cls):
          # Create the shared pool on first use.
          if cls._pool is None:
              cls._pool = await asyncpg.create_pool(**cls.db_params, min_size=1, max_size=150)
          return cls._pool

      @classmethod
      async def write(cls, result):
          pool = await cls.get_pool()
          # Borrow one connection from the pool for a single INSERT.
          async with pool.acquire() as connection:
              await connection.execute('''
                  INSERT INTO tables.results(
                      result
                  ) VALUES($1)
              ''', result)

The demo write code:

import asyncio

db = Database()

async def fake_result(i):
    print(f'generating fake result {i}')
    await db.write(i)

async def run_functions_concurrently():
    # Start 1000 writes at once against the shared pool.
    tasks = [fake_result(i) for i in range(1000)]
    await asyncio.gather(*tasks)

def main():
    asyncio.run(run_functions_concurrently())

if __name__ == "__main__":
    main()
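
One possible explanation, offered as an assumption rather than a confirmed diagnosis: get_pool() checks `cls._pool is None` and then awaits pool creation, so every task started by asyncio.gather() can pass the check before the first assignment lands, and each one would then create its own pool. The sketch below reproduces that pattern without a database. `FakePool`, `fake_create_pool`, `UnguardedDatabase`, `GuardedDatabase` and `count_pools` are hypothetical names used only for illustration; `fake_create_pool` stands in for asyncpg.create_pool(), and the asyncio.Lock variant is one common way to serialize the first initialization, not necessarily the right fix for the real code.

```python
import asyncio


class FakePool:
    """Hypothetical stand-in for a connection pool; counts instances."""
    created = 0

    def __init__(self):
        FakePool.created += 1


async def fake_create_pool():
    # Stand-in for asyncpg.create_pool(): the await hands control back to
    # the event loop, as opening real connections would.
    await asyncio.sleep(0.01)
    return FakePool()


class UnguardedDatabase:
    """Same check-then-await pattern as get_pool() in the report."""
    _pool = None

    @classmethod
    async def get_pool(cls):
        if cls._pool is None:                      # all tasks pass this check
            cls._pool = await fake_create_pool()   # before any assignment lands
        return cls._pool


class GuardedDatabase:
    """Variant that lets only one task run the initialization."""
    _pool = None
    _lock = None

    @classmethod
    async def get_pool(cls):
        if cls._lock is None:
            # No await between check and assignment, so this step is safe.
            cls._lock = asyncio.Lock()
        async with cls._lock:
            if cls._pool is None:
                cls._pool = await fake_create_pool()
        return cls._pool


async def count_pools(db_cls, n_tasks=50):
    # Reset shared state so each measurement starts clean.
    FakePool.created = 0
    db_cls._pool = None
    if hasattr(db_cls, "_lock"):
        db_cls._lock = None
    await asyncio.gather(*(db_cls.get_pool() for _ in range(n_tasks)))
    return FakePool.created


def demo():
    unguarded = asyncio.run(count_pools(UnguardedDatabase))
    guarded = asyncio.run(count_pools(GuardedDatabase))
    return unguarded, guarded


if __name__ == "__main__":
    print(demo())
```

With 50 concurrent callers the unguarded version builds 50 pools and the guarded one builds a single pool. If the same thing happens with real pools, each of them could open connections up to its own max_size, which would be consistent with the database seeing far more connections than any single pool reports.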
