Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit aa97b35

Browse files
committed
Correctly advance un_partial_stated_room_stream
1 parent d24b947 commit aa97b35

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed

synapse/handlers/federation.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -1727,14 +1727,16 @@ async def _sync_partial_state_room(
17271727

17281728
logger.info("Clearing partial-state flag for %s", room_id)
17291729
success = await self.store.clear_partial_state_room(room_id)
1730+
1731+
# Poke the notifier so that other workers see the write to
1732+
# the un-partial-stated rooms stream.
1733+
self._notifier.notify_replication()
1734+
17301735
if success:
17311736
logger.info("State resync complete for %s", room_id)
17321737
self._storage_controllers.state.notify_room_un_partial_stated(
17331738
room_id
17341739
)
1735-
# Poke the notifier so that other workers see the write to
1736-
# the un-partial-stated rooms stream.
1737-
self._notifier.notify_replication()
17381740

17391741
# TODO(faster_joins) update room stats and user directory?
17401742
# https://github.com/matrix-org/synapse/issues/12814

synapse/storage/databases/main/room.py

+9
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from synapse.api.room_versions import RoomVersion, RoomVersions
4444
from synapse.config.homeserver import HomeServerConfig
4545
from synapse.events import EventBase
46+
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
4647
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
4748
from synapse.storage.database import (
4849
DatabasePool,
@@ -140,6 +141,13 @@ def __init__(
140141
db_conn, "un_partial_stated_room_stream", "stream_id"
141142
)
142143

144+
def process_replication_position(
145+
self, stream_name: str, instance_name: str, token: int
146+
) -> None:
147+
if stream_name == UnPartialStatedRoomStream.NAME:
148+
self._un_partial_stated_rooms_stream_id_gen.advance(instance_name, token)
149+
return super().process_replication_position(stream_name, instance_name, token)
150+
143151
async def store_room(
144152
self,
145153
room_id: str,
@@ -2372,3 +2380,4 @@ def _clear_partial_state_room_txn(
23722380
WHERE stream_id <= ?
23732381
"""
23742382
txn.execute(sql, (device_lists_stream_id,))
2383+
txn.execute(sql, (device_lists_stream_id,))

0 commit comments

Comments
 (0)