45
45
from synapse .util import json_decoder
46
46
from synapse .util .async_helpers import Linearizer , concurrently_execute
47
47
from synapse .util .cancellation import cancellable
48
- from synapse .util .retryutils import NotRetryingDestination
48
+ from synapse .util .retryutils import (
49
+ NotRetryingDestination ,
50
+ filter_destinations_by_retry_limiter ,
51
+ )
49
52
50
53
if TYPE_CHECKING :
51
54
from synapse .server import HomeServer
@@ -268,10 +271,8 @@ async def query_devices(
268
271
"%d destinations to query devices for" , len (remote_queries_not_in_cache )
269
272
)
270
273
271
- async def _query (
272
- destination_queries : Tuple [str , Dict [str , Iterable [str ]]]
273
- ) -> None :
274
- destination , queries = destination_queries
274
+ async def _query (destination : str ) -> None :
275
+ queries = remote_queries_not_in_cache [destination ]
275
276
return await self ._query_devices_for_destination (
276
277
results ,
277
278
cross_signing_keys ,
@@ -281,9 +282,20 @@ async def _query(
281
282
timeout ,
282
283
)
283
284
285
+ # Only try and fetch keys for destinations that are not marked as
286
+ # down.
287
+ filtered_destinations = await filter_destinations_by_retry_limiter (
288
+ remote_queries_not_in_cache .keys (),
289
+ self .clock ,
290
+ self .store ,
291
+ # Let's give an arbitrary grace period for those hosts that are
292
+ # only recently down
293
+ retry_due_within_ms = 60 * 1000 ,
294
+ )
295
+
284
296
await concurrently_execute (
285
297
_query ,
286
- remote_queries_not_in_cache . items () ,
298
+ filtered_destinations ,
287
299
10 ,
288
300
delay_cancellation = True ,
289
301
)
0 commit comments