1
+ """Parses a SQL statement and binds its parameters"""
2
+
1
3
import collections
2
4
import datetime
3
5
import enum
8
10
9
11
10
12
class Statement :
13
+ """Parses and binds a SQL statement"""
11
14
def __init__ (self , dialect , sql , * args , ** kwargs ):
12
15
if len (args ) > 0 and len (kwargs ) > 0 :
13
16
raise RuntimeError ("cannot pass both positional and named parameters" )
@@ -21,13 +24,14 @@ def __init__(self, dialect, sql, *args, **kwargs):
21
24
self ._command = self ._parse_command ()
22
25
self ._tokens = self ._bind_params ()
23
26
27
+
24
28
def _parse (self ):
25
29
formatted_statements = sqlparse .format (self ._sql , strip_comments = True ).strip ()
26
30
parsed_statements = sqlparse .parse (formatted_statements )
27
31
statement_count = len (parsed_statements )
28
32
if statement_count == 0 :
29
33
raise RuntimeError ("missing statement" )
30
- elif statement_count > 1 :
34
+ if statement_count > 1 :
31
35
raise RuntimeError ("too many statements at once" )
32
36
33
37
return parsed_statements [0 ]
@@ -49,11 +53,11 @@ def _parse_command(self):
49
53
def _bind_params (self ):
50
54
tokens = self ._tokenize ()
51
55
paramstyle , placeholders = self ._parse_placeholders (tokens )
52
- if paramstyle in {Paramstyle .FORMAT , Paramstyle .QMARK }:
56
+ if paramstyle in {_Paramstyle .FORMAT , _Paramstyle .QMARK }:
53
57
tokens = self ._bind_format_or_qmark (placeholders , tokens )
54
- elif paramstyle == Paramstyle .NUMERIC :
58
+ elif paramstyle == _Paramstyle .NUMERIC :
55
59
tokens = self ._bind_numeric (placeholders , tokens )
56
- if paramstyle in {Paramstyle .NAMED , Paramstyle .PYFORMAT }:
60
+ if paramstyle in {_Paramstyle .NAMED , _Paramstyle .PYFORMAT }:
57
61
tokens = self ._bind_named_or_pyformat (placeholders , tokens )
58
62
59
63
tokens = _escape_verbatim_colons (tokens )
@@ -86,9 +90,9 @@ def _parse_placeholders(self, tokens):
86
90
def _default_paramstyle (self ):
87
91
paramstyle = None
88
92
if self ._args :
89
- paramstyle = Paramstyle .QMARK
93
+ paramstyle = _Paramstyle .QMARK
90
94
elif self ._kwargs :
91
- paramstyle = Paramstyle .NAMED
95
+ paramstyle = _Paramstyle .NAMED
92
96
93
97
return paramstyle
94
98
@@ -118,8 +122,10 @@ def _bind_numeric(self, placeholders, tokens):
118
122
unused_arg_indices .remove (num )
119
123
120
124
if len (unused_arg_indices ) > 0 :
121
- unused_args = ", " .join ([str (self ._escape (self ._args [i ])) for i in sorted (unused_arg_indices )])
122
- raise RuntimeError (f"unused value{ '' if len (unused_arg_indices ) == 1 else 's' } ({ unused_args } )" )
125
+ unused_args = ", " .join (
126
+ [str (self ._escape (self ._args [i ])) for i in sorted (unused_arg_indices )])
127
+ raise RuntimeError (
128
+ f"unused value{ '' if len (unused_arg_indices ) == 1 else 's' } ({ unused_args } )" )
123
129
124
130
return tokens
125
131
@@ -134,7 +140,9 @@ def _bind_named_or_pyformat(self, placeholders, tokens):
134
140
unused_params .remove (param_name )
135
141
136
142
if len (unused_params ) > 0 :
137
- raise RuntimeError ("unused value{'' if len(unused_params) == 1 else 's'} ({', '.join(sorted(unused_params))})" )
143
+ joined_unused_params = ", " .join (sorted (unused_params ))
144
+ raise RuntimeError (
145
+ f"unused value{ '' if len (unused_params ) == 1 else 's' } ({ joined_unused_params } )" )
138
146
139
147
return tokens
140
148
@@ -144,7 +152,7 @@ def _escape(self, value):
144
152
Escapes value using engine's conversion function.
145
153
https://docs.sqlalchemy.org/en/latest/core/type_api.html#sqlalchemy.types.TypeEngine.literal_processor
146
154
"""
147
-
155
+ # pylint: disable=too-many-return-statements
148
156
if isinstance (value , (list , tuple )):
149
157
return self ._escape_iterable (value )
150
158
@@ -163,20 +171,18 @@ def _escape(self, value):
163
171
164
172
raise RuntimeError (f"unsupported value: { value } " )
165
173
174
+ string_processor = sqlalchemy .types .String ().literal_processor (self ._dialect )
166
175
if isinstance (value , datetime .date ):
167
176
return sqlparse .sql .Token (
168
- sqlparse .tokens .String ,
169
- sqlalchemy .types .String ().literal_processor (self ._dialect )(value .strftime ("%Y-%m-%d" )))
177
+ sqlparse .tokens .String , string_processor (value .strftime ("%Y-%m-%d" )))
170
178
171
179
if isinstance (value , datetime .datetime ):
172
180
return sqlparse .sql .Token (
173
- sqlparse .tokens .String ,
174
- sqlalchemy .types .String ().literal_processor (self ._dialect )(value .strftime ("%Y-%m-%d %H:%M:%S" )))
181
+ sqlparse .tokens .String , string_processor (value .strftime ("%Y-%m-%d %H:%M:%S" )))
175
182
176
183
if isinstance (value , datetime .time ):
177
184
return sqlparse .sql .Token (
178
- sqlparse .tokens .String ,
179
- sqlalchemy .types .String ().literal_processor (self ._dialect )(value .strftime ("%H:%M:%S" )))
185
+ sqlparse .tokens .String , string_processor (value .strftime ("%H:%M:%S" )))
180
186
181
187
if isinstance (value , float ):
182
188
return sqlparse .sql .Token (
@@ -189,9 +195,7 @@ def _escape(self, value):
189
195
sqlalchemy .types .Integer ().literal_processor (self ._dialect )(value ))
190
196
191
197
if isinstance (value , str ):
192
- return sqlparse .sql .Token (
193
- sqlparse .tokens .String ,
194
- sqlalchemy .types .String ().literal_processor (self ._dialect )(value ))
198
+ return sqlparse .sql .Token (sqlparse .tokens .String , string_processor (value ))
195
199
196
200
if value is None :
197
201
return sqlparse .sql .Token (
@@ -207,6 +211,7 @@ def _escape_iterable(self, iterable):
207
211
208
212
209
213
def get_command (self ):
214
+ """Returns statement command (e.g., SELECT) or None"""
210
215
return self ._command
211
216
212
217
@@ -220,41 +225,42 @@ def _is_placeholder(token):
220
225
221
226
def _parse_placeholder (token ):
222
227
if token .value == "?" :
223
- return Paramstyle .QMARK , None
228
+ return _Paramstyle .QMARK , None
224
229
225
230
# E.g., :1
226
231
matches = re .search (r"^:([1-9]\d*)$" , token .value )
227
232
if matches :
228
- return Paramstyle .NUMERIC , int (matches .group (1 )) - 1
233
+ return _Paramstyle .NUMERIC , int (matches .group (1 )) - 1
229
234
230
235
# E.g., :foo
231
236
matches = re .search (r"^:([a-zA-Z]\w*)$" , token .value )
232
237
if matches :
233
- return Paramstyle .NAMED , matches .group (1 )
238
+ return _Paramstyle .NAMED , matches .group (1 )
234
239
235
240
if token .value == "%s" :
236
- return Paramstyle .FORMAT , None
241
+ return _Paramstyle .FORMAT , None
237
242
238
243
# E.g., %(foo)s
239
244
matches = re .search (r"%\((\w+)\)s$" , token .value )
240
245
if matches :
241
- return Paramstyle .PYFORMAT , matches .group (1 )
246
+ return _Paramstyle .PYFORMAT , matches .group (1 )
242
247
243
248
raise RuntimeError (f"{ token .value } : invalid placeholder" )
244
249
245
250
246
251
def _escape_verbatim_colons (tokens ):
247
252
for token in tokens :
248
253
if _is_string_literal (token ):
249
- token .value = re .sub ("(^'|\s+):" , r"\1\:" , token .value )
254
+ token .value = re .sub (r "(^'|\s+):" , r"\1\:" , token .value )
250
255
elif _is_identifier (token ):
251
- token .value = re .sub ("(^\" |\s+):" , r"\1\:" , token .value )
256
+ token .value = re .sub (r "(^\"|\s+):" , r"\1\:" , token .value )
252
257
253
258
return tokens
254
259
255
260
256
261
def _is_command_token (token ):
257
- return token .ttype in {sqlparse .tokens .Keyword , sqlparse .tokens .Keyword .DDL , sqlparse .tokens .Keyword .DML }
262
+ return token .ttype in {
263
+ sqlparse .tokens .Keyword , sqlparse .tokens .Keyword .DDL , sqlparse .tokens .Keyword .DML }
258
264
259
265
260
266
def _is_string_literal (token ):
@@ -265,7 +271,7 @@ def _is_identifier(token):
265
271
return token .ttype == sqlparse .tokens .Literal .String .Symbol
266
272
267
273
268
- class Paramstyle (enum .Enum ):
274
+ class _Paramstyle (enum .Enum ):
269
275
FORMAT = enum .auto ()
270
276
NAMED = enum .auto ()
271
277
NUMERIC = enum .auto ()
0 commit comments