15
15
16
16
import logging
17
17
from inspect import isawaitable
18
- from typing import TYPE_CHECKING , Optional
18
+ from typing import TYPE_CHECKING , Optional , Type , cast
19
19
20
20
import txredisapi
21
21
22
22
from synapse .logging .context import PreserveLoggingContext , make_deferred_yieldable
23
23
from synapse .metrics .background_process_metrics import (
24
24
BackgroundProcessLoggingContext ,
25
25
run_as_background_process ,
26
+ wrap_as_background_process ,
26
27
)
27
28
from synapse .replication .tcp .commands import (
28
29
Command ,
@@ -59,16 +60,16 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
59
60
immediately after initialisation.
60
61
61
62
Attributes:
62
- handler : The command handler to handle incoming commands.
63
- stream_name : The *redis* stream name to subscribe to and publish from
64
- (not anything to do with Synapse replication streams).
65
- outbound_redis_connection : The connection to redis to use to send
63
+ synapse_handler : The command handler to handle incoming commands.
64
+ synapse_stream_name : The *redis* stream name to subscribe to and publish
65
+ from (not anything to do with Synapse replication streams).
66
+ synapse_outbound_redis_connection : The connection to redis to use to send
66
67
commands.
67
68
"""
68
69
69
- handler = None # type: ReplicationCommandHandler
70
- stream_name = None # type: str
71
- outbound_redis_connection = None # type: txredisapi.RedisProtocol
70
+ synapse_handler = None # type: ReplicationCommandHandler
71
+ synapse_stream_name = None # type: str
72
+ synapse_outbound_redis_connection = None # type: txredisapi.RedisProtocol
72
73
73
74
def __init__ (self , * args , ** kwargs ):
74
75
super ().__init__ (* args , ** kwargs )
@@ -88,19 +89,19 @@ async def _send_subscribe(self):
88
89
# it's important to make sure that we only send the REPLICATE command once we
89
90
# have successfully subscribed to the stream - otherwise we might miss the
90
91
# POSITION response sent back by the other end.
91
- logger .info ("Sending redis SUBSCRIBE for %s" , self .stream_name )
92
- await make_deferred_yieldable (self .subscribe (self .stream_name ))
92
+ logger .info ("Sending redis SUBSCRIBE for %s" , self .synapse_stream_name )
93
+ await make_deferred_yieldable (self .subscribe (self .synapse_stream_name ))
93
94
logger .info (
94
95
"Successfully subscribed to redis stream, sending REPLICATE command"
95
96
)
96
- self .handler .new_connection (self )
97
+ self .synapse_handler .new_connection (self )
97
98
await self ._async_send_command (ReplicateCommand ())
98
99
logger .info ("REPLICATE successfully sent" )
99
100
100
101
# We send out our positions when there is a new connection in case the
101
102
# other side missed updates. We do this for Redis connections as the
102
103
# otherside won't know we've connected and so won't issue a REPLICATE.
103
- self .handler .send_positions_to_connection (self )
104
+ self .synapse_handler .send_positions_to_connection (self )
104
105
105
106
def messageReceived (self , pattern : str , channel : str , message : str ):
106
107
"""Received a message from redis.
@@ -137,7 +138,7 @@ def handle_command(self, cmd: Command) -> None:
137
138
cmd: received command
138
139
"""
139
140
140
- cmd_func = getattr (self .handler , "on_%s" % (cmd .NAME ,), None )
141
+ cmd_func = getattr (self .synapse_handler , "on_%s" % (cmd .NAME ,), None )
141
142
if not cmd_func :
142
143
logger .warning ("Unhandled command: %r" , cmd )
143
144
return
@@ -155,7 +156,7 @@ def handle_command(self, cmd: Command) -> None:
155
156
def connectionLost (self , reason ):
156
157
logger .info ("Lost connection to redis" )
157
158
super ().connectionLost (reason )
158
- self .handler .lost_connection (self )
159
+ self .synapse_handler .lost_connection (self )
159
160
160
161
# mark the logging context as finished
161
162
self ._logging_context .__exit__ (None , None , None )
@@ -183,11 +184,54 @@ async def _async_send_command(self, cmd: Command):
183
184
tcp_outbound_commands_counter .labels (cmd .NAME , "redis" ).inc ()
184
185
185
186
await make_deferred_yieldable (
186
- self .outbound_redis_connection .publish (self .stream_name , encoded_string )
187
+ self .synapse_outbound_redis_connection .publish (
188
+ self .synapse_stream_name , encoded_string
189
+ )
190
+ )
191
+
192
+
193
+ class SynapseRedisFactory (txredisapi .RedisFactory ):
194
+ """A subclass of RedisFactory that periodically sends pings to ensure that
195
+ we detect dead connections.
196
+ """
197
+
198
+ def __init__ (
199
+ self ,
200
+ hs : "HomeServer" ,
201
+ uuid : str ,
202
+ dbid : Optional [int ],
203
+ poolsize : int ,
204
+ isLazy : bool = False ,
205
+ handler : Type = txredisapi .ConnectionHandler ,
206
+ charset : str = "utf-8" ,
207
+ password : Optional [str ] = None ,
208
+ replyTimeout : int = 30 ,
209
+ convertNumbers : Optional [int ] = True ,
210
+ ):
211
+ super ().__init__ (
212
+ uuid = uuid ,
213
+ dbid = dbid ,
214
+ poolsize = poolsize ,
215
+ isLazy = isLazy ,
216
+ handler = handler ,
217
+ charset = charset ,
218
+ password = password ,
219
+ replyTimeout = replyTimeout ,
220
+ convertNumbers = convertNumbers ,
187
221
)
188
222
223
+ hs .get_clock ().looping_call (self ._send_ping , 30 * 1000 )
224
+
225
+ @wrap_as_background_process ("redis_ping" )
226
+ async def _send_ping (self ):
227
+ for connection in self .pool :
228
+ try :
229
+ await make_deferred_yieldable (connection .ping ())
230
+ except Exception :
231
+ logger .warning ("Failed to send ping to a redis connection" )
189
232
190
- class RedisDirectTcpReplicationClientFactory (txredisapi .SubscriberFactory ):
233
+
234
+ class RedisDirectTcpReplicationClientFactory (SynapseRedisFactory ):
191
235
"""This is a reconnecting factory that connects to redis and immediately
192
236
subscribes to a stream.
193
237
@@ -206,65 +250,62 @@ def __init__(
206
250
self , hs : "HomeServer" , outbound_redis_connection : txredisapi .RedisProtocol
207
251
):
208
252
209
- super ().__init__ ()
210
-
211
- # This sets the password on the RedisFactory base class (as
212
- # SubscriberFactory constructor doesn't pass it through).
213
- self .password = hs .config .redis .redis_password
253
+ super ().__init__ (
254
+ hs ,
255
+ uuid = "subscriber" ,
256
+ dbid = None ,
257
+ poolsize = 1 ,
258
+ replyTimeout = 30 ,
259
+ password = hs .config .redis .redis_password ,
260
+ )
214
261
215
- self .handler = hs .get_tcp_replication ()
216
- self .stream_name = hs .hostname
262
+ self .synapse_handler = hs .get_tcp_replication ()
263
+ self .synapse_stream_name = hs .hostname
217
264
218
- self .outbound_redis_connection = outbound_redis_connection
265
+ self .synapse_outbound_redis_connection = outbound_redis_connection
219
266
220
267
def buildProtocol (self , addr ):
221
- p = super ().buildProtocol (addr ) # type: RedisSubscriber
268
+ p = super ().buildProtocol (addr )
269
+ p = cast (RedisSubscriber , p )
222
270
223
271
# We do this here rather than add to the constructor of `RedisSubcriber`
224
272
# as to do so would involve overriding `buildProtocol` entirely, however
225
273
# the base method does some other things than just instantiating the
226
274
# protocol.
227
- p .handler = self .handler
228
- p .outbound_redis_connection = self .outbound_redis_connection
229
- p .stream_name = self .stream_name
230
- p .password = self .password
275
+ p .synapse_handler = self .synapse_handler
276
+ p .synapse_outbound_redis_connection = self .synapse_outbound_redis_connection
277
+ p .synapse_stream_name = self .synapse_stream_name
231
278
232
279
return p
233
280
234
281
235
282
def lazyConnection (
236
- reactor ,
283
+ hs : "HomeServer" ,
237
284
host : str = "localhost" ,
238
285
port : int = 6379 ,
239
286
dbid : Optional [int ] = None ,
240
287
reconnect : bool = True ,
241
- charset : str = "utf-8" ,
242
288
password : Optional [str ] = None ,
243
- connectTimeout : Optional [int ] = None ,
244
- replyTimeout : Optional [int ] = None ,
245
- convertNumbers : bool = True ,
289
+ replyTimeout : int = 30 ,
246
290
) -> txredisapi .RedisProtocol :
247
- """Equivalent to `txredisapi.lazyConnection`, except allows specifying a
248
- reactor .
291
+ """Creates a connection to Redis that is lazily set up and reconnects if the
292
+ connections is lost .
249
293
"""
250
294
251
- isLazy = True
252
- poolsize = 1
253
-
254
295
uuid = "%s:%d" % (host , port )
255
- factory = txredisapi .RedisFactory (
256
- uuid ,
257
- dbid ,
258
- poolsize ,
259
- isLazy ,
260
- txredisapi .ConnectionHandler ,
261
- charset ,
262
- password ,
263
- replyTimeout ,
264
- convertNumbers ,
296
+ factory = SynapseRedisFactory (
297
+ hs ,
298
+ uuid = uuid ,
299
+ dbid = dbid ,
300
+ poolsize = 1 ,
301
+ isLazy = True ,
302
+ handler = txredisapi .ConnectionHandler ,
303
+ password = password ,
304
+ replyTimeout = replyTimeout ,
265
305
)
266
306
factory .continueTrying = reconnect
267
- for x in range (poolsize ):
268
- reactor .connectTCP (host , port , factory , connectTimeout )
307
+
308
+ reactor = hs .get_reactor ()
309
+ reactor .connectTCP (host , port , factory , 30 )
269
310
270
311
return factory .handler
0 commit comments