Skip to content

Commit dba99cf

Browse files
committed
fix(backend/postgres): allow concurrent pubs
This fix adds an asyncio.Lock to avoid `asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress` fixes #22
1 parent 9255c29 commit dba99cf

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

broadcaster/_backends/postgres.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ def __init__(self, url: str):
1313

1414
async def connect(self) -> None:
1515
self._conn = await asyncpg.connect(self._url)
16+
self._lock = asyncio.Lock()
1617
self._listen_queue: asyncio.Queue = asyncio.Queue()
1718

1819
async def disconnect(self) -> None:
@@ -25,7 +26,8 @@ async def unsubscribe(self, channel: str) -> None:
2526
await self._conn.remove_listener(channel, self._listener)
2627

2728
async def publish(self, channel: str, message: str) -> None:
28-
await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
29+
async with self._lock:
30+
await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
2931

3032
def _listener(self, *args: Any) -> None:
3133
connection, pid, channel, payload = args

0 commit comments

Comments
 (0)