@@ -1039,6 +1039,34 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
1039
1039
pdu .room_id , room_version , lock , origin , pdu
1040
1040
)
1041
1041
1042
+ async def _get_next_valid_staged_event_for_room (
1043
+ self , room_id : str , room_version : RoomVersion
1044
+ ) -> Optional [Tuple [str , EventBase ]]:
1045
+ """Return the first non-spam event from staging queue.
1046
+ """
1047
+
1048
+ while True :
1049
+ # We need to do this check outside the lock to avoid a race between
1050
+ # a new event being inserted by another instance and it attempting
1051
+ # to acquire the lock.
1052
+ next = await self .store .get_next_staged_event_for_room (
1053
+ room_id , room_version
1054
+ )
1055
+
1056
+ if next is None :
1057
+ return None
1058
+
1059
+ origin , event = next
1060
+
1061
+ if await self ._spam_checker .should_drop_federated_event (event ):
1062
+ logger .warning (
1063
+ "Staged federated event contains spam, dropping %s" ,
1064
+ event .event_id ,
1065
+ )
1066
+ continue
1067
+
1068
+ return next
1069
+
1042
1070
@wrap_as_background_process ("_process_incoming_pdus_in_room_inner" )
1043
1071
async def _process_incoming_pdus_in_room_inner (
1044
1072
self ,
@@ -1116,31 +1144,15 @@ async def _process_incoming_pdus_in_room_inner(
1116
1144
(self ._clock .time_msec () - received_ts ) / 1000
1117
1145
)
1118
1146
1119
- while True :
1120
- # We need to do this check outside the lock to avoid a race between
1121
- # a new event being inserted by another instance and it attempting
1122
- # to acquire the lock.
1123
- next = await self .store .get_next_staged_event_for_room (
1124
- room_id , room_version
1125
- )
1126
-
1127
- if next is None :
1128
- break
1129
-
1130
- origin , event = next
1131
-
1132
- if await self ._spam_checker .should_drop_federated_event (event ):
1133
- logger .warning (
1134
- "Staged federated event contains spam, dropping %s" ,
1135
- event .event_id ,
1136
- )
1137
- continue
1138
-
1139
- break
1147
+ next = await self ._get_next_valid_staged_event_for_room (
1148
+ room_id , room_version
1149
+ )
1140
1150
1141
1151
if not next :
1142
1152
break
1143
1153
1154
+ origin , event = next
1155
+
1144
1156
# Prune the event queue if it's getting large.
1145
1157
#
1146
1158
# We do this *after* handling the first event as the common case is
0 commit comments