Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 25b077d

Browse files
committed
Merge commit '050e20e7c' into anoa/dinsic_release_1_21_x
* commit '050e20e7c': Convert some of the general database methods to async (#8100)
2 parents 29506f5 + 050e20e commit 25b077d

13 files changed

+79
-58
lines changed

changelog.d/8100.misc

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Convert various parts of the codebase to async/await.

synapse/storage/database.py

+9-14
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,7 @@ def is_running(self):
332332
"""
333333
return self._db_pool.running
334334

335-
@defer.inlineCallbacks
336-
def _check_safe_to_upsert(self):
335+
async def _check_safe_to_upsert(self):
337336
"""
338337
Is it safe to use native UPSERT?
339338
@@ -342,7 +341,7 @@ def _check_safe_to_upsert(self):
342341
343342
If the background updates have not completed, wait 15 sec and check again.
344343
"""
345-
updates = yield self.simple_select_list(
344+
updates = await self.simple_select_list(
346345
"background_updates",
347346
keyvalues=None,
348347
retcols=["update_name"],
@@ -614,8 +613,7 @@ def interaction(txn):
614613
# "Simple" SQL API methods that operate on a single table with no JOINs,
615614
# no complex WHERE clauses, just a dict of values for columns.
616615

617-
@defer.inlineCallbacks
618-
def simple_insert(self, table, values, or_ignore=False, desc="simple_insert"):
616+
async def simple_insert(self, table, values, or_ignore=False, desc="simple_insert"):
619617
"""Executes an INSERT query on the named table.
620618
621619
Args:
@@ -631,7 +629,7 @@ def simple_insert(self, table, values, or_ignore=False, desc="simple_insert"):
631629
`or_ignore` is True
632630
"""
633631
try:
634-
yield self.runInteraction(desc, self.simple_insert_txn, table, values)
632+
await self.runInteraction(desc, self.simple_insert_txn, table, values)
635633
except self.engine.module.IntegrityError:
636634
# We have to do or_ignore flag at this layer, since we can't reuse
637635
# a cursor after we receive an error from the db.
@@ -684,8 +682,7 @@ def simple_insert_many_txn(txn, table, values):
684682

685683
txn.executemany(sql, vals)
686684

687-
@defer.inlineCallbacks
688-
def simple_upsert(
685+
async def simple_upsert(
689686
self,
690687
table,
691688
keyvalues,
@@ -714,14 +711,14 @@ def simple_upsert(
714711
inserting
715712
lock (bool): True to lock the table when doing the upsert.
716713
Returns:
717-
Deferred(None or bool): Native upserts always return None. Emulated
714+
None or bool: Native upserts always return None. Emulated
718715
upserts return True if a new entry was created, False if an existing
719716
one was updated.
720717
"""
721718
attempts = 0
722719
while True:
723720
try:
724-
result = yield self.runInteraction(
721+
return await self.runInteraction(
725722
desc,
726723
self.simple_upsert_txn,
727724
table,
@@ -730,7 +727,6 @@ def simple_upsert(
730727
insertion_values,
731728
lock=lock,
732729
)
733-
return result
734730
except self.engine.module.IntegrityError as e:
735731
attempts += 1
736732
if attempts >= 5:
@@ -1121,8 +1117,7 @@ def simple_select_list_txn(cls, txn, table, keyvalues, retcols):
11211117

11221118
return cls.cursor_to_dict(txn)
11231119

1124-
@defer.inlineCallbacks
1125-
def simple_select_many_batch(
1120+
async def simple_select_many_batch(
11261121
self,
11271122
table,
11281123
column,
@@ -1156,7 +1151,7 @@ def simple_select_many_batch(
11561151
it_list[i : i + batch_size] for i in range(0, len(it_list), batch_size)
11571152
]
11581153
for chunk in chunks:
1159-
rows = yield self.runInteraction(
1154+
rows = await self.runInteraction(
11601155
desc,
11611156
self.simple_select_many_txn,
11621157
table,

synapse/storage/databases/main/appservice.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ def set_appservice_state(self, service, state):
169169
service(ApplicationService): The service whose state to set.
170170
state(ApplicationServiceState): The connectivity state to apply.
171171
Returns:
172-
A Deferred which resolves when the state was set successfully.
172+
An Awaitable which resolves when the state was set successfully.
173173
"""
174174
return self.db_pool.simple_upsert(
175175
"application_services_state", {"as_id": service.id}, {"state": state}

synapse/storage/databases/main/events_worker.py

+9-7
Original file line numberDiff line numberDiff line change
@@ -847,13 +847,15 @@ def have_events_in_timeline(self, event_ids):
847847
"""Given a list of event ids, check if we have already processed and
848848
stored them as non outliers.
849849
"""
850-
rows = yield self.db_pool.simple_select_many_batch(
851-
table="events",
852-
retcols=("event_id",),
853-
column="event_id",
854-
iterable=list(event_ids),
855-
keyvalues={"outlier": False},
856-
desc="have_events_in_timeline",
850+
rows = yield defer.ensureDeferred(
851+
self.db_pool.simple_select_many_batch(
852+
table="events",
853+
retcols=("event_id",),
854+
column="event_id",
855+
iterable=list(event_ids),
856+
keyvalues={"outlier": False},
857+
desc="have_events_in_timeline",
858+
)
857859
)
858860

859861
return {r["event_id"] for r in rows}

synapse/storage/databases/main/registration.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717

1818
import logging
1919
import re
20-
from typing import Dict, List, Optional
21-
22-
from twisted.internet.defer import Deferred
20+
from typing import Awaitable, Dict, List, Optional
2321

2422
from synapse.api.constants import UserTypes
2523
from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
@@ -642,7 +640,7 @@ def add_user_bound_threepid(self, user_id, medium, address, id_server):
642640
id_server (str)
643641
644642
Returns:
645-
Deferred
643+
Awaitable
646644
"""
647645
# We need to use an upsert, in case they user had already bound the
648646
# threepid
@@ -1163,7 +1161,7 @@ def _register_user(
11631161

11641162
def record_user_external_id(
11651163
self, auth_provider: str, external_id: str, user_id: str
1166-
) -> Deferred:
1164+
) -> Awaitable:
11671165
"""Record a mapping from an external user id to a mxid
11681166
11691167
Args:

synapse/storage/databases/main/roommember.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -767,13 +767,13 @@ async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]:
767767

768768
return set(room_ids)
769769

770-
def get_membership_from_event_ids(
770+
async def get_membership_from_event_ids(
771771
self, member_event_ids: Iterable[str]
772772
) -> List[dict]:
773773
"""Get user_id and membership of a set of event IDs.
774774
"""
775775

776-
return self.db_pool.simple_select_many_batch(
776+
return await self.db_pool.simple_select_many_batch(
777777
table="room_memberships",
778778
column="event_id",
779779
iterable=member_event_ids,

tests/handlers/test_profile.py

+3
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ def register_query_handler(query_type, handler):
6464
self.bob = UserID.from_string("@4567:test")
6565
self.alice = UserID.from_string("@alice:remote")
6666

67+
yield defer.ensureDeferred(self.store.create_profile(self.frank.localpart))
68+
6769
self.handler = hs.get_profile_handler()
6870
self.hs = hs
6971

@@ -155,6 +157,7 @@ def test_get_other_name(self):
155157

156158
@defer.inlineCallbacks
157159
def test_incoming_fed_query(self):
160+
yield defer.ensureDeferred(self.store.create_profile("caroline"))
158161
yield self.store.set_profile_displayname("caroline", "Caroline", 1)
159162

160163
response = yield defer.ensureDeferred(

tests/handlers/test_typing.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ def get_users_in_room(room_id):
156156
([], 0)
157157
)
158158
self.datastore.delete_device_msgs_for_remote = lambda *args, **kargs: None
159-
self.datastore.set_received_txn_response = lambda *args, **kwargs: defer.succeed(
159+
self.datastore.set_received_txn_response = lambda *args, **kwargs: make_awaitable(
160160
None
161161
)
162162

tests/storage/test_appservice.py

+12-4
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,9 @@ def test_get_appservices_by_state_none(self):
207207
@defer.inlineCallbacks
208208
def test_set_appservices_state_down(self):
209209
service = Mock(id=self.as_list[1]["id"])
210-
yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
210+
yield defer.ensureDeferred(
211+
self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
212+
)
211213
rows = yield self.db_pool.runQuery(
212214
self.engine.convert_param_style(
213215
"SELECT as_id FROM application_services_state WHERE state=?"
@@ -219,9 +221,15 @@ def test_set_appservices_state_down(self):
219221
@defer.inlineCallbacks
220222
def test_set_appservices_state_multiple_up(self):
221223
service = Mock(id=self.as_list[1]["id"])
222-
yield self.store.set_appservice_state(service, ApplicationServiceState.UP)
223-
yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
224-
yield self.store.set_appservice_state(service, ApplicationServiceState.UP)
224+
yield defer.ensureDeferred(
225+
self.store.set_appservice_state(service, ApplicationServiceState.UP)
226+
)
227+
yield defer.ensureDeferred(
228+
self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
229+
)
230+
yield defer.ensureDeferred(
231+
self.store.set_appservice_state(service, ApplicationServiceState.UP)
232+
)
225233
rows = yield self.db_pool.runQuery(
226234
self.engine.convert_param_style(
227235
"SELECT as_id FROM application_services_state WHERE state=?"

tests/storage/test_base.py

+10-6
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,10 @@ def runWithConnection(func, *args, **kwargs):
6666
def test_insert_1col(self):
6767
self.mock_txn.rowcount = 1
6868

69-
yield self.datastore.db_pool.simple_insert(
70-
table="tablename", values={"columname": "Value"}
69+
yield defer.ensureDeferred(
70+
self.datastore.db_pool.simple_insert(
71+
table="tablename", values={"columname": "Value"}
72+
)
7173
)
7274

7375
self.mock_txn.execute.assert_called_with(
@@ -78,10 +80,12 @@ def test_insert_1col(self):
7880
def test_insert_3cols(self):
7981
self.mock_txn.rowcount = 1
8082

81-
yield self.datastore.db_pool.simple_insert(
82-
table="tablename",
83-
# Use OrderedDict() so we can assert on the SQL generated
84-
values=OrderedDict([("colA", 1), ("colB", 2), ("colC", 3)]),
83+
yield defer.ensureDeferred(
84+
self.datastore.db_pool.simple_insert(
85+
table="tablename",
86+
# Use OrderedDict() so we can assert on the SQL generated
87+
values=OrderedDict([("colA", 1), ("colB", 2), ("colC", 3)]),
88+
)
8589
)
8690

8791
self.mock_txn.execute.assert_called_with(

tests/storage/test_event_push_actions.py

+16-14
Original file line numberDiff line numberDiff line change
@@ -142,20 +142,22 @@ def _mark_read(stream, depth):
142142
@defer.inlineCallbacks
143143
def test_find_first_stream_ordering_after_ts(self):
144144
def add_event(so, ts):
145-
return self.store.db_pool.simple_insert(
146-
"events",
147-
{
148-
"stream_ordering": so,
149-
"received_ts": ts,
150-
"event_id": "event%i" % so,
151-
"type": "",
152-
"room_id": "",
153-
"content": "",
154-
"processed": True,
155-
"outlier": False,
156-
"topological_ordering": 0,
157-
"depth": 0,
158-
},
145+
return defer.ensureDeferred(
146+
self.store.db_pool.simple_insert(
147+
"events",
148+
{
149+
"stream_ordering": so,
150+
"received_ts": ts,
151+
"event_id": "event%i" % so,
152+
"type": "",
153+
"room_id": "",
154+
"content": "",
155+
"processed": True,
156+
"outlier": False,
157+
"topological_ordering": 0,
158+
"depth": 0,
159+
},
160+
)
159161
)
160162

161163
# start with the base case where there are no events in the table

tests/storage/test_main.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def setUp(self):
3535
@defer.inlineCallbacks
3636
def test_get_users_paginate(self):
3737
yield self.store.register_user(self.user.to_string(), "pass")
38-
yield self.store.create_profile(self.user.localpart)
38+
yield defer.ensureDeferred(self.store.create_profile(self.user.localpart))
3939
yield self.store.set_profile_displayname(
4040
self.user.localpart, self.displayname, 1
4141
)

tests/storage/test_profile.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,24 @@ def setUp(self):
3333

3434
@defer.inlineCallbacks
3535
def test_displayname(self):
36-
yield self.store.set_profile_displayname(self.u_frank.localpart, "Frank", 1)
36+
yield defer.ensureDeferred(self.store.create_profile(self.u_frank.localpart))
37+
38+
yield defer.ensureDeferred(
39+
self.store.set_profile_displayname(self.u_frank.localpart, "Frank", 1)
40+
)
3741

3842
self.assertEquals(
3943
"Frank", (yield self.store.get_profile_displayname(self.u_frank.localpart))
4044
)
4145

4246
@defer.inlineCallbacks
4347
def test_avatar_url(self):
44-
yield self.store.set_profile_avatar_url(
45-
self.u_frank.localpart, "http://my.site/here", 1
48+
yield defer.ensureDeferred(self.store.create_profile(self.u_frank.localpart))
49+
50+
yield defer.ensureDeferred(
51+
self.store.set_profile_avatar_url(
52+
self.u_frank.localpart, "http://my.site/here", 1
53+
)
4654
)
4755

4856
self.assertEquals(

0 commit comments

Comments
 (0)