58
58
from synapse .util import json_encoder
59
59
from synapse .util .caches .expiringcache import ExpiringCache
60
60
from synapse .util .caches .stream_change_cache import StreamChangeCache
61
+ from synapse .util .stringutils import parse_and_validate_server_name
61
62
62
63
if TYPE_CHECKING :
63
64
from synapse .server import HomeServer
@@ -964,6 +965,7 @@ def _add_messages_to_local_device_inbox_txn(
964
965
class DeviceInboxBackgroundUpdateStore (SQLBaseStore ):
965
966
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
966
967
REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"
968
+ CLEANUP_DEVICE_FEDERATION_OUTBOX = "cleanup_device_federation_outbox"
967
969
968
970
def __init__ (
969
971
self ,
@@ -989,6 +991,11 @@ def __init__(
989
991
self ._remove_dead_devices_from_device_inbox ,
990
992
)
991
993
994
+ self .db_pool .updates .register_background_update_handler (
995
+ self .CLEANUP_DEVICE_FEDERATION_OUTBOX ,
996
+ self ._cleanup_device_federation_outbox ,
997
+ )
998
+
992
999
async def _background_drop_index_device_inbox (
993
1000
self , progress : JsonDict , batch_size : int
994
1001
) -> int :
@@ -1080,6 +1087,75 @@ def _remove_dead_devices_from_device_inbox_txn(
1080
1087
1081
1088
return batch_size
1082
1089
1090
+ async def _cleanup_device_federation_outbox (
1091
+ self ,
1092
+ progress : JsonDict ,
1093
+ batch_size : int ,
1094
+ ) -> int :
1095
+ def _cleanup_device_federation_outbox_txn (
1096
+ txn : LoggingTransaction ,
1097
+ ) -> bool :
1098
+ if "max_stream_id" in progress :
1099
+ max_stream_id = progress ["max_stream_id" ]
1100
+ else :
1101
+ txn .execute ("SELECT max(stream_id) FROM device_federation_outbox" )
1102
+ res = cast (Tuple [Optional [int ]], txn .fetchone ())
1103
+ if res [0 ] is None :
1104
+ # this can only happen if the `device_inbox` table is empty, in which
1105
+ # case we have no work to do.
1106
+ return True
1107
+ else :
1108
+ max_stream_id = res [0 ]
1109
+
1110
+ start = progress .get ("stream_id" , 0 )
1111
+ stop = start + batch_size
1112
+
1113
+ sql = """
1114
+ SELECT destination FROM device_federation_outbox
1115
+ WHERE ? < stream_id AND stream_id <= ?
1116
+ """
1117
+
1118
+ txn .execute (sql , (start , stop ))
1119
+
1120
+ destinations = {d for d , in txn }
1121
+ to_remove = set ()
1122
+ for d in destinations :
1123
+ try :
1124
+ parse_and_validate_server_name (d )
1125
+ except ValueError :
1126
+ to_remove .add (d )
1127
+
1128
+ self .db_pool .simple_delete_many_txn (
1129
+ txn ,
1130
+ table = "device_federation_outbox" ,
1131
+ column = "destination" ,
1132
+ values = to_remove ,
1133
+ keyvalues = {},
1134
+ )
1135
+
1136
+ self .db_pool .updates ._background_update_progress_txn (
1137
+ txn ,
1138
+ self .CLEANUP_DEVICE_FEDERATION_OUTBOX ,
1139
+ {
1140
+ "stream_id" : stop ,
1141
+ "max_stream_id" : max_stream_id ,
1142
+ },
1143
+ )
1144
+
1145
+ return stop >= max_stream_id
1146
+
1147
+ finished = await self .db_pool .runInteraction (
1148
+ "_cleanup_device_federation_outbox" ,
1149
+ _cleanup_device_federation_outbox_txn ,
1150
+ )
1151
+
1152
+ if finished :
1153
+ await self .db_pool .updates ._end_background_update (
1154
+ self .CLEANUP_DEVICE_FEDERATION_OUTBOX ,
1155
+ )
1156
+
1157
+ return batch_size
1158
+
1083
1159
1084
1160
class DeviceInboxStore (DeviceInboxWorkerStore , DeviceInboxBackgroundUpdateStore ):
1085
1161
pass
0 commit comments