Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 29aa106

Browse files
committed
HACK: Ensure we want for replication to catch up
1 parent 091a313 commit 29aa106

File tree

2 files changed

+32
-4
lines changed

2 files changed

+32
-4
lines changed

synapse/replication/http/state.py

+20-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from synapse.api.errors import SynapseError
2121
from synapse.http.server import HttpServer
22+
from synapse.http.servlet import parse_json_object_from_request
2223
from synapse.replication.http._base import ReplicationEndpoint
2324
from synapse.types import JsonDict
2425

@@ -51,10 +52,15 @@ def __init__(self, hs: "HomeServer"):
5152
self._state_handler = hs.get_state_handler()
5253
self._events_shard_config = hs.config.worker.events_shard_config
5354
self._instance_name = hs.get_instance_name()
55+
self._main_store = hs.get_datastores().main
56+
self._replication = hs.get_replication_data_handler()
5457

5558
@staticmethod
56-
async def _serialize_payload(room_id: str) -> JsonDict: # type: ignore[override]
57-
return {}
59+
async def _serialize_payload(room_id: str, local_instance_name: str, unpartial_state_events_position: int) -> JsonDict: # type: ignore[override]
60+
return {
61+
"instance_name": local_instance_name,
62+
"unpartial_state_events_position": unpartial_state_events_position,
63+
}
5864

5965
async def _handle_request( # type: ignore[override]
6066
self, request: Request, room_id: str
@@ -65,9 +71,20 @@ async def _handle_request( # type: ignore[override]
6571
400, "/update_current_state request was routed to the wrong worker"
6672
)
6773

74+
payload = parse_json_object_from_request(request)
75+
await self._replication.wait_for_stream_position(
76+
payload["instance_name"],
77+
"un_partial_stated_event",
78+
payload["unpartial_state_events_position"],
79+
)
80+
6881
await self._state_handler.update_current_state(room_id)
6982

70-
return 200, {}
83+
return 200, {
84+
"caches_position": self._main_store._cache_id_gen.get_current_token_for_writer(
85+
self._instance_name
86+
)
87+
}
7188

7289

7390
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:

synapse/state/__init__.py

+12-1
Original file line numberDiff line numberDiff line change
@@ -566,10 +566,21 @@ async def update_current_state(self, room_id: str) -> None:
566566
"""
567567
writer_instance = self._events_shard_config.get_instance(room_id)
568568
if writer_instance != self._instance_name:
569-
await self._update_current_state_client(
569+
res = await self._update_current_state_client(
570570
instance_name=writer_instance,
571571
room_id=room_id,
572+
local_instance_name=self._instance_name,
573+
unpartial_state_events_position=self.store._un_partial_stated_events_stream_id_gen.get_current_token_for_writer(
574+
self._instance_name
575+
),
572576
)
577+
578+
await self.hs.get_replication_data_handler().wait_for_stream_position(
579+
writer_instance,
580+
"caches",
581+
res["caches_position"],
582+
)
583+
573584
return
574585

575586
assert self._storage_controllers.persistence is not None

0 commit comments

Comments
 (0)