@@ -358,6 +358,10 @@ async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, in
358
358
This function is intentionally not cached because it is called to calculate the
359
359
unread badge for push notifications and thus the result is expected to change.
360
360
361
+ Note that this function assumes the user is a member of the room. Because
362
+ summary rows are not removed when a user leaves a room, the caller must
363
+ filter out those results from the result.
364
+
361
365
Returns:
362
366
A map of room ID to notification counts for the given user.
363
367
"""
@@ -370,170 +374,127 @@ async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, in
370
374
def _get_unread_counts_by_room_for_user_txn (
371
375
self , txn : LoggingTransaction , user_id : str
372
376
) -> Dict [str , int ]:
373
- # To get the badge count of all rooms we need to make three queries:
374
- # 1. Fetch all counts from `event_push_summary`, discarding any stale
375
- # rooms.
376
- # 2. Fetch all notifications from `event_push_actions` that haven't
377
- # been rotated yet.
378
- # 3. Fetch all notifications from `event_push_actions` for the stale
379
- # rooms.
380
- #
381
- # The "stale room" scenario generally happens when there is a new read
382
- # receipt that hasn't yet been processed to update the
383
- # `event_push_summary` table. When that happens we ignore the
384
- # `event_push_summary` table for that room and calculate the count
385
- # manually from `event_push_actions`.
386
-
387
- # We need to only take into account read receipts of these types.
388
- receipt_types_clause , receipt_types_args = make_in_list_sql_clause (
377
+ receipt_types_clause , args = make_in_list_sql_clause (
389
378
self .database_engine ,
390
379
"receipt_type" ,
391
380
(ReceiptTypes .READ , ReceiptTypes .READ_PRIVATE ),
392
381
)
382
+ args .extend ([user_id , user_id ])
383
+
384
+ receipts_cte = f"""
385
+ WITH all_receipts AS (
386
+ SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering
387
+ FROM receipts_linearized
388
+ LEFT JOIN events USING (room_id, event_id)
389
+ WHERE
390
+ { receipt_types_clause }
391
+ AND user_id = ?
392
+ GROUP BY room_id, thread_id
393
+ )
394
+ """
395
+
396
+ receipts_joins = """
397
+ LEFT JOIN (
398
+ SELECT room_id, thread_id,
399
+ max_receipt_stream_ordering AS threaded_receipt_stream_ordering
400
+ FROM all_receipts
401
+ WHERE thread_id IS NOT NULL
402
+ ) AS threaded_receipts USING (room_id, thread_id)
403
+ LEFT JOIN (
404
+ SELECT room_id, thread_id,
405
+ max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering
406
+ FROM all_receipts
407
+ WHERE thread_id IS NULL
408
+ ) AS unthreaded_receipts USING (room_id)
409
+ """
410
+
411
+ # First get summary counts by room / thread for the user. We use the max receipt
412
+ # stream ordering of both threaded & unthreaded receipts to compare against the
413
+ # summary table.
414
+ #
415
+ # PostgreSQL and SQLite differ in comparing scalar numerics.
416
+ if isinstance (self .database_engine , PostgresEngine ):
417
+ # GREATEST ignores NULLs.
418
+ max_clause = """GREATEST(
419
+ threaded_receipt_stream_ordering,
420
+ unthreaded_receipt_stream_ordering
421
+ )"""
422
+ else :
423
+ # MAX returns NULL if any are NULL, so COALESCE to 0 first.
424
+ max_clause = """MAX(
425
+ COALESCE(threaded_receipt_stream_ordering, 0),
426
+ COALESCE(unthreaded_receipt_stream_ordering, 0)
427
+ )"""
393
428
394
- # Step 1, fetch all counts from `event_push_summary` for the user. This
395
- # is slightly convoluted as we also need to pull out the stream ordering
396
- # of the most recent receipt of the user in the room (either a thread
397
- # aware receipt or thread unaware receipt) in order to determine
398
- # whether the row in `event_push_summary` is stale. Hence the outer
399
- # GROUP BY and odd join condition against `receipts_linearized`.
400
429
sql = f"""
401
- SELECT room_id, notif_count, stream_ordering, thread_id, last_receipt_stream_ordering,
402
- MAX(receipt_stream_ordering)
403
- FROM (
404
- SELECT e.room_id, notif_count, e.stream_ordering, e.thread_id, last_receipt_stream_ordering,
405
- ev.stream_ordering AS receipt_stream_ordering
406
- FROM event_push_summary AS e
407
- INNER JOIN local_current_membership USING (user_id, room_id)
408
- LEFT JOIN receipts_linearized AS r ON (
409
- e.user_id = r.user_id
410
- AND e.room_id = r.room_id
411
- AND (e.thread_id = r.thread_id OR r.thread_id IS NULL)
412
- AND { receipt_types_clause }
430
+ { receipts_cte }
431
+ SELECT eps.room_id, eps.thread_id, notif_count
432
+ FROM event_push_summary AS eps
433
+ { receipts_joins }
434
+ WHERE user_id = ?
435
+ AND notif_count != 0
436
+ AND (
437
+ (last_receipt_stream_ordering IS NULL AND stream_ordering > { max_clause } )
438
+ OR last_receipt_stream_ordering = { max_clause }
413
439
)
414
- LEFT JOIN events AS ev ON (r.event_id = ev.event_id)
415
- WHERE e.user_id = ? and notif_count > 0
416
- ) AS es
417
- GROUP BY room_id, notif_count, stream_ordering, thread_id, last_receipt_stream_ordering
418
440
"""
441
+ txn .execute (sql , args )
419
442
420
- txn .execute (
421
- sql ,
422
- receipt_types_args
423
- + [
424
- user_id ,
425
- ],
426
- )
427
-
443
+ seen_thread_ids = set ()
428
444
room_to_count : Dict [str , int ] = defaultdict (int )
429
- stale_room_ids = set ()
430
- for row in txn :
431
- room_id = row [0 ]
432
- notif_count = row [1 ]
433
- stream_ordering = row [2 ]
434
- _thread_id = row [3 ]
435
- last_receipt_stream_ordering = row [4 ]
436
- receipt_stream_ordering = row [5 ]
437
-
438
- if last_receipt_stream_ordering is None :
439
- if receipt_stream_ordering is None :
440
- room_to_count [room_id ] += notif_count
441
- elif stream_ordering > receipt_stream_ordering :
442
- room_to_count [room_id ] += notif_count
443
- else :
444
- # The latest read receipt from the user is after all the rows for
445
- # this room in `event_push_summary`. We ignore them, and
446
- # calculate the count from `event_push_actions` in step 3.
447
- pass
448
- elif last_receipt_stream_ordering == receipt_stream_ordering :
449
- room_to_count [room_id ] += notif_count
450
- else :
451
- # The row is stale if `last_receipt_stream_ordering` is set and
452
- # *doesn't* match the latest receipt from the user.
453
- stale_room_ids .add (room_id )
454
445
455
- # Discard any stale rooms from `room_to_count`, as we will recalculate
456
- # them in step 3.
457
- for room_id in stale_room_ids :
458
- room_to_count .pop (room_id , None )
446
+ for room_id , thread_id , notif_count in txn :
447
+ room_to_count [room_id ] += notif_count
448
+ seen_thread_ids .add (thread_id )
459
449
460
- # Step 2, basically the same query, except against `event_push_actions`
461
- # and only fetching rows inserted since the last rotation.
462
- rotated_upto_stream_ordering = self .db_pool .simple_select_one_onecol_txn (
463
- txn ,
464
- table = "event_push_summary_stream_ordering" ,
465
- keyvalues = {},
466
- retcol = "stream_ordering" ,
450
+ # Now get any event push actions that haven't been rotated using the same OR
451
+ # join and filter by receipt and event push summary rotated up to stream ordering.
452
+ sql = f"""
453
+ { receipts_cte }
454
+ SELECT epa.room_id, epa.thread_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
455
+ FROM event_push_actions AS epa
456
+ { receipts_joins }
457
+ WHERE user_id = ?
458
+ AND epa.notif = 1
459
+ AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering)
460
+ AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering)
461
+ AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering)
462
+ GROUP BY epa.room_id, epa.thread_id
463
+ """
464
+ txn .execute (sql , args )
465
+
466
+ for room_id , thread_id , notif_count in txn :
467
+ # Note: only count push actions we have valid summaries for with up to date receipt.
468
+ if thread_id not in seen_thread_ids :
469
+ continue
470
+ room_to_count [room_id ] += notif_count
471
+
472
+ thread_id_clause , thread_ids_args = make_in_list_sql_clause (
473
+ self .database_engine , "epa.thread_id" , seen_thread_ids
467
474
)
468
475
476
+ # Finally re-check event_push_actions for any rooms not in the summary, ignoring
477
+ # the rotated up-to position. This handles the case where a read receipt has arrived
478
+ # but not been rotated meaning the summary table is out of date, so we go back to
479
+ # the push actions table.
469
480
sql = f"""
470
- SELECT room_id, thread_id
471
- FROM (
472
- SELECT e.room_id, e.stream_ordering, e.thread_id,
473
- ev.stream_ordering AS receipt_stream_ordering
474
- FROM event_push_actions AS e
475
- INNER JOIN local_current_membership USING (user_id, room_id)
476
- LEFT JOIN receipts_linearized AS r ON (
477
- e.user_id = r.user_id
478
- AND e.room_id = r.room_id
479
- AND (e.thread_id = r.thread_id OR r.thread_id IS NULL)
480
- AND { receipt_types_clause }
481
- )
482
- LEFT JOIN events AS ev ON (r.event_id = ev.event_id)
483
- WHERE e.user_id = ? and notif > 0
484
- AND e.stream_ordering > ?
485
- ) AS es
486
- GROUP BY room_id, stream_ordering, thread_id
487
- HAVING stream_ordering > COALESCE(MAX(receipt_stream_ordering), 0)
481
+ { receipts_cte }
482
+ SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
483
+ FROM event_push_actions AS epa
484
+ { receipts_joins }
485
+ WHERE user_id = ?
486
+ AND NOT { thread_id_clause }
487
+ AND epa.notif = 1
488
+ AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering)
489
+ AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering)
490
+ GROUP BY epa.room_id
488
491
"""
489
492
490
- txn .execute (
491
- sql ,
492
- receipt_types_args + [user_id , rotated_upto_stream_ordering ],
493
- )
494
- for room_id , _thread_id in txn :
495
- # Again, we ignore any stale rooms.
496
- if room_id not in stale_room_ids :
497
- # For event push actions it is one notification per row.
498
- room_to_count [room_id ] += 1
499
-
500
- # Step 3, if we have stale rooms then we need to recalculate the counts
501
- # from `event_push_actions`. Again, this is basically the same query as
502
- # above except without a lower bound on stream ordering and only against
503
- # a specific set of rooms.
504
- if stale_room_ids :
505
- room_id_clause , room_id_args = make_in_list_sql_clause (
506
- self .database_engine ,
507
- "e.room_id" ,
508
- stale_room_ids ,
509
- )
493
+ args .extend (thread_ids_args )
494
+ txn .execute (sql , args )
510
495
511
- sql = f"""
512
- SELECT room_id, thread_id
513
- FROM (
514
- SELECT e.room_id, e.stream_ordering, e.thread_id,
515
- ev.stream_ordering AS receipt_stream_ordering
516
- FROM event_push_actions AS e
517
- INNER JOIN local_current_membership USING (user_id, room_id)
518
- LEFT JOIN receipts_linearized AS r ON (
519
- e.user_id = r.user_id
520
- AND e.room_id = r.room_id
521
- AND (e.thread_id = r.thread_id OR r.thread_id IS NULL)
522
- AND { receipt_types_clause }
523
- )
524
- LEFT JOIN events AS ev ON (r.event_id = ev.event_id)
525
- WHERE e.user_id = ? and notif > 0
526
- AND { room_id_clause }
527
- ) AS es
528
- GROUP BY room_id, stream_ordering, thread_id
529
- HAVING stream_ordering > COALESCE(MAX(receipt_stream_ordering), 0)
530
- """
531
- txn .execute (
532
- sql ,
533
- receipt_types_args + [user_id ] + room_id_args ,
534
- )
535
- for room_id , _ in txn :
536
- room_to_count [room_id ] += 1
496
+ for room_id , notif_count in txn :
497
+ room_to_count [room_id ] += notif_count
537
498
538
499
return room_to_count
539
500
0 commit comments