19
19
20
20
from synapse .api .errors import SynapseError
21
21
from synapse .http .server import HttpServer
22
+ from synapse .http .servlet import parse_json_object_from_request
22
23
from synapse .replication .http ._base import ReplicationEndpoint
23
24
from synapse .types import JsonDict
24
25
@@ -51,10 +52,15 @@ def __init__(self, hs: "HomeServer"):
51
52
self ._state_handler = hs .get_state_handler ()
52
53
self ._events_shard_config = hs .config .worker .events_shard_config
53
54
self ._instance_name = hs .get_instance_name ()
55
+ self ._main_store = hs .get_datastores ().main
56
+ self ._replication = hs .get_replication_data_handler ()
54
57
55
58
@staticmethod
56
- async def _serialize_payload (room_id : str ) -> JsonDict : # type: ignore[override]
57
- return {}
59
+ async def _serialize_payload (room_id : str , local_instance_name : str , unpartial_state_events_position : int ) -> JsonDict : # type: ignore[override]
60
+ return {
61
+ "instance_name" : local_instance_name ,
62
+ "unpartial_state_events_position" : unpartial_state_events_position ,
63
+ }
58
64
59
65
async def _handle_request ( # type: ignore[override]
60
66
self , request : Request , room_id : str
@@ -65,9 +71,22 @@ async def _handle_request( # type: ignore[override]
65
71
400 , "/update_current_state request was routed to the wrong worker"
66
72
)
67
73
74
+ payload = parse_json_object_from_request (request )
75
+ await self ._replication .wait_for_stream_position (
76
+ payload ["instance_name" ],
77
+ "un_partial_stated_event" ,
78
+ payload ["unpartial_state_events_position" ],
79
+ )
80
+
68
81
await self ._state_handler .update_current_state (room_id )
69
82
70
- return 200 , {}
83
+ assert self ._main_store ._cache_id_gen is not None
84
+
85
+ return 200 , {
86
+ "caches_position" : self ._main_store ._cache_id_gen .get_current_token_for_writer (
87
+ self ._instance_name
88
+ )
89
+ }
71
90
72
91
73
92
def register_servlets (hs : "HomeServer" , http_server : HttpServer ) -> None :
0 commit comments