Skip to content
This repository was archived by the owner on Mar 26, 2024. It is now read-only.

Commit 74f3088

Browse files
committed
Optimise get_rooms_for_user (drop with_stream_ordering) (matrix-org#13787)
# Conflicts: # synapse/storage/_base.py # synapse/storage/databases/main/roommember.py
1 parent f76758b commit 74f3088

File tree

7 files changed

+94
-94
lines changed

7 files changed

+94
-94
lines changed

changelog.d/13787.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Optimise get rooms for user calls. Contributed by Nick @ Beeper (@fizzadar).

synapse/handlers/device.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -270,11 +270,9 @@ async def get_user_ids_changed(
270270
possibly_left = possibly_changed | possibly_left
271271

272272
# Double check if we still share rooms with the given user.
273-
users_rooms = await self.store.get_rooms_for_users_with_stream_ordering(
274-
possibly_left
275-
)
273+
users_rooms = await self.store.get_rooms_for_users(possibly_left)
276274
for changed_user_id, entries in users_rooms.items():
277-
if any(e.room_id in room_ids for e in entries):
275+
if any(rid in room_ids for rid in entries):
278276
possibly_left.discard(changed_user_id)
279277
else:
280278
possibly_joined.discard(changed_user_id)

synapse/handlers/sync.py

+4-10
Original file line numberDiff line numberDiff line change
@@ -1507,16 +1507,14 @@ async def _generate_sync_entry_for_device_list(
15071507
since_token.device_list_key
15081508
)
15091509
if changed_users is not None:
1510-
result = await self.store.get_rooms_for_users_with_stream_ordering(
1511-
changed_users
1512-
)
1510+
result = await self.store.get_rooms_for_users(changed_users)
15131511

15141512
for changed_user_id, entries in result.items():
15151513
# Check if the changed user shares any rooms with the user,
15161514
# or if the changed user is the syncing user (as we always
15171515
# want to include device list updates of their own devices).
15181516
if user_id == changed_user_id or any(
1519-
e.room_id in joined_rooms for e in entries
1517+
rid in joined_rooms for rid in entries
15201518
):
15211519
users_that_have_changed.add(changed_user_id)
15221520
else:
@@ -1550,13 +1548,9 @@ async def _generate_sync_entry_for_device_list(
15501548
newly_left_users.update(left_users)
15511549

15521550
# Remove any users that we still share a room with.
1553-
left_users_rooms = (
1554-
await self.store.get_rooms_for_users_with_stream_ordering(
1555-
newly_left_users
1556-
)
1557-
)
1551+
left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
15581552
for user_id, entries in left_users_rooms.items():
1559-
if any(e.room_id in joined_rooms for e in entries):
1553+
if any(rid in joined_rooms for rid in entries):
15601554
newly_left_users.discard(user_id)
15611555

15621556
return DeviceListUpdates(

synapse/storage/_base.py

+4
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ def _invalidate_state_caches(
9090
self._attempt_to_invalidate_cache(
9191
"get_user_in_room_with_profile", (room_id, user_id)
9292
)
93+
self._attempt_to_invalidate_cache(
94+
"get_rooms_for_user_with_stream_ordering", (user_id,)
95+
)
96+
self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))
9397

9498
# Purge other caches based on room state.
9599
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))

synapse/storage/databases/main/cache.py

+1
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
205205
self.get_rooms_for_user_with_stream_ordering.invalidate(
206206
(data.state_key,)
207207
)
208+
self.get_rooms_for_user.invalidate((data.state_key,))
208209
else:
209210
raise Exception("Unknown events stream row type %s" % (row.type,))
210211

synapse/storage/databases/main/roommember.py

+81-80
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import logging
1616
from typing import (
1717
TYPE_CHECKING,
18-
Callable,
1918
Collection,
2019
Dict,
2120
FrozenSet,
@@ -52,7 +51,6 @@
5251
from synapse.util.async_helpers import Linearizer
5352
from synapse.util.caches import intern_string
5453
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
55-
from synapse.util.cancellation import cancellable
5654
from synapse.util.iterutils import batch_iter
5755
from synapse.util.metrics import Measure
5856

@@ -619,106 +617,109 @@ def _get_rooms_for_user_with_stream_ordering_txn(
619617
for room_id, instance, stream_id in txn
620618
)
621619

622-
@cachedList(
623-
cached_method_name="get_rooms_for_user_with_stream_ordering",
624-
list_name="user_ids",
625-
)
626-
async def get_rooms_for_users_with_stream_ordering(
620+
async def get_users_server_still_shares_room_with(
627621
self, user_ids: Collection[str]
628-
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
629-
"""A batched version of `get_rooms_for_user_with_stream_ordering`.
630-
631-
Returns:
632-
Map from user_id to set of rooms that is currently in.
622+
) -> Set[str]:
623+
"""Given a list of users return the set that the server still share a
624+
room with.
633625
"""
634-
return await self.db_pool.runInteraction(
635-
"get_rooms_for_users_with_stream_ordering",
636-
self._get_rooms_for_users_with_stream_ordering_txn,
637-
user_ids,
638-
)
639626

640-
def _get_rooms_for_users_with_stream_ordering_txn(
641-
self, txn: LoggingTransaction, user_ids: Collection[str]
642-
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
627+
if not user_ids:
628+
return set()
643629

644-
clause, args = make_in_list_sql_clause(
645-
self.database_engine,
646-
"c.state_key",
630+
return await self.db_pool.runInteraction(
631+
"get_users_server_still_shares_room_with",
632+
self.get_users_server_still_shares_room_with_txn,
647633
user_ids,
648634
)
649635

650-
sql = f"""
651-
SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
652-
FROM current_state_events AS c
653-
INNER JOIN events AS e USING (room_id, event_id)
636+
def get_users_server_still_shares_room_with_txn(
637+
self,
638+
txn: LoggingTransaction,
639+
user_ids: Collection[str],
640+
) -> Set[str]:
641+
if not user_ids:
642+
return set()
643+
644+
sql = """
645+
SELECT state_key FROM current_state_events
654646
WHERE
655-
c.type = 'm.room.member'
656-
AND c.membership = ?
657-
AND {clause}
647+
type = 'm.room.member'
648+
AND membership = 'join'
649+
AND %s
650+
GROUP BY state_key
658651
"""
659652

660-
txn.execute(sql, [Membership.JOIN] + args)
653+
clause, args = make_in_list_sql_clause(
654+
self.database_engine, "state_key", user_ids
655+
)
661656

662-
result: Dict[str, Set[GetRoomsForUserWithStreamOrdering]] = {
663-
user_id: set() for user_id in user_ids
664-
}
665-
for user_id, room_id, instance, stream_id in txn:
666-
result[user_id].add(
667-
GetRoomsForUserWithStreamOrdering(
668-
room_id, PersistedEventPosition(instance, stream_id)
669-
)
670-
)
657+
txn.execute(sql % (clause,), args)
671658

672-
return {user_id: frozenset(v) for user_id, v in result.items()}
659+
return {row[0] for row in txn}
673660

674-
async def get_users_server_still_shares_room_with(
675-
self, user_ids: Collection[str]
676-
) -> Set[str]:
677-
"""Given a list of users return the set that the server still share a
678-
room with.
661+
@cached(max_entries=500000, iterable=True)
662+
async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]:
663+
"""Returns a set of room_ids the user is currently joined to.
664+
665+
If a remote user only returns rooms this server is currently
666+
participating in.
679667
"""
668+
rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(
669+
(user_id,),
670+
None,
671+
update_metrics=False,
672+
)
673+
if rooms:
674+
return frozenset(r.room_id for r in rooms)
680675

681-
if not user_ids:
682-
return set()
676+
room_ids = await self.db_pool.simple_select_onecol(
677+
table="current_state_events",
678+
keyvalues={
679+
"type": EventTypes.Member,
680+
"membership": Membership.JOIN,
681+
"state_key": user_id,
682+
},
683+
retcol="room_id",
684+
desc="get_rooms_for_user",
685+
)
683686

684-
def _get_users_server_still_shares_room_with_txn(
685-
txn: LoggingTransaction,
686-
) -> Set[str]:
687-
sql = """
688-
SELECT state_key FROM current_state_events
689-
WHERE
690-
type = 'm.room.member'
691-
AND membership = 'join'
692-
AND %s
693-
GROUP BY state_key
694-
"""
687+
return frozenset(room_ids)
695688

696-
clause, args = make_in_list_sql_clause(
697-
self.database_engine, "state_key", user_ids
698-
)
699-
700-
txn.execute(sql % (clause,), args)
689+
@cachedList(
690+
cached_method_name="get_rooms_for_user",
691+
list_name="user_ids",
692+
)
693+
async def get_rooms_for_users(
694+
self, user_ids: Collection[str]
695+
) -> Dict[str, FrozenSet[str]]:
696+
"""A batched version of `get_rooms_for_user`.
701697
702-
return {row[0] for row in txn}
698+
Returns:
699+
Map from user_id to set of rooms that is currently in.
700+
"""
703701

704-
return await self.db_pool.runInteraction(
705-
"get_users_server_still_shares_room_with",
706-
_get_users_server_still_shares_room_with_txn,
702+
rows = await self.db_pool.simple_select_many_batch(
703+
table="current_state_events",
704+
column="state_key",
705+
iterable=user_ids,
706+
retcols=(
707+
"state_key",
708+
"room_id",
709+
),
710+
keyvalues={
711+
"type": EventTypes.Member,
712+
"membership": Membership.JOIN,
713+
},
714+
desc="get_rooms_for_users",
707715
)
708716

709-
@cancellable
710-
async def get_rooms_for_user(
711-
self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None
712-
) -> FrozenSet[str]:
713-
"""Returns a set of room_ids the user is currently joined to.
717+
user_rooms: Dict[str, Set[str]] = {user_id: set() for user_id in user_ids}
714718

715-
If a remote user only returns rooms this server is currently
716-
participating in.
717-
"""
718-
rooms = await self.get_rooms_for_user_with_stream_ordering(
719-
user_id, on_invalidate=on_invalidate
720-
)
721-
return frozenset(r.room_id for r in rooms)
719+
for row in rows:
720+
user_rooms[row["state_key"]].add(row["room_id"])
721+
722+
return {key: frozenset(rooms) for key, rooms in user_rooms.items()}
722723

723724
@cached(max_entries=10000)
724725
async def does_pair_of_users_share_a_room(

tests/handlers/test_sync.py

+1
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ def test_unknown_room_version(self):
159159

160160
# Blow away caches (supported room versions can only change due to a restart).
161161
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
162+
self.store.get_rooms_for_user.invalidate_all()
162163
self.get_success(self.store._get_event_cache.clear())
163164
self.store._event_ref.clear()
164165

0 commit comments

Comments
 (0)