|
15 | 15 | # limitations under the License.
|
16 | 16 | import datetime
|
17 | 17 | import logging
|
18 |
| -from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast |
| 18 | +from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple |
19 | 19 |
|
20 | 20 | import attr
|
21 | 21 | from prometheus_client import Counter
|
@@ -77,6 +77,7 @@ def __init__(
|
77 | 77 | self._transaction_manager = transaction_manager
|
78 | 78 | self._instance_name = hs.get_instance_name()
|
79 | 79 | self._federation_shard_config = hs.config.worker.federation_shard_config
|
| 80 | + self._state = hs.get_state_handler() |
80 | 81 |
|
81 | 82 | self._should_send_on_this_instance = True
|
82 | 83 | if not self._federation_shard_config.should_handle(
|
@@ -415,22 +416,95 @@ async def _catch_up_transmission_loop(self) -> None:
|
415 | 416 | "This should not happen." % event_ids
|
416 | 417 | )
|
417 | 418 |
|
418 |
| - if logger.isEnabledFor(logging.INFO): |
419 |
| - rooms = [p.room_id for p in catchup_pdus] |
420 |
| - logger.info("Catching up rooms to %s: %r", self._destination, rooms) |
| 419 | + # We send transactions with events from one room only, as it's likely |
| 420 | + # that the remote will have to do additional processing, which may |
| 421 | + # take some time. It's better to give it small amounts of work |
| 422 | + # rather than risk the request timing out and repeatedly being |
| 423 | + # retried, and not making any progress. |
| 424 | + # |
| 425 | + # Note: `catchup_pdus` will have exactly one PDU per room. |
| 426 | + for pdu in catchup_pdus: |
| 427 | + # The PDU from the DB will be the last PDU in the room from |
| 428 | + # *this server* that wasn't sent to the remote. However, other |
| 429 | + # servers may have sent lots of events since then, and we want |
| 430 | + # to try and tell the remote only about the *latest* events in |
| 431 | + # the room. This is so that it doesn't get inundated by events |
| 432 | + # from various parts of the DAG, which all need to be processed. |
| 433 | + # |
| 434 | + # Note: this does mean that in large rooms a server coming back |
| 435 | + # online will get sent the same events from all the different |
| 436 | + # servers, but the remote will correctly deduplicate them and |
| 437 | + # handle it only once. |
| 438 | + |
| 439 | + # Step 1, fetch the current extremities |
| 440 | + extrems = await self._store.get_prev_events_for_room(pdu.room_id) |
| 441 | + |
| 442 | + if pdu.event_id in extrems: |
| 443 | + # If the event is in the extremities, then great! We can just |
| 444 | + # use that without having to do further checks. |
| 445 | + room_catchup_pdus = [pdu] |
| 446 | + else: |
| 447 | + # If not, fetch the extremity events and figure out which we |
| 448 | + # can send. |
| 449 | + extrem_events = await self._store.get_events_as_list(extrems) |
| 450 | + |
| 451 | + new_pdus = [] |
| 452 | + for p in extrem_events: |
| 453 | + # We pulled this from the DB, so it'll be non-null |
| 454 | + assert p.internal_metadata.stream_ordering |
| 455 | + |
| 456 | + # Filter out events that happened before the remote went |
| 457 | + # offline |
| 458 | + if ( |
| 459 | + p.internal_metadata.stream_ordering |
| 460 | + < self._last_successful_stream_ordering |
| 461 | + ): |
| 462 | + continue |
421 | 463 |
|
422 |
| - await self._transaction_manager.send_new_transaction( |
423 |
| - self._destination, catchup_pdus, [] |
424 |
| - ) |
| 464 | + # Filter out events where the server is not in the room, |
| 465 | + # e.g. it may have left/been kicked. *Ideally* we'd pull |
| 466 | + # out the kick and send that, but it's a rare edge case |
| 467 | + # so we don't bother for now (the server that sent the |
| 468 | + # kick should send it out if it's online). |
| 469 | + hosts = await self._state.get_hosts_in_room_at_events( |
| 470 | + p.room_id, [p.event_id] |
| 471 | + ) |
| 472 | + if self._destination not in hosts: |
| 473 | + continue |
425 | 474 |
|
426 |
| - sent_transactions_counter.inc() |
427 |
| - final_pdu = catchup_pdus[-1] |
428 |
| - self._last_successful_stream_ordering = cast( |
429 |
| - int, final_pdu.internal_metadata.stream_ordering |
430 |
| - ) |
431 |
| - await self._store.set_destination_last_successful_stream_ordering( |
432 |
| - self._destination, self._last_successful_stream_ordering |
433 |
| - ) |
| 475 | + new_pdus.append(p) |
| 476 | + |
| 477 | + # If we've filtered out all the extremities, fall back to |
| 478 | + # sending the original event. This should ensure that the |
| 479 | + # server gets at least some of the missed events (especially if |
| 480 | + # the other sending servers are up). |
| 481 | + if new_pdus: |
| 482 | + room_catchup_pdus = new_pdus |
| 483 | + |
| 484 | + logger.info( |
| 485 | + "Catching up rooms to %s: %r", self._destination, pdu.room_id |
| 486 | + ) |
| 487 | + |
| 488 | + await self._transaction_manager.send_new_transaction( |
| 489 | + self._destination, room_catchup_pdus, [] |
| 490 | + ) |
| 491 | + |
| 492 | + sent_transactions_counter.inc() |
| 493 | + |
| 494 | + # We pulled this from the DB, so it'll be non-null |
| 495 | + assert pdu.internal_metadata.stream_ordering |
| 496 | + |
| 497 | + # Note that we mark the last successful stream ordering as that |
| 498 | + # from the *original* PDU, rather than the PDU(s) we actually |
| 499 | + # send. This is because we use it to mark our position in the |
| 500 | + # queue of missed PDUs to process. |
| 501 | + self._last_successful_stream_ordering = ( |
| 502 | + pdu.internal_metadata.stream_ordering |
| 503 | + ) |
| 504 | + |
| 505 | + await self._store.set_destination_last_successful_stream_ordering( |
| 506 | + self._destination, self._last_successful_stream_ordering |
| 507 | + ) |
434 | 508 |
|
435 | 509 | def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
|
436 | 510 | if not self._pending_rrs:
|
|
0 commit comments