asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress when executing gino processes in parallel #576

Open
@shsimeonova

Description

  • asyncpg version: 0.20.1
  • PostgreSQL version: 11
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce
    the issue with a local PostgreSQL install?: no
  • Python version: 3.7
  • Platform: Win
  • Do you use pgbouncer?: no
  • Did you install asyncpg with pip?: yes
  • If you built asyncpg locally, which version of Cython did you use?:
  • Can the issue be reproduced under both asyncio and
    uvloop?: don't know, haven't used uvloop

Hello! Thank you for your attention in advance.

I'm trying to run parallel tasks in batches with asyncio, GINO and asyncpg.

The logic starts at this entry point:

```python
while batch:
    # One task per job; the dict maps each task back to its job.
    tasks = {
        asyncio.create_task(JobService.run_job(
            job, connection_per_job=True,
            endpoint_connection=endpoint_request.db_connection)): job
        for job in batch
    }
    result = await asyncio.gather(*tasks)
    # executed_jobs.append({'name': job_instance.name, 'uuid': job_instance.uuid, 'result': job_execution.response})

    start += batch_jobs_count
    batch = jobs[start:start + batch_jobs_count]
```
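The batching loop above can be reduced to a self-contained sketch that runs without a database. Here run_job is a hypothetical stand-in for JobService.run_job that only sleeps, and jobs are plain strings. It also shows one way to fill executed_jobs: asyncio.gather returns results in the order its arguments were passed, and a dict keeps insertion order, so zipping tasks.values() with the results pairs each job with its own result.

```python
import asyncio


async def run_job(job: str) -> str:
    # Hypothetical stand-in for JobService.run_job: no database involved.
    await asyncio.sleep(0.01)
    return f'{job}-done'


async def run_in_batches(jobs, batch_jobs_count):
    executed_jobs = []
    start = 0
    batch = jobs[start:start + batch_jobs_count]
    while batch:
        # Keys are tasks, values are the jobs they run.
        tasks = {asyncio.create_task(run_job(job)): job for job in batch}
        # Unpacking the dict passes its keys (the tasks) in insertion order.
        results = await asyncio.gather(*tasks)
        # gather preserves argument order, so zip lines jobs up with results.
        for job, result in zip(tasks.values(), results):
            executed_jobs.append({'name': job, 'result': result})
        start += batch_jobs_count
        batch = jobs[start:start + batch_jobs_count]
    return executed_jobs


executed = asyncio.run(run_in_batches(['a', 'b', 'c', 'd', 'e'], 2))
print(executed)
```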

endpoint_connection=endpoint_request.db_connection is the endpoint connection, opened when my application receives the request (I'm using an internal utility library on top of asyncpg, Quart and GINO).
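An asyncpg connection runs one operation at a time, and a second operation that overlaps the first on the same connection is rejected with exactly this InterfaceError. So concurrent tasks cannot all share endpoint_connection. The sketch below simulates that rule in plain asyncio so it runs without PostgreSQL; FakeConnection and OperationInProgress are hypothetical names, and the guard only mimics the behaviour the error message describes, not asyncpg's own code.

```python
import asyncio


class OperationInProgress(Exception):
    """Hypothetical stand-in for asyncpg's InterfaceError."""


class FakeConnection:
    """Hypothetical connection that allows only one operation at a time."""

    def __init__(self):
        self._busy = False

    async def execute(self, query: str) -> str:
        if self._busy:
            raise OperationInProgress(
                'cannot perform operation: another operation is in progress')
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # pretend to wait for the server
            return f'ok: {query}'
        finally:
            self._busy = False


async def main():
    conn = FakeConnection()

    # Sequential use of one connection works.
    sequential = [await conn.execute('SELECT 1'), await conn.execute('SELECT 2')]

    # Concurrent use of the same connection: the second task is rejected.
    concurrent = await asyncio.gather(
        conn.execute('SELECT 1'), conn.execute('SELECT 2'),
        return_exceptions=True)
    return sequential, concurrent


sequential, concurrent = asyncio.run(main())
print(sequential)
print(concurrent)
```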

In JobService.run_job I have:

```python
@classmethod
async def run_job(cls, job: Job, connection_per_job=False, endpoint_connection=None) -> (Job, JobExecution):
    print(f'received {job.name}, time: {datetime.utcnow()}')
    print(f'job: {job.name}, connection: {endpoint_connection.raw_connection._con._stmt_exclusive_section._acquired}')

    if connection_per_job and bool(endpoint_connection.raw_connection._con._stmt_exclusive_section._acquired):
        # Endpoint connection is busy: acquire a separate connection from the pool
        async with db_adapter.get_db().acquire() as conn:
            print('in context')
            job_instance, job_execution = await cls._run_job_internal(job, conn=conn)
    else:
        # Otherwise run without passing an explicit connection
        job_instance, job_execution = await cls._run_job_internal(job)

    return job_instance, job_execution
```
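One possible contributor, offered as a hypothesis rather than a diagnosis: checking _stmt_exclusive_section._acquired and then using the connection is a check-then-act race. The flag (an underscore-prefixed, private asyncpg attribute, so not a stable API) only describes the moment of the check. If anything is awaited between the check and the first query, other tasks can observe the same "free" state and choose the same connection. The simulation below uses hypothetical names (FakeConnection, OperationInProgress, a toy run_job) and a bare await asyncio.sleep(0) to stand in for whatever is awaited before the first query.

```python
import asyncio


class OperationInProgress(Exception):
    """Hypothetical stand-in for asyncpg's InterfaceError."""


class FakeConnection:
    """Hypothetical connection that allows only one operation at a time."""

    def __init__(self, name: str):
        self.name = name
        self.busy = False  # plays the role of the _acquired flag

    async def execute(self) -> str:
        if self.busy:
            raise OperationInProgress('another operation is in progress')
        self.busy = True
        try:
            await asyncio.sleep(0.01)  # pretend to wait for the server
            return self.name
        finally:
            self.busy = False


async def run_job(shared: FakeConnection) -> str:
    # Check: use the shared connection only if it looks free right now.
    conn = FakeConnection('dedicated') if shared.busy else shared
    # Any await between the check and the query lets the other tasks
    # run the same check and see the same "free" state.
    await asyncio.sleep(0)
    return await conn.execute()


async def main():
    shared = FakeConnection('shared')
    jobs = [run_job(shared) for _ in range(3)]
    return await asyncio.gather(*jobs, return_exceptions=True)


results = asyncio.run(main())
print(results)
```

In this toy version all three tasks pick the shared connection, the first succeeds and the other two fail. Removing the asyncio.sleep(0) makes the check appear to work, because each task then marks the connection busy before yielding; in real code the gap between the check and the first query is not something the flag can account for.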

I'm trying to use database connections economically: first check whether the general endpoint connection is free and use it if so, and otherwise acquire a new connection in a context manager. To be sure which connection is used, I pass a bind (connection) parameter to every method that performs database operations through GINO. These methods seem to fail on the second connection, the one acquired inside the context manager. I also don't understand how an explicitly acquired connection can already be in use by another operation, which is what the error seems to say.
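An alternative that avoids inspecting the connection's state is to give every concurrent job its own connection from a pool for the job's whole duration, the role asyncpg's Pool.acquire() (used as async with pool.acquire() as conn) and the db_adapter.get_db().acquire() call above already play. The sketch below is a minimal simulation under that assumption; FakePool, FakeConnection and this run_job are hypothetical names, and the pool is just an asyncio.Queue of connections. The pool size caps how many jobs touch the database at once; the remaining jobs wait for a connection instead of colliding on one.

```python
import asyncio
from contextlib import asynccontextmanager


class OperationInProgress(Exception):
    """Hypothetical stand-in for asyncpg's InterfaceError."""


class FakeConnection:
    """Hypothetical connection that allows only one operation at a time."""

    def __init__(self, name: str):
        self.name = name
        self._busy = False

    async def execute(self, query: str) -> str:
        if self._busy:
            raise OperationInProgress('another operation is in progress')
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # pretend to wait for the server
            return f'{self.name}: {query}'
        finally:
            self._busy = False


class FakePool:
    """Hypothetical pool that never hands one connection to two holders."""

    def __init__(self, size: int):
        self._free = asyncio.Queue()
        for i in range(size):
            self._free.put_nowait(FakeConnection(f'conn-{i}'))

    @asynccontextmanager
    async def acquire(self):
        conn = await self._free.get()  # waits while every connection is held
        try:
            yield conn
        finally:
            self._free.put_nowait(conn)  # return it for the next job


async def run_job(pool: FakePool, job: str):
    # The job keeps one connection for all of its queries, run one after
    # another, and no other task can use it until the job releases it.
    async with pool.acquire() as conn:
        first = await conn.execute(f'{job} step 1')
        second = await conn.execute(f'{job} step 2')
        return first, second


async def main():
    pool = FakePool(size=2)  # created inside the running event loop
    return await asyncio.gather(*(run_job(pool, f'job{i}') for i in range(5)))


results = asyncio.run(main())
print(results)
```

The same one-operation rule also applies inside a single job: if _run_job_internal (not shown here) started several queries on conn concurrently, for example with asyncio.gather, the dedicated connection would raise the same error. That is a possibility to check, not a confirmed cause.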
