15
15
16
16
import logging
17
17
from collections import namedtuple
18
- from typing import Callable , Iterable , List , TypeVar
18
+ from typing import (
19
+ Awaitable ,
20
+ Callable ,
21
+ Dict ,
22
+ Iterable ,
23
+ List ,
24
+ Optional ,
25
+ Set ,
26
+ Tuple ,
27
+ TypeVar ,
28
+ )
19
29
20
30
from prometheus_client import Counter
21
31
24
34
import synapse .server
25
35
from synapse .api .constants import EventTypes , Membership
26
36
from synapse .api .errors import AuthError
37
+ from synapse .events import EventBase
27
38
from synapse .handlers .presence import format_user_presence_state
28
39
from synapse .logging .context import PreserveLoggingContext
29
40
from synapse .logging .utils import log_function
30
41
from synapse .metrics import LaterGauge
31
42
from synapse .metrics .background_process_metrics import run_as_background_process
32
- from synapse .types import StreamToken
43
+ from synapse .streams .config import PaginationConfig
44
+ from synapse .types import Collection , StreamToken , UserID
33
45
from synapse .util .async_helpers import ObservableDeferred , timeout_deferred
34
46
from synapse .util .metrics import Measure
35
47
from synapse .visibility import filter_events_for_client
@@ -77,7 +89,13 @@ class _NotifierUserStream(object):
77
89
so that it can remove itself from the indexes in the Notifier class.
78
90
"""
79
91
80
- def __init__ (self , user_id , rooms , current_token , time_now_ms ):
92
+ def __init__ (
93
+ self ,
94
+ user_id : str ,
95
+ rooms : Collection [str ],
96
+ current_token : StreamToken ,
97
+ time_now_ms : int ,
98
+ ):
81
99
self .user_id = user_id
82
100
self .rooms = set (rooms )
83
101
self .current_token = current_token
@@ -93,13 +111,13 @@ def __init__(self, user_id, rooms, current_token, time_now_ms):
93
111
with PreserveLoggingContext ():
94
112
self .notify_deferred = ObservableDeferred (defer .Deferred ())
95
113
96
- def notify (self , stream_key , stream_id , time_now_ms ):
114
+ def notify (self , stream_key : str , stream_id : int , time_now_ms : int ):
97
115
"""Notify any listeners for this user of a new event from an
98
116
event source.
99
117
Args:
100
- stream_key(str) : The stream the event came from.
101
- stream_id(str) : The new id for the stream the event came from.
102
- time_now_ms(int) : The current time in milliseconds.
118
+ stream_key: The stream the event came from.
119
+ stream_id: The new id for the stream the event came from.
120
+ time_now_ms: The current time in milliseconds.
103
121
"""
104
122
self .current_token = self .current_token .copy_and_advance (stream_key , stream_id )
105
123
self .last_notified_token = self .current_token
@@ -112,7 +130,7 @@ def notify(self, stream_key, stream_id, time_now_ms):
112
130
self .notify_deferred = ObservableDeferred (defer .Deferred ())
113
131
noify_deferred .callback (self .current_token )
114
132
115
- def remove (self , notifier ):
133
+ def remove (self , notifier : "Notifier" ):
116
134
""" Remove this listener from all the indexes in the Notifier
117
135
it knows about.
118
136
"""
@@ -123,10 +141,10 @@ def remove(self, notifier):
123
141
124
142
notifier .user_to_user_stream .pop (self .user_id )
125
143
126
- def count_listeners (self ):
144
+ def count_listeners (self ) -> int :
127
145
return len (self .notify_deferred .observers ())
128
146
129
- def new_listener (self , token ) :
147
+ def new_listener (self , token : StreamToken ) -> _NotificationListener :
130
148
"""Returns a deferred that is resolved when there is a new token
131
149
greater than the given token.
132
150
@@ -159,14 +177,16 @@ class Notifier(object):
159
177
UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
160
178
161
179
def __init__ (self , hs : "synapse.server.HomeServer" ):
162
- self .user_to_user_stream = {}
163
- self .room_to_user_streams = {}
180
+ self .user_to_user_stream = {} # type: Dict[str, _NotifierUserStream]
181
+ self .room_to_user_streams = {} # type: Dict[str, Set[_NotifierUserStream]]
164
182
165
183
self .hs = hs
166
184
self .storage = hs .get_storage ()
167
185
self .event_sources = hs .get_event_sources ()
168
186
self .store = hs .get_datastore ()
169
- self .pending_new_room_events = []
187
+ self .pending_new_room_events = (
188
+ []
189
+ ) # type: List[Tuple[int, EventBase, Collection[str]]]
170
190
171
191
# Called when there are new things to stream over replication
172
192
self .replication_callbacks = [] # type: List[Callable[[], None]]
@@ -178,10 +198,9 @@ def __init__(self, hs: "synapse.server.HomeServer"):
178
198
self .clock = hs .get_clock ()
179
199
self .appservice_handler = hs .get_application_service_handler ()
180
200
201
+ self .federation_sender = None
181
202
if hs .should_send_federation ():
182
203
self .federation_sender = hs .get_federation_sender ()
183
- else :
184
- self .federation_sender = None
185
204
186
205
self .state_handler = hs .get_state_handler ()
187
206
@@ -193,12 +212,12 @@ def __init__(self, hs: "synapse.server.HomeServer"):
193
212
# when rendering the metrics page, which is likely once per minute at
194
213
# most when scraping it.
195
214
def count_listeners ():
196
- all_user_streams = set ()
215
+ all_user_streams = set () # type: Set[_NotifierUserStream]
197
216
198
- for x in list (self .room_to_user_streams .values ()):
199
- all_user_streams |= x
200
- for x in list (self .user_to_user_stream .values ()):
201
- all_user_streams .add (x )
217
+ for streams in list (self .room_to_user_streams .values ()):
218
+ all_user_streams |= streams
219
+ for stream in list (self .user_to_user_stream .values ()):
220
+ all_user_streams .add (stream )
202
221
203
222
return sum (stream .count_listeners () for stream in all_user_streams )
204
223
@@ -223,7 +242,11 @@ def add_replication_callback(self, cb: Callable[[], None]):
223
242
self .replication_callbacks .append (cb )
224
243
225
244
def on_new_room_event (
226
- self , event , room_stream_id , max_room_stream_id , extra_users = []
245
+ self ,
246
+ event : EventBase ,
247
+ room_stream_id : int ,
248
+ max_room_stream_id : int ,
249
+ extra_users : Collection [str ] = [],
227
250
):
228
251
""" Used by handlers to inform the notifier something has happened
229
252
in the room, room event wise.
@@ -241,11 +264,11 @@ def on_new_room_event(
241
264
242
265
self .notify_replication ()
243
266
244
- def _notify_pending_new_room_events (self , max_room_stream_id ):
267
+ def _notify_pending_new_room_events (self , max_room_stream_id : int ):
245
268
"""Notify for the room events that were queued waiting for a previous
246
269
event to be persisted.
247
270
Args:
248
- max_room_stream_id(int) : The highest stream_id below which all
271
+ max_room_stream_id: The highest stream_id below which all
249
272
events have been persisted.
250
273
"""
251
274
pending = self .pending_new_room_events
@@ -258,7 +281,9 @@ def _notify_pending_new_room_events(self, max_room_stream_id):
258
281
else :
259
282
self ._on_new_room_event (event , room_stream_id , extra_users )
260
283
261
- def _on_new_room_event (self , event , room_stream_id , extra_users = []):
284
+ def _on_new_room_event (
285
+ self , event : EventBase , room_stream_id : int , extra_users : Collection [str ] = []
286
+ ):
262
287
"""Notify any user streams that are interested in this room event"""
263
288
# poke any interested application service.
264
289
run_as_background_process (
@@ -275,13 +300,19 @@ def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
275
300
"room_key" , room_stream_id , users = extra_users , rooms = [event .room_id ]
276
301
)
277
302
278
- async def _notify_app_services (self , room_stream_id ):
303
+ async def _notify_app_services (self , room_stream_id : int ):
279
304
try :
280
305
await self .appservice_handler .notify_interested_services (room_stream_id )
281
306
except Exception :
282
307
logger .exception ("Error notifying application services of event" )
283
308
284
- def on_new_event (self , stream_key , new_token , users = [], rooms = []):
309
+ def on_new_event (
310
+ self ,
311
+ stream_key : str ,
312
+ new_token : int ,
313
+ users : Collection [str ] = [],
314
+ rooms : Collection [str ] = [],
315
+ ):
285
316
""" Used to inform listeners that something has happened event wise.
286
317
287
318
Will wake up all listeners for the given users and rooms.
@@ -307,14 +338,19 @@ def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
307
338
308
339
self .notify_replication ()
309
340
310
- def on_new_replication_data (self ):
341
+ def on_new_replication_data (self ) -> None :
311
342
"""Used to inform replication listeners that something has happend
312
343
without waking up any of the normal user event streams"""
313
344
self .notify_replication ()
314
345
315
346
async def wait_for_events (
316
- self , user_id , timeout , callback , room_ids = None , from_token = StreamToken .START
317
- ):
347
+ self ,
348
+ user_id : str ,
349
+ timeout : int ,
350
+ callback : Callable [[StreamToken , StreamToken ], Awaitable [T ]],
351
+ room_ids = None ,
352
+ from_token = StreamToken .START ,
353
+ ) -> T :
318
354
"""Wait until the callback returns a non empty response or the
319
355
timeout fires.
320
356
"""
@@ -377,19 +413,16 @@ async def wait_for_events(
377
413
378
414
async def get_events_for (
379
415
self ,
380
- user ,
381
- pagination_config ,
382
- timeout ,
383
- only_keys = None ,
384
- is_guest = False ,
385
- explicit_room_id = None ,
386
- ):
416
+ user : UserID ,
417
+ pagination_config : PaginationConfig ,
418
+ timeout : int ,
419
+ is_guest : bool = False ,
420
+ explicit_room_id : str = None ,
421
+ ) -> EventStreamResult :
387
422
""" For the given user and rooms, return any new events for them. If
388
423
there are no new events wait for up to `timeout` milliseconds for any
389
424
new events to happen before returning.
390
425
391
- If `only_keys` is not None, events from keys will be sent down.
392
-
393
426
If explicit_room_id is not set, the user's joined rooms will be polled
394
427
for events.
395
428
If explicit_room_id is set, that room will be polled for events only if
@@ -404,11 +437,13 @@ async def get_events_for(
404
437
room_ids , is_joined = await self ._get_room_ids (user , explicit_room_id )
405
438
is_peeking = not is_joined
406
439
407
- async def check_for_updates (before_token , after_token ):
440
+ async def check_for_updates (
441
+ before_token : StreamToken , after_token : StreamToken
442
+ ) -> EventStreamResult :
408
443
if not after_token .is_after (before_token ):
409
444
return EventStreamResult ([], (from_token , from_token ))
410
445
411
- events = []
446
+ events = [] # type: List[EventBase]
412
447
end_token = from_token
413
448
414
449
for name , source in self .event_sources .sources .items ():
@@ -417,8 +452,6 @@ async def check_for_updates(before_token, after_token):
417
452
after_id = getattr (after_token , keyname )
418
453
if before_id == after_id :
419
454
continue
420
- if only_keys and name not in only_keys :
421
- continue
422
455
423
456
new_events , new_key = await source .get_new_events (
424
457
user = user ,
@@ -476,7 +509,9 @@ async def check_for_updates(before_token, after_token):
476
509
477
510
return result
478
511
479
- async def _get_room_ids (self , user , explicit_room_id ):
512
+ async def _get_room_ids (
513
+ self , user : UserID , explicit_room_id : Optional [str ]
514
+ ) -> Tuple [Collection [str ], bool ]:
480
515
joined_room_ids = await self .store .get_rooms_for_user (user .to_string ())
481
516
if explicit_room_id :
482
517
if explicit_room_id in joined_room_ids :
@@ -486,7 +521,7 @@ async def _get_room_ids(self, user, explicit_room_id):
486
521
raise AuthError (403 , "Non-joined access not allowed" )
487
522
return joined_room_ids , True
488
523
489
- async def _is_world_readable (self , room_id ) :
524
+ async def _is_world_readable (self , room_id : str ) -> bool :
490
525
state = await self .state_handler .get_current_state (
491
526
room_id , EventTypes .RoomHistoryVisibility , ""
492
527
)
@@ -496,7 +531,7 @@ async def _is_world_readable(self, room_id):
496
531
return False
497
532
498
533
@log_function
499
- def remove_expired_streams (self ):
534
+ def remove_expired_streams (self ) -> None :
500
535
time_now_ms = self .clock .time_msec ()
501
536
expired_streams = []
502
537
expire_before_ts = time_now_ms - self .UNUSED_STREAM_EXPIRY_MS
@@ -510,21 +545,21 @@ def remove_expired_streams(self):
510
545
expired_stream .remove (self )
511
546
512
547
@log_function
513
- def _register_with_keys (self , user_stream ):
548
+ def _register_with_keys (self , user_stream : _NotifierUserStream ):
514
549
self .user_to_user_stream [user_stream .user_id ] = user_stream
515
550
516
551
for room in user_stream .rooms :
517
552
s = self .room_to_user_streams .setdefault (room , set ())
518
553
s .add (user_stream )
519
554
520
- def _user_joined_room (self , user_id , room_id ):
555
+ def _user_joined_room (self , user_id : str , room_id : str ):
521
556
new_user_stream = self .user_to_user_stream .get (user_id )
522
557
if new_user_stream is not None :
523
558
room_streams = self .room_to_user_streams .setdefault (room_id , set ())
524
559
room_streams .add (new_user_stream )
525
560
new_user_stream .rooms .add (room_id )
526
561
527
- def notify_replication (self ):
562
+ def notify_replication (self ) -> None :
528
563
"""Notify the any replication listeners that there's a new event"""
529
564
for cb in self .replication_callbacks :
530
565
cb ()
0 commit comments