5
5
import uuid
6
6
7
7
import aiopg
8
- from aiopg .sa .engine import APGCompiler_psycopg2
9
- from sqlalchemy .dialects .postgresql .psycopg2 import PGDialect_psycopg2
10
8
from sqlalchemy .engine .cursor import CursorResultMetaData
11
9
from sqlalchemy .engine .interfaces import Dialect , ExecutionContext
12
10
from sqlalchemy .engine .row import Row
13
11
from sqlalchemy .sql import ClauseElement
14
12
from sqlalchemy .sql .ddl import DDLElement
15
13
16
- from databases .core import DatabaseURL
14
+ from databases .backends .common .records import Record , Row , create_column_maps
15
+ from databases .backends .compilers .psycopg import PGCompiler_psycopg
16
+ from databases .backends .dialects .psycopg import PGDialect_psycopg
17
+ from databases .core import LOG_EXTRA , DatabaseURL
17
18
from databases .interfaces import (
18
19
ConnectionBackend ,
19
20
DatabaseBackend ,
20
- Record ,
21
+ Record as RecordInterface ,
21
22
TransactionBackend ,
22
23
)
23
24
@@ -34,10 +35,10 @@ def __init__(
34
35
self ._pool : typing .Union [aiopg .Pool , None ] = None
35
36
36
37
def _get_dialect (self ) -> Dialect :
37
- dialect = PGDialect_psycopg2 (
38
+ dialect = PGDialect_psycopg (
38
39
json_serializer = json .dumps , json_deserializer = lambda x : x
39
40
)
40
- dialect .statement_compiler = APGCompiler_psycopg2
41
+ dialect .statement_compiler = PGCompiler_psycopg
41
42
dialect .implicit_returning = True
42
43
dialect .supports_native_enum = True
43
44
dialect .supports_smallserial = True # 9.2+
@@ -117,50 +118,55 @@ async def release(self) -> None:
117
118
await self ._database ._pool .release (self ._connection )
118
119
self ._connection = None
119
120
120
- async def fetch_all (self , query : ClauseElement ) -> typing .List [Record ]:
121
+ async def fetch_all (self , query : ClauseElement ) -> typing .List [RecordInterface ]:
121
122
assert self ._connection is not None , "Connection is not acquired"
122
- query_str , args , context = self ._compile (query )
123
+ query_str , args , result_columns , context = self ._compile (query )
124
+ column_maps = create_column_maps (result_columns )
125
+ dialect = self ._dialect
126
+
123
127
cursor = await self ._connection .cursor ()
124
128
try :
125
129
await cursor .execute (query_str , args )
126
130
rows = await cursor .fetchall ()
127
131
metadata = CursorResultMetaData (context , cursor .description )
128
- return [
132
+ rows = [
129
133
Row (
130
134
metadata ,
131
135
metadata ._processors ,
132
136
metadata ._keymap ,
133
- Row ._default_key_style ,
134
137
row ,
135
138
)
136
139
for row in rows
137
140
]
141
+ return [Record (row , result_columns , dialect , column_maps ) for row in rows ]
138
142
finally :
139
143
cursor .close ()
140
144
141
- async def fetch_one (self , query : ClauseElement ) -> typing .Optional [Record ]:
145
+ async def fetch_one (self , query : ClauseElement ) -> typing .Optional [RecordInterface ]:
142
146
assert self ._connection is not None , "Connection is not acquired"
143
- query_str , args , context = self ._compile (query )
147
+ query_str , args , result_columns , context = self ._compile (query )
148
+ column_maps = create_column_maps (result_columns )
149
+ dialect = self ._dialect
144
150
cursor = await self ._connection .cursor ()
145
151
try :
146
152
await cursor .execute (query_str , args )
147
153
row = await cursor .fetchone ()
148
154
if row is None :
149
155
return None
150
156
metadata = CursorResultMetaData (context , cursor .description )
151
- return Row (
157
+ row = Row (
152
158
metadata ,
153
159
metadata ._processors ,
154
160
metadata ._keymap ,
155
- Row ._default_key_style ,
156
161
row ,
157
162
)
163
+ return Record (row , result_columns , dialect , column_maps )
158
164
finally :
159
165
cursor .close ()
160
166
161
167
async def execute (self , query : ClauseElement ) -> typing .Any :
162
168
assert self ._connection is not None , "Connection is not acquired"
163
- query_str , args , context = self ._compile (query )
169
+ query_str , args , _ , _ = self ._compile (query )
164
170
cursor = await self ._connection .cursor ()
165
171
try :
166
172
await cursor .execute (query_str , args )
@@ -173,7 +179,7 @@ async def execute_many(self, queries: typing.List[ClauseElement]) -> None:
173
179
cursor = await self ._connection .cursor ()
174
180
try :
175
181
for single_query in queries :
176
- single_query , args , context = self ._compile (single_query )
182
+ single_query , args , _ , _ = self ._compile (single_query )
177
183
await cursor .execute (single_query , args )
178
184
finally :
179
185
cursor .close ()
@@ -182,36 +188,37 @@ async def iterate(
182
188
self , query : ClauseElement
183
189
) -> typing .AsyncGenerator [typing .Any , None ]:
184
190
assert self ._connection is not None , "Connection is not acquired"
185
- query_str , args , context = self ._compile (query )
191
+ query_str , args , result_columns , context = self ._compile (query )
192
+ column_maps = create_column_maps (result_columns )
193
+ dialect = self ._dialect
186
194
cursor = await self ._connection .cursor ()
187
195
try :
188
196
await cursor .execute (query_str , args )
189
197
metadata = CursorResultMetaData (context , cursor .description )
190
198
async for row in cursor :
191
- yield Row (
199
+ record = Row (
192
200
metadata ,
193
201
metadata ._processors ,
194
202
metadata ._keymap ,
195
- Row ._default_key_style ,
196
203
row ,
197
204
)
205
+ yield Record (record , result_columns , dialect , column_maps )
198
206
finally :
199
207
cursor .close ()
200
208
201
209
def transaction (self ) -> TransactionBackend :
202
210
return AiopgTransaction (self )
203
211
204
- def _compile (
205
- self , query : ClauseElement
206
- ) -> typing .Tuple [str , dict , CompilationContext ]:
212
+ def _compile (self , query : ClauseElement ) -> typing .Tuple [str , list , tuple ]:
207
213
compiled = query .compile (
208
214
dialect = self ._dialect , compile_kwargs = {"render_postcompile" : True }
209
215
)
210
-
211
216
execution_context = self ._dialect .execution_ctx_cls ()
212
217
execution_context .dialect = self ._dialect
213
218
214
219
if not isinstance (query , DDLElement ):
220
+ compiled_params = sorted (compiled .params .items ())
221
+
215
222
args = compiled .construct_params ()
216
223
for key , val in args .items ():
217
224
if key in compiled ._bind_processors :
@@ -224,11 +231,23 @@ def _compile(
224
231
compiled ._ad_hoc_textual ,
225
232
compiled ._loose_column_name_matching ,
226
233
)
234
+
235
+ mapping = {
236
+ key : "$" + str (i ) for i , (key , _ ) in enumerate (compiled_params , start = 1 )
237
+ }
238
+ compiled_query = compiled .string % mapping
239
+ result_map = compiled ._result_columns
240
+
227
241
else :
228
242
args = {}
243
+ result_map = None
244
+ compiled_query = compiled .string
229
245
230
- logger .debug ("Query: %s\n Args: %s" , compiled .string , args )
231
- return compiled .string , args , CompilationContext (execution_context )
246
+ query_message = compiled_query .replace (" \n " , " " ).replace ("\n " , " " )
247
+ logger .debug (
248
+ "Query: %s Args: %s" , query_message , repr (tuple (args )), extra = LOG_EXTRA
249
+ )
250
+ return compiled .string , args , result_map , CompilationContext (execution_context )
232
251
233
252
@property
234
253
def raw_connection (self ) -> aiopg .connection .Connection :
0 commit comments