Skip to content
This repository was archived by the owner on Mar 26, 2024. It is now read-only.

Commit a0fc5fe

Browse files
committed
Optimise get_rooms_for_user (drop with_stream_ordering) (matrix-org#13787)
# Conflicts: # synapse/storage/_base.py # synapse/storage/databases/main/roommember.py
1 parent a93a8b3 commit a0fc5fe

File tree

7 files changed

+94
-104
lines changed

7 files changed

+94
-104
lines changed

changelog.d/13787.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Optimise get rooms for user calls. Contributed by Nick @ Beeper (@fizzadar).

synapse/handlers/device.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -268,11 +268,9 @@ async def get_user_ids_changed(
268268
possibly_left = possibly_changed | possibly_left
269269

270270
# Double check if we still share rooms with the given user.
271-
users_rooms = await self.store.get_rooms_for_users_with_stream_ordering(
272-
possibly_left
273-
)
271+
users_rooms = await self.store.get_rooms_for_users(possibly_left)
274272
for changed_user_id, entries in users_rooms.items():
275-
if any(e.room_id in room_ids for e in entries):
273+
if any(rid in room_ids for rid in entries):
276274
possibly_left.discard(changed_user_id)
277275
else:
278276
possibly_joined.discard(changed_user_id)

synapse/handlers/sync.py

+4-10
Original file line numberDiff line numberDiff line change
@@ -1507,16 +1507,14 @@ async def _generate_sync_entry_for_device_list(
15071507
since_token.device_list_key
15081508
)
15091509
if changed_users is not None:
1510-
result = await self.store.get_rooms_for_users_with_stream_ordering(
1511-
changed_users
1512-
)
1510+
result = await self.store.get_rooms_for_users(changed_users)
15131511

15141512
for changed_user_id, entries in result.items():
15151513
# Check if the changed user shares any rooms with the user,
15161514
# or if the changed user is the syncing user (as we always
15171515
# want to include device list updates of their own devices).
15181516
if user_id == changed_user_id or any(
1519-
e.room_id in joined_rooms for e in entries
1517+
rid in joined_rooms for rid in entries
15201518
):
15211519
users_that_have_changed.add(changed_user_id)
15221520
else:
@@ -1550,13 +1548,9 @@ async def _generate_sync_entry_for_device_list(
15501548
newly_left_users.update(left_users)
15511549

15521550
# Remove any users that we still share a room with.
1553-
left_users_rooms = (
1554-
await self.store.get_rooms_for_users_with_stream_ordering(
1555-
newly_left_users
1556-
)
1557-
)
1551+
left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
15581552
for user_id, entries in left_users_rooms.items():
1559-
if any(e.room_id in joined_rooms for e in entries):
1553+
if any(rid in joined_rooms for rid in entries):
15601554
newly_left_users.discard(user_id)
15611555

15621556
return DeviceListUpdates(

synapse/storage/_base.py

+4
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ def _invalidate_state_caches(
9090
self._attempt_to_invalidate_cache(
9191
"get_user_in_room_with_profile", (room_id, user_id)
9292
)
93+
self._attempt_to_invalidate_cache(
94+
"get_rooms_for_user_with_stream_ordering", (user_id,)
95+
)
96+
self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))
9397

9498
# Purge other caches based on room state.
9599
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))

synapse/storage/databases/main/cache.py

+1
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
205205
self.get_rooms_for_user_with_stream_ordering.invalidate(
206206
(data.state_key,)
207207
)
208+
self.get_rooms_for_user.invalidate((data.state_key,))
208209
else:
209210
raise Exception("Unknown events stream row type %s" % (row.type,))
210211

synapse/storage/databases/main/roommember.py

+81-90
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import logging
1616
from typing import (
1717
TYPE_CHECKING,
18-
Callable,
1918
Collection,
2019
Dict,
2120
FrozenSet,
@@ -690,117 +689,109 @@ def _get_rooms_for_user_with_stream_ordering_txn(
690689
for room_id, instance, stream_id in txn
691690
)
692691

693-
@cachedList(
694-
cached_method_name="get_rooms_for_user_with_stream_ordering",
695-
list_name="user_ids",
696-
)
697-
async def get_rooms_for_users_with_stream_ordering(
692+
async def get_users_server_still_shares_room_with(
698693
self, user_ids: Collection[str]
699-
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
700-
"""A batched version of `get_rooms_for_user_with_stream_ordering`.
701-
702-
Returns:
703-
Map from user_id to set of rooms that is currently in.
694+
) -> Set[str]:
695+
"""Given a list of users return the set that the server still share a
696+
room with.
704697
"""
698+
699+
if not user_ids:
700+
return set()
701+
705702
return await self.db_pool.runInteraction(
706-
"get_rooms_for_users_with_stream_ordering",
707-
self._get_rooms_for_users_with_stream_ordering_txn,
703+
"get_users_server_still_shares_room_with",
704+
self.get_users_server_still_shares_room_with_txn,
708705
user_ids,
709706
)
710707

711-
def _get_rooms_for_users_with_stream_ordering_txn(
712-
self, txn: LoggingTransaction, user_ids: Collection[str]
713-
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
708+
def get_users_server_still_shares_room_with_txn(
709+
self,
710+
txn: LoggingTransaction,
711+
user_ids: Collection[str],
712+
) -> Set[str]:
713+
if not user_ids:
714+
return set()
715+
716+
sql = """
717+
SELECT state_key FROM current_state_events
718+
WHERE
719+
type = 'm.room.member'
720+
AND membership = 'join'
721+
AND %s
722+
GROUP BY state_key
723+
"""
714724

715725
clause, args = make_in_list_sql_clause(
716-
self.database_engine,
717-
"c.state_key",
718-
user_ids,
726+
self.database_engine, "state_key", user_ids
719727
)
720728

721-
if self._current_state_events_membership_up_to_date:
722-
sql = f"""
723-
SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
724-
FROM current_state_events AS c
725-
INNER JOIN events AS e USING (room_id, event_id)
726-
WHERE
727-
c.type = 'm.room.member'
728-
AND c.membership = ?
729-
AND {clause}
730-
"""
731-
else:
732-
sql = f"""
733-
SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
734-
FROM current_state_events AS c
735-
INNER JOIN room_memberships AS m USING (room_id, event_id)
736-
INNER JOIN events AS e USING (room_id, event_id)
737-
WHERE
738-
c.type = 'm.room.member'
739-
AND m.membership = ?
740-
AND {clause}
741-
"""
729+
txn.execute(sql % (clause,), args)
742730

743-
txn.execute(sql, [Membership.JOIN] + args)
731+
return {row[0] for row in txn}
744732

745-
result: Dict[str, Set[GetRoomsForUserWithStreamOrdering]] = {
746-
user_id: set() for user_id in user_ids
747-
}
748-
for user_id, room_id, instance, stream_id in txn:
749-
result[user_id].add(
750-
GetRoomsForUserWithStreamOrdering(
751-
room_id, PersistedEventPosition(instance, stream_id)
752-
)
753-
)
754-
755-
return {user_id: frozenset(v) for user_id, v in result.items()}
733+
@cached(max_entries=500000, iterable=True)
734+
async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]:
735+
"""Returns a set of room_ids the user is currently joined to.
756736
757-
async def get_users_server_still_shares_room_with(
758-
self, user_ids: Collection[str]
759-
) -> Set[str]:
760-
"""Given a list of users return the set that the server still share a
761-
room with.
737+
If a remote user only returns rooms this server is currently
738+
participating in.
762739
"""
740+
rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(
741+
(user_id,),
742+
None,
743+
update_metrics=False,
744+
)
745+
if rooms:
746+
return frozenset(r.room_id for r in rooms)
763747

764-
if not user_ids:
765-
return set()
766-
767-
def _get_users_server_still_shares_room_with_txn(
768-
txn: LoggingTransaction,
769-
) -> Set[str]:
770-
sql = """
771-
SELECT state_key FROM current_state_events
772-
WHERE
773-
type = 'm.room.member'
774-
AND membership = 'join'
775-
AND %s
776-
GROUP BY state_key
777-
"""
748+
room_ids = await self.db_pool.simple_select_onecol(
749+
table="current_state_events",
750+
keyvalues={
751+
"type": EventTypes.Member,
752+
"membership": Membership.JOIN,
753+
"state_key": user_id,
754+
},
755+
retcol="room_id",
756+
desc="get_rooms_for_user",
757+
)
778758

779-
clause, args = make_in_list_sql_clause(
780-
self.database_engine, "state_key", user_ids
781-
)
759+
return frozenset(room_ids)
782760

783-
txn.execute(sql % (clause,), args)
761+
@cachedList(
762+
cached_method_name="get_rooms_for_user",
763+
list_name="user_ids",
764+
)
765+
async def get_rooms_for_users(
766+
self, user_ids: Collection[str]
767+
) -> Dict[str, FrozenSet[str]]:
768+
"""A batched version of `get_rooms_for_user`.
784769
785-
return {row[0] for row in txn}
770+
Returns:
771+
Map from user_id to set of rooms that is currently in.
772+
"""
786773

787-
return await self.db_pool.runInteraction(
788-
"get_users_server_still_shares_room_with",
789-
_get_users_server_still_shares_room_with_txn,
774+
rows = await self.db_pool.simple_select_many_batch(
775+
table="current_state_events",
776+
column="state_key",
777+
iterable=user_ids,
778+
retcols=(
779+
"state_key",
780+
"room_id",
781+
),
782+
keyvalues={
783+
"type": EventTypes.Member,
784+
"membership": Membership.JOIN,
785+
},
786+
desc="get_rooms_for_users",
790787
)
791788

792-
async def get_rooms_for_user(
793-
self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None
794-
) -> FrozenSet[str]:
795-
"""Returns a set of room_ids the user is currently joined to.
789+
user_rooms: Dict[str, Set[str]] = {user_id: set() for user_id in user_ids}
796790

797-
If a remote user only returns rooms this server is currently
798-
participating in.
799-
"""
800-
rooms = await self.get_rooms_for_user_with_stream_ordering(
801-
user_id, on_invalidate=on_invalidate
802-
)
803-
return frozenset(r.room_id for r in rooms)
791+
for row in rows:
792+
user_rooms[row["state_key"]].add(row["room_id"])
793+
794+
return {key: frozenset(rooms) for key, rooms in user_rooms.items()}
804795

805796
@cached(max_entries=10000)
806797
async def does_pair_of_users_share_a_room(

tests/handlers/test_sync.py

+1
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ def test_unknown_room_version(self):
159159

160160
# Blow away caches (supported room versions can only change due to a restart).
161161
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
162+
self.store.get_rooms_for_user.invalidate_all()
162163
self.get_success(self.store._get_event_cache.clear())
163164
self.store._event_ref.clear()
164165

0 commit comments

Comments
 (0)