From bbb9ae33099c75e7322f2b981d7aedf9cad2b973 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Nov 2021 18:23:45 -0500 Subject: [PATCH 1/4] Split out federated PDU retrieval into a non-cached version Context: https://github.com/matrix-org/synapse/pull/11114/files#r741643968 --- synapse/federation/federation_client.py | 80 ++++++++++++++++++------- 1 file changed, 58 insertions(+), 22 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 670186f5482f..9e12530db888 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -277,6 +277,58 @@ async def backfill( return pdus + async def get_pdu_from_destination_raw( + self, + destination: str, + event_id: str, + room_version: RoomVersion, + outlier: bool = False, + timeout: Optional[int] = None, + ) -> Optional[EventBase]: + """Requests the PDU with given origin and ID from the remote home + server. + Does not have any caching or rate limiting! + Args: + destination: Which homeserver to query + event_id: event to fetch + room_version: version of the room + outlier: Indicates whether the PDU is an `outlier`, i.e. if + it's from an arbitrary point in the context as opposed to part + of the current block of PDUs. Defaults to `False` + timeout: How long to try (in ms) each destination for before + moving to the next destination. None indicates no timeout. + Returns: + The requested PDU, or None if we were unable to find it. + Raises: + SynapseError, NotRetryingDestination, FederationDeniedError + """ + + signed_pdu = None + + transaction_data = await self.transport_layer.get_event( + destination, event_id, timeout=timeout + ) + + logger.info( + "retrieved event id %s from %s: %r", + event_id, + destination, + transaction_data, + ) + + pdu_list: List[EventBase] = [ + event_from_pdu_json(p, room_version, outlier=outlier) + for p in transaction_data["pdus"] + ] + + if pdu_list and pdu_list[0]: + pdu = pdu_list[0] + + # Check signatures are correct. + signed_pdu = await self._check_sigs_and_hash(room_version, pdu) + + return signed_pdu + async def get_pdu( self, destinations: Iterable[str], @@ -321,30 +373,14 @@ async def get_pdu( continue try: - transaction_data = await self.transport_layer.get_event( - destination, event_id, timeout=timeout - ) - - logger.debug( - "retrieved event id %s from %s: %r", - event_id, - destination, - transaction_data, + signed_pdu = await self.get_pdu_from_destination_raw( + destination=destination, + event_id=event_id, + room_version=room_version, + outlier=outlier, + timeout=timeout, ) - pdu_list: List[EventBase] = [ - event_from_pdu_json(p, room_version, outlier=outlier) - for p in transaction_data["pdus"] - ] - - if pdu_list and pdu_list[0]: - pdu = pdu_list[0] - - # Check signatures are correct. - signed_pdu = await self._check_sigs_and_hash(room_version, pdu) - - break - pdu_attempts[destination] = now except SynapseError as e: From 9a8b14995305466d2a7c01b9c153288654a4d13b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Nov 2021 18:29:09 -0500 Subject: [PATCH 2/4] Add changelog --- changelog.d/11242.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11242.misc diff --git a/changelog.d/11242.misc b/changelog.d/11242.misc new file mode 100644 index 000000000000..3a98259edf46 --- /dev/null +++ b/changelog.d/11242.misc @@ -0,0 +1 @@ +Split out federated PDU retrieval function into a non-cached version. From a624d203505c8b68f5b825ca76e01c1054e21748 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Nov 2021 18:59:58 -0500 Subject: [PATCH 3/4] Fix comment doc spacing --- synapse/federation/federation_client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 9e12530db888..6fc4efc6dee2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -286,8 +286,8 @@ async def get_pdu_from_destination_raw( timeout: Optional[int] = None, ) -> Optional[EventBase]: """Requests the PDU with given origin and ID from the remote home - server. - Does not have any caching or rate limiting! + server. Does not have any caching or rate limiting! + Args: destination: Which homeserver to query event_id: event to fetch @@ -297,8 +297,10 @@ async def get_pdu_from_destination_raw( of the current block of PDUs. Defaults to `False` timeout: How long to try (in ms) each destination for before moving to the next destination. None indicates no timeout. + Returns: The requested PDU, or None if we were unable to find it. + Raises: SynapseError, NotRetryingDestination, FederationDeniedError """ From 0133785bf4a0a1804dbaa27e2df8946eb5263e1d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Nov 2021 21:43:38 -0600 Subject: [PATCH 4/4] Prefer early-return and straight-forward assignment --- synapse/federation/federation_client.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6fc4efc6dee2..3b85b135e0d3 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -304,14 +304,11 @@ async def get_pdu_from_destination_raw( Raises: SynapseError, NotRetryingDestination, FederationDeniedError """ - - signed_pdu = None - transaction_data = await self.transport_layer.get_event( destination, event_id, timeout=timeout ) - logger.info( + logger.debug( "retrieved event id %s from %s: %r", event_id, destination, @@ -328,8 +325,9 @@ async def get_pdu_from_destination_raw( # Check signatures are correct. signed_pdu = await self._check_sigs_and_hash(room_version, pdu) + return signed_pdu - return signed_pdu + return None async def get_pdu( self,