36
36
from prometheus_client import Counter , Histogram
37
37
38
38
from twisted .internet import defer
39
- from twisted .internet .defer import Deferred
40
39
41
40
from synapse .api .constants import EventTypes , Membership
42
41
from synapse .events import EventBase
52
51
StateMap ,
53
52
get_domain_from_id ,
54
53
)
55
- from synapse .util .async_helpers import ObservableDeferred
54
+ from synapse .util .async_helpers import ObservableDeferred , yieldable_gather_results
56
55
from synapse .util .metrics import Measure
57
56
58
57
logger = logging .getLogger (__name__ )
@@ -135,25 +134,24 @@ def __init__(
135
134
self ._currently_persisting_rooms : Set [str ] = set ()
136
135
self ._per_item_callback = per_item_callback
137
136
138
- def add_to_queue (self , room_id , events_and_contexts , backfilled ) -> Deferred :
137
+ async def add_to_queue (
138
+ self ,
139
+ room_id : str ,
140
+ events_and_contexts : Iterable [Tuple [EventBase , EventContext ]],
141
+ backfilled : bool ,
142
+ ) -> _PersistResult :
139
143
"""Add events to the queue, with the given persist_event options.
140
144
141
145
If we are not already processing events in this room, starts off a background
142
146
process to to so, calling the per_item_callback for each item.
143
147
144
- NB: due to the normal usage pattern of this method, it does *not*
145
- follow the synapse logcontext rules, and leaves the logcontext in
146
- place whether or not the returned deferred is ready.
147
-
148
148
Args:
149
149
room_id (str):
150
150
events_and_contexts (list[(EventBase, EventContext)]):
151
151
backfilled (bool):
152
152
153
153
Returns:
154
- defer.Deferred: a deferred which will resolve once the events are
155
- persisted. Runs its callbacks in the sentinel logcontext. The result
156
- is the same as that returned by the `_per_item_callback` passed to
154
+ the result returned by the `_per_item_callback` passed to
157
155
`__init__`.
158
156
"""
159
157
queue = self ._event_persist_queues .setdefault (room_id , deque ())
@@ -175,7 +173,7 @@ def add_to_queue(self, room_id, events_and_contexts, backfilled) -> Deferred:
175
173
176
174
end_item .events_and_contexts .extend (events_and_contexts )
177
175
self ._handle_queue (room_id )
178
- return end_item .deferred .observe ()
176
+ return await make_deferred_yieldable ( end_item .deferred .observe () )
179
177
180
178
def _handle_queue (self , room_id ):
181
179
"""Attempts to handle the queue for a room if not already being handled.
@@ -278,22 +276,20 @@ async def persist_events(
278
276
for event , ctx in events_and_contexts :
279
277
partitioned .setdefault (event .room_id , []).append ((event , ctx ))
280
278
281
- deferreds = []
282
- for room_id , evs_ctxs in partitioned . items ():
283
- d = self ._event_persist_queue .add_to_queue (
279
+ async def enqueue ( item ):
280
+ room_id , evs_ctxs = item
281
+ return await self ._event_persist_queue .add_to_queue (
284
282
room_id , evs_ctxs , backfilled = backfilled
285
283
)
286
- deferreds .append (d )
287
284
288
- # Each deferred returns a map from event ID to existing event ID if the
289
- # event was deduplicated. (The dict may also include other entries if
285
+ ret_vals = await yieldable_gather_results (enqueue , partitioned .items ())
286
+
287
+ # Each call to add_to_queue returns a map from event ID to existing event ID if
288
+ # the event was deduplicated. (The dict may also include other entries if
290
289
# the event was persisted in a batch with other events).
291
290
#
292
- # Since we use `defer.gatherResults ` we need to merge the returned list
291
+ # Since we use `concurrently_execute ` we need to merge the returned list
293
292
# of dicts into one.
294
- ret_vals = await make_deferred_yieldable (
295
- defer .gatherResults (deferreds , consumeErrors = True )
296
- )
297
293
replaced_events : Dict [str , str ] = {}
298
294
for d in ret_vals :
299
295
replaced_events .update (d )
@@ -321,14 +317,12 @@ async def persist_event(
321
317
event if it was deduplicated due to an existing event matching the
322
318
transaction ID.
323
319
"""
324
- deferred = self ._event_persist_queue .add_to_queue (
325
- event .room_id , [(event , context )], backfilled = backfilled
326
- )
327
-
328
- # The deferred returns a map from event ID to existing event ID if the
320
+ # add_to_queue returns a map from event ID to existing event ID if the
329
321
# event was deduplicated. (The dict may also include other entries if
330
322
# the event was persisted in a batch with other events.)
331
- replaced_events = await make_deferred_yieldable (deferred )
323
+ replaced_events = await self ._event_persist_queue .add_to_queue (
324
+ event .room_id , [(event , context )], backfilled = backfilled
325
+ )
332
326
replaced_event = replaced_events .get (event .event_id )
333
327
if replaced_event :
334
328
event = await self .main_store .get_event (replaced_event )
0 commit comments