@@ -314,11 +314,12 @@ def get_chain_id_txn(txn: Cursor) -> int:
314
314
db_conn , "un_partial_stated_event_stream" , "stream_id"
315
315
)
316
316
317
- def get_un_partial_stated_events_token (self ) -> int :
318
- # TODO(faster_joins, multiple writers): This is inappropriate if there are multiple
319
- # writers because workers that don't write often will hold all
320
- # readers up.
321
- return self ._un_partial_stated_events_stream_id_gen .get_current_token ()
317
+ def get_un_partial_stated_events_token (self , instance_name : str ) -> int :
318
+ return (
319
+ self ._un_partial_stated_events_stream_id_gen .get_current_token_for_writer (
320
+ instance_name
321
+ )
322
+ )
322
323
323
324
async def get_un_partial_stated_events_from_stream (
324
325
self , instance_name : str , last_id : int , current_id : int , limit : int
@@ -408,6 +409,11 @@ def process_replication_position(
408
409
self ._stream_id_gen .advance (instance_name , token )
409
410
elif stream_name == BackfillStream .NAME :
410
411
self ._backfill_id_gen .advance (instance_name , - token )
412
+ elif stream_name == UnPartialStatedEventStream .NAME :
413
+ logger .info (
414
+ "Advancing %s token to %s" , UnPartialStatedEventStream .NAME , token
415
+ )
416
+ self ._un_partial_stated_events_stream_id_gen .advance (instance_name , token )
411
417
super ().process_replication_position (stream_name , instance_name , token )
412
418
413
419
async def have_censored_event (self , event_id : str ) -> bool :
0 commit comments