From 26f0d591de8c498c55cf05df18fff066b6f3000f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 25 Aug 2020 10:09:02 +0100 Subject: [PATCH 1/4] Add support for get_next_mult --- synapse/storage/util/id_generators.py | 35 +++++++++++++++++++++++++-- synapse/storage/util/sequence.py | 8 +++++- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 0bf772d4d19b..f3a419ce8ab2 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -16,7 +16,7 @@ import contextlib import threading from collections import deque -from typing import Dict, Set +from typing import Dict, List, Set from typing_extensions import Deque @@ -234,9 +234,12 @@ def _load_current_ids( return current_positions - def _load_next_id_txn(self, txn): + def _load_next_id_txn(self, txn) -> int: return self._sequence_gen.get_next_id_txn(txn) + def _load_next_mult_id_txn(self, txn, n: int) -> List[int]: + return self._sequence_gen.get_next_mult_txn(txn, n) + async def get_next(self): """ Usage: @@ -262,6 +265,34 @@ def manager(): return manager() + async def get_next_mult(self, n: int): + """ + Usage: + with await stream_id_gen.get_next_mult(5) as stream_ids: + # ... persist event ... + """ + next_ids = await self._db.runInteraction( + "_load_next_mult_id", self._load_next_mult_id_txn, n + ) + + # Assert the fetched ID is actually greater than any ID we've already + # seen. If not, then the sequence and table have got out of sync + # somehow. + assert max(self.get_positions().values(), default=0) < min(next_ids) + + with self._lock: + self._unfinished_ids.update(next_ids) + + @contextlib.contextmanager + def manager(): + try: + yield next_ids + finally: + for i in next_ids: + self._mark_id_as_finished(i) + + return manager() + def get_next_txn(self, txn: LoggingTransaction): """ Usage: diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index 63dfea422032..f60237ed03a3 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -14,7 +14,7 @@ # limitations under the License. import abc import threading -from typing import Callable, Optional +from typing import Callable, List, Optional from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.storage.types import Cursor @@ -39,6 +39,12 @@ def get_next_id_txn(self, txn: Cursor) -> int: txn.execute("SELECT nextval(?)", (self._sequence_name,)) return txn.fetchone()[0] + def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]: + txn.execute( + "SELECT nextval(?) FROM generate_series(1, ?)", (self._sequence_name, n) + ) + return [i for i, in txn] + GetFirstCallbackType = Callable[[Cursor], int] From 0031da36cfe8b2020388aa4c339b8f1a74ef5190 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Aug 2020 15:49:11 +0100 Subject: [PATCH 2/4] add get_persisted_upto_position --- synapse/storage/util/id_generators.py | 63 +++++++++++++++++++++++++++ tests/storage/test_id_generators.py | 36 +++++++++++++++ 2 files changed, 99 insertions(+) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index f3a419ce8ab2..322277df85e1 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -14,6 +14,7 @@ # limitations under the License. import contextlib +import heapq import threading from collections import deque from typing import Dict, List, Set @@ -210,6 +211,21 @@ def __init__( # should be less than the minimum of this set (if not empty). self._unfinished_ids = set() # type: Set[int] + # We track the max position where we know everything before has been + # persisted. This is done by a) looking at the min across all instances + # and b) noting that if we have seen a run of persisted positions + # without gaps (e.g. 5, 6, 7) then we can skip forward (e.g. to 7). + # + # Note: There is no guarentee that the IDs generated by the sequence + # will be gapless; gaps can form when e.g. a transaction was rolled + # back. This means that sometimes we won't be able to skip forward the + # position even though everything has been persisted. However, since + # gaps should be relatively rare it's still worth while doing this. + self._persisted_upto_position = ( + min(self._current_positions.values()) if self._current_positions else 0 + ) + self._known_persisted_positions = [] # type: List[int] + self._sequence_gen = PostgresSequenceGenerator(sequence_name) def _load_current_ids( @@ -357,3 +373,50 @@ def advance(self, instance_name: str, new_id: int): self._current_positions[instance_name] = max( new_id, self._current_positions.get(instance_name, 0) ) + + self._add_persisted_position(new_id) + + def get_persisted_upto_position(self) -> int: + """Get the max position where all previous positions have been + persisted. + + Note: In the worst case scenario this will be equal to the minimum + position across writers. This means that the returned position here can + lag if one writer doesn't write very often. + """ + + with self._lock: + return self._persisted_upto_position + + def _add_persisted_position(self, new_id: int): + """Record that we have persisted a position. + + This is used to keep the `_current_positions` up to date. + """ + + # We require that the lock is locked by caller + assert self._lock.locked() + + heapq.heappush(self._known_persisted_positions, new_id) + + # We move the current min position up if the minimum current positions + # of all instances is higher (since by definition all positions less + # that that have been persisted). + min_curr = min(self._current_positions.values()) + self._persisted_upto_position = max(min_curr, self._persisted_upto_position) + + # We now iterate through the seen positions, discarding those that are + # less than the current min positions, and incrementing the min position + # if its exactly one greater. + while self._known_persisted_positions: + if self._known_persisted_positions[0] <= self._persisted_upto_position: + heapq.heappop(self._known_persisted_positions) + elif ( + self._known_persisted_positions[0] == self._persisted_upto_position + 1 + ): + heapq.heappop(self._known_persisted_positions) + self._persisted_upto_position += 1 + else: + # There was a gap in seen positions, so there is nothing more to + # do. + break diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index 7a051946534b..f42849ad16a2 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -182,3 +182,39 @@ def _get_next_txn(txn): self.assertEqual(id_gen.get_positions(), {"master": 8}) self.assertEqual(id_gen.get_current_token_for_writer("master"), 8) + + def test_get_persisted_upto_position(self): + """Test that `get_persisted_upto_position` correctly tracks updates to + positions. + """ + + self._insert_rows("first", 3) + self._insert_rows("second", 5) + + id_gen = self._create_id_generator("first") + + # Min is 3 and there is a gap between 5, so we expect it to be 3. + self.assertEqual(id_gen.get_persisted_upto_position(), 3) + + # We advance "first" straight to 6. Min is now 5 but there is no gap so + # we expect it to be 6 + id_gen.advance("first", 6) + self.assertEqual(id_gen.get_persisted_upto_position(), 6) + + # No gap, so we expect 7. + id_gen.advance("second", 7) + self.assertEqual(id_gen.get_persisted_upto_position(), 7) + + # We haven't seen 8 yet, so we expect 7 still. + id_gen.advance("second", 9) + self.assertEqual(id_gen.get_persisted_upto_position(), 7) + + # Now that we've seen 7, 8 and 9 we can got straight to 9. + id_gen.advance("first", 8) + self.assertEqual(id_gen.get_persisted_upto_position(), 9) + + # Jmp forward with gaps. The minimum is 11, even though we haven't seen + # 10 we know that everything before 11 must be persisted. + id_gen.advance("first", 11) + id_gen.advance("second", 15) + self.assertEqual(id_gen.get_persisted_upto_position(), 11) From 55b784b14d395e9768610ac2aac146bc849ee23c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 25 Aug 2020 15:15:20 +0100 Subject: [PATCH 3/4] Newsfile --- changelog.d/8164.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8164.misc diff --git a/changelog.d/8164.misc b/changelog.d/8164.misc new file mode 100644 index 000000000000..55bc079cdba8 --- /dev/null +++ b/changelog.d/8164.misc @@ -0,0 +1 @@ +Add functions to `MultiWriterIdGen` used by events stream. From e296cd327d75a69396a8834fe015719b8d97829f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 25 Aug 2020 16:45:08 +0100 Subject: [PATCH 4/4] Code review --- synapse/storage/util/id_generators.py | 9 +++++++-- synapse/storage/util/sequence.py | 2 +- tests/storage/test_id_generators.py | 2 +- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 322277df85e1..8088d0173dd1 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -220,7 +220,9 @@ def __init__( # will be gapless; gaps can form when e.g. a transaction was rolled # back. This means that sometimes we won't be able to skip forward the # position even though everything has been persisted. However, since - # gaps should be relatively rare it's still worth while doing this. + # gaps should be relatively rare it's still worth doing the book keeping + # that allows us to skip forwards when there are gapless runs of + # positions. self._persisted_upto_position = ( min(self._current_positions.values()) if self._current_positions else 0 ) @@ -285,7 +287,7 @@ async def get_next_mult(self, n: int): """ Usage: with await stream_id_gen.get_next_mult(5) as stream_ids: - # ... persist event ... + # ... persist events ... """ next_ids = await self._db.runInteraction( "_load_next_mult_id", self._load_next_mult_id_txn, n @@ -408,6 +410,9 @@ def _add_persisted_position(self, new_id: int): # We now iterate through the seen positions, discarding those that are # less than the current min positions, and incrementing the min position # if its exactly one greater. + # + # This is also where we discard items from `_known_persisted_positions` + # (to ensure the list doesn't infinitely grow). while self._known_persisted_positions: if self._known_persisted_positions[0] <= self._persisted_upto_position: heapq.heappop(self._known_persisted_positions) diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index f60237ed03a3..ffc189474890 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -43,7 +43,7 @@ def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]: txn.execute( "SELECT nextval(?) FROM generate_series(1, ?)", (self._sequence_name, n) ) - return [i for i, in txn] + return [i for (i,) in txn] GetFirstCallbackType = Callable[[Cursor], int] diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index f42849ad16a2..9b9a183e7f2b 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -213,7 +213,7 @@ def test_get_persisted_upto_position(self): id_gen.advance("first", 8) self.assertEqual(id_gen.get_persisted_upto_position(), 9) - # Jmp forward with gaps. The minimum is 11, even though we haven't seen + # Jump forward with gaps. The minimum is 11, even though we haven't seen # 10 we know that everything before 11 must be persisted. id_gen.advance("first", 11) id_gen.advance("second", 15)