Skip to content

Commit 0c6c571

Browse files
authored
AioAWSResponse and AioAWSRequest (#934)
1 parent 4694ba4 commit 0c6c571

22 files changed

+879
-264
lines changed

CHANGES.rst

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
Changes
22
-------
3+
2.3.0 (2022-05-05)
4+
^^^^^^^^^^^^^^^^^^
5+
* fix encoding issue by swapping to AioAWSResponse and AioAWSRequest to behave more
6+
like botocore
7+
* fix exceptions mappings
8+
39
2.2.0 (2022-03-16)
410
^^^^^^^^^^^^^^^^^^
511
* remove deprecated APIs

aiobotocore/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '2.2.0'
1+
__version__ = '2.3.0'

aiobotocore/_endpoint_helpers.py

-65
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import aiohttp.http_exceptions
2-
from aiohttp.client_reqrep import ClientResponse
32
import asyncio
43
import botocore.retryhandler
54
import wrapt
@@ -33,67 +32,3 @@ class _IOBaseWrapper(wrapt.ObjectProxy):
3332
def close(self):
3433
# this stream should not be closed by aiohttp, like 1.x
3534
pass
36-
37-
38-
# This is similar to botocore.response.StreamingBody
39-
class ClientResponseContentProxy(wrapt.ObjectProxy):
40-
"""Proxy object for content stream of http response. This is here in case
41-
you want to pass around the "Body" of the response without closing the
42-
response itself."""
43-
44-
def __init__(self, response):
45-
super().__init__(response.__wrapped__.content)
46-
self._self_response = response
47-
48-
# Note: we don't have a __del__ method as the ClientResponse has a __del__
49-
# which will warn the user if they didn't close/release the response
50-
# explicitly. A release here would mean reading all the unread data
51-
# (which could be very large), and a close would mean being unable to re-
52-
# use the connection, so the user MUST chose. Default is to warn + close
53-
async def __aenter__(self):
54-
await self._self_response.__aenter__()
55-
return self
56-
57-
async def __aexit__(self, exc_type, exc_val, exc_tb):
58-
return await self._self_response.__aexit__(exc_type, exc_val, exc_tb)
59-
60-
@property
61-
def url(self):
62-
return self._self_response.url
63-
64-
def close(self):
65-
self._self_response.close()
66-
67-
68-
class ClientResponseProxy(wrapt.ObjectProxy):
69-
"""Proxy object for http response useful for porting from
70-
botocore underlying http library."""
71-
72-
def __init__(self, *args, **kwargs):
73-
super().__init__(ClientResponse(*args, **kwargs))
74-
75-
# this matches ClientResponse._body
76-
self._self_body = None
77-
78-
@property
79-
def status_code(self):
80-
return self.status
81-
82-
@status_code.setter
83-
def status_code(self, value):
84-
# botocore tries to set this, see:
85-
# https://github.com/aio-libs/aiobotocore/issues/190
86-
# Luckily status is an attribute we can set
87-
self.status = value
88-
89-
@property
90-
def content(self):
91-
return self._self_body
92-
93-
@property
94-
def raw(self):
95-
return ClientResponseContentProxy(self)
96-
97-
async def read(self):
98-
self._self_body = await self.__wrapped__.read()
99-
return self._self_body

aiobotocore/_helpers.py

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import inspect
2+
3+
4+
async def resolve_awaitable(obj):
5+
if inspect.isawaitable(obj):
6+
return await obj
7+
8+
return obj
9+
10+
11+
async def async_any(items):
12+
for item in items:
13+
if await resolve_awaitable(item):
14+
return True
15+
16+
return False

aiobotocore/awsrequest.py

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from botocore.awsrequest import AWSResponse
2+
import botocore.utils
3+
4+
5+
class AioAWSResponse(AWSResponse):
6+
# Unlike AWSResponse, these return awaitables
7+
8+
async def _content_prop(self):
9+
"""Content of the response as bytes."""
10+
11+
if self._content is None:
12+
# NOTE: this will cache the data in self.raw
13+
self._content = await self.raw.read() or bytes()
14+
15+
return self._content
16+
17+
@property
18+
def content(self):
19+
return self._content_prop()
20+
21+
async def _text_prop(self):
22+
encoding = botocore.utils.get_encoding_from_headers(self.headers)
23+
if encoding:
24+
return (await self.content).decode(encoding)
25+
else:
26+
return (await self.content).decode('utf-8')
27+
28+
@property
29+
def text(self):
30+
return self._text_prop()

aiobotocore/client.py

+37
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
from .discovery import AioEndpointDiscoveryManager, AioEndpointDiscoveryHandler
1616
from .retries import adaptive
1717
from . import waiter
18+
from .retries import standard
19+
1820

1921
history_recorder = get_global_history_recorder()
2022

@@ -124,11 +126,46 @@ def _register_retries(self, client):
124126
elif retry_mode == 'legacy':
125127
self._register_legacy_retries(client)
126128

129+
def _register_v2_standard_retries(self, client):
130+
max_attempts = client.meta.config.retries.get('total_max_attempts')
131+
kwargs = {'client': client}
132+
if max_attempts is not None:
133+
kwargs['max_attempts'] = max_attempts
134+
standard.register_retry_handler(**kwargs)
135+
127136
def _register_v2_adaptive_retries(self, client):
128137
# See comment in `_register_retries`.
129138
# Note that this `adaptive` module is an aiobotocore reimplementation.
130139
adaptive.register_retry_handler(client)
131140

141+
def _register_legacy_retries(self, client):
142+
endpoint_prefix = client.meta.service_model.endpoint_prefix
143+
service_id = client.meta.service_model.service_id
144+
service_event_name = service_id.hyphenize()
145+
146+
# First, we load the entire retry config for all services,
147+
# then pull out just the information we need.
148+
original_config = self._loader.load_data('_retry')
149+
if not original_config:
150+
return
151+
152+
retries = self._transform_legacy_retries(client.meta.config.retries)
153+
retry_config = self._retry_config_translator.build_retry_config(
154+
endpoint_prefix, original_config.get('retry', {}),
155+
original_config.get('definitions', {}),
156+
retries
157+
)
158+
159+
logger.debug("Registering retry handlers for service: %s",
160+
client.meta.service_model.service_name)
161+
handler = self._retry_handler_factory.create_retry_handler(
162+
retry_config, endpoint_prefix)
163+
unique_id = 'retry-config-%s' % service_event_name
164+
client.meta.events.register(
165+
'needs-retry.%s' % service_event_name, handler,
166+
unique_id=unique_id
167+
)
168+
132169
def _register_s3_events(self, client, endpoint_bridge, endpoint_url,
133170
client_config, scoped_config):
134171
if client.meta.service_model.service_name != 's3':

aiobotocore/endpoint.py

+7-15
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
1-
import aiohttp
21
import asyncio
32

4-
import aiohttp.http_exceptions
53
from botocore.endpoint import EndpointCreator, Endpoint, DEFAULT_TIMEOUT, \
64
MAX_POOL_CONNECTIONS, logger, history_recorder, create_request_object, \
7-
is_valid_ipv6_endpoint_url, is_valid_endpoint_url, handle_checksum_body
8-
from botocore.exceptions import ConnectionClosedError
5+
is_valid_ipv6_endpoint_url, is_valid_endpoint_url, HTTPClientError
96
from botocore.hooks import first_non_none_response
107
from urllib3.response import HTTPHeaderDict
118

129
from aiobotocore.httpsession import AIOHTTPSession
1310
from aiobotocore.response import StreamingBody
14-
from aiobotocore._endpoint_helpers import ClientResponseProxy # noqa: F401, E501 lgtm [py/unused-import]
11+
from aiobotocore.httpchecksum import handle_checksum_body
1512

1613

1714
async def convert_to_response_dict(http_response, operation_model):
@@ -37,21 +34,21 @@ async def convert_to_response_dict(http_response, operation_model):
3734
# aiohttp's CIMultiDict camel cases the headers :(
3835
'headers': HTTPHeaderDict(
3936
{k.decode('utf-8').lower(): v.decode('utf-8')
40-
for k, v in http_response.raw_headers}),
37+
for k, v in http_response.raw.raw_headers}),
4138
'status_code': http_response.status_code,
4239
'context': {
4340
'operation_name': operation_model.name,
4441
}
4542
}
4643
if response_dict['status_code'] >= 300:
47-
response_dict['body'] = await http_response.read()
44+
response_dict['body'] = await http_response.content
4845
elif operation_model.has_event_stream_output:
4946
response_dict['body'] = http_response.raw
5047
elif operation_model.has_streaming_output:
5148
length = response_dict['headers'].get('content-length')
5249
response_dict['body'] = StreamingBody(http_response.raw, length)
5350
else:
54-
response_dict['body'] = await http_response.read()
51+
response_dict['body'] = await http_response.content
5552
return response_dict
5653

5754

@@ -150,13 +147,8 @@ async def _do_get_response(self, request, operation_model, context):
150147
http_response = first_non_none_response(responses)
151148
if http_response is None:
152149
http_response = await self._send(request)
153-
except aiohttp.ClientConnectionError as e:
154-
e.request = request # botocore expects the request property
150+
except HTTPClientError as e:
155151
return None, e
156-
except aiohttp.http_exceptions.BadStatusLine:
157-
better_exception = ConnectionClosedError(
158-
endpoint_url=request.url, request=request)
159-
return None, better_exception
160152
except Exception as e:
161153
logger.debug("Exception received when sending HTTP request.",
162154
exc_info=True)
@@ -165,7 +157,7 @@ async def _do_get_response(self, request, operation_model, context):
165157
# This returns the http_response and the parsed_data.
166158
response_dict = await convert_to_response_dict(http_response,
167159
operation_model)
168-
handle_checksum_body(
160+
await handle_checksum_body(
169161
http_response, response_dict, context, operation_model,
170162
)
171163

aiobotocore/handlers.py

+65-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,51 @@
11
from botocore.handlers import _get_presigned_url_source_and_destination_regions, \
2-
_get_cross_region_presigned_url
2+
_get_cross_region_presigned_url, ETree, logger, XMLParseError
3+
4+
5+
async def check_for_200_error(response, **kwargs):
6+
# From: http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectCOPY.html
7+
# There are two opportunities for a copy request to return an error. One
8+
# can occur when Amazon S3 receives the copy request and the other can
9+
# occur while Amazon S3 is copying the files. If the error occurs before
10+
# the copy operation starts, you receive a standard Amazon S3 error. If the
11+
# error occurs during the copy operation, the error response is embedded in
12+
# the 200 OK response. This means that a 200 OK response can contain either
13+
# a success or an error. Make sure to design your application to parse the
14+
# contents of the response and handle it appropriately.
15+
#
16+
# So this handler checks for this case. Even though the server sends a
17+
# 200 response, conceptually this should be handled exactly like a
18+
# 500 response (with respect to raising exceptions, retries, etc.)
19+
# We're connected *before* all the other retry logic handlers, so as long
20+
# as we switch the error code to 500, we'll retry the error as expected.
21+
if response is None:
22+
# A None response can happen if an exception is raised while
23+
# trying to retrieve the response. See Endpoint._get_response().
24+
return
25+
http_response, parsed = response
26+
if await _looks_like_special_case_error(http_response):
27+
logger.debug("Error found for response with 200 status code, "
28+
"errors: %s, changing status code to "
29+
"500.", parsed)
30+
http_response.status_code = 500
31+
32+
33+
async def _looks_like_special_case_error(http_response):
34+
if http_response.status_code == 200:
35+
try:
36+
parser = ETree.XMLParser(
37+
target=ETree.TreeBuilder(),
38+
encoding='utf-8')
39+
parser.feed(await http_response.content)
40+
root = parser.close()
41+
except XMLParseError:
42+
# In cases of network disruptions, we may end up with a partial
43+
# streamed response from S3. We need to treat these cases as
44+
# 500 Service Errors and try again.
45+
return True
46+
if root.tag == 'Error':
47+
return True
48+
return False
349

450

551
async def inject_presigned_url_ec2(params, request_signer, model, **kwargs):
@@ -36,3 +82,21 @@ async def inject_presigned_url_rds(params, request_signer, model, **kwargs):
3682
url = await _get_cross_region_presigned_url(
3783
request_signer, params, model, src, dest)
3884
params['body']['PreSignedUrl'] = url
85+
86+
87+
async def parse_get_bucket_location(parsed, http_response, **kwargs):
88+
# s3.GetBucketLocation cannot be modeled properly. To
89+
# account for this we just manually parse the XML document.
90+
# The "parsed" passed in only has the ResponseMetadata
91+
# filled out. This handler will fill in the LocationConstraint
92+
# value.
93+
if http_response.raw is None:
94+
return
95+
response_body = await http_response.content
96+
parser = ETree.XMLParser(
97+
target=ETree.TreeBuilder(),
98+
encoding='utf-8')
99+
parser.feed(response_body)
100+
root = parser.close()
101+
region = root.text
102+
parsed['LocationConstraint'] = region

aiobotocore/hooks.py

+35-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,30 @@
1-
import asyncio
2-
31
from botocore.hooks import HierarchicalEmitter, logger
2+
from botocore.handlers import \
3+
inject_presigned_url_rds as boto_inject_presigned_url_rds, \
4+
inject_presigned_url_ec2 as boto_inject_presigned_url_ec2, \
5+
parse_get_bucket_location as boto_parse_get_bucket_location, \
6+
check_for_200_error as boto_check_for_200_error
7+
from botocore.signers import \
8+
add_generate_presigned_url as boto_add_generate_presigned_url, \
9+
add_generate_presigned_post as boto_add_generate_presigned_post, \
10+
add_generate_db_auth_token as boto_add_generate_db_auth_token
11+
12+
from ._helpers import resolve_awaitable
13+
from .signers import add_generate_presigned_url, add_generate_presigned_post, \
14+
add_generate_db_auth_token
15+
from .handlers import inject_presigned_url_ec2, inject_presigned_url_rds, \
16+
parse_get_bucket_location, check_for_200_error
17+
18+
19+
_HANDLER_MAPPING = {
20+
boto_inject_presigned_url_ec2: inject_presigned_url_ec2,
21+
boto_inject_presigned_url_rds: inject_presigned_url_rds,
22+
boto_add_generate_presigned_url: add_generate_presigned_url,
23+
boto_add_generate_presigned_post: add_generate_presigned_post,
24+
boto_add_generate_db_auth_token: add_generate_db_auth_token,
25+
boto_parse_get_bucket_location: parse_get_bucket_location,
26+
boto_check_for_200_error: check_for_200_error
27+
}
428

529

630
class AioHierarchicalEmitter(HierarchicalEmitter):
@@ -23,11 +47,7 @@ async def _emit(self, event_name, kwargs, stop_on_response=False):
2347
logger.debug('Event %s: calling handler %s', event_name, handler)
2448

2549
# Await the handler if its a coroutine.
26-
if asyncio.iscoroutinefunction(handler):
27-
response = await handler(**kwargs)
28-
else:
29-
response = handler(**kwargs)
30-
50+
response = await resolve_awaitable(handler(**kwargs))
3151
responses.append((handler, response))
3252
if stop_on_response and response is not None:
3353
return responses
@@ -39,3 +59,11 @@ async def emit_until_response(self, event_name, **kwargs):
3959
return responses[-1]
4060
else:
4161
return None, None
62+
63+
def _verify_and_register(self, event_name, handler, unique_id,
64+
register_method, unique_id_uses_count):
65+
handler = _HANDLER_MAPPING.get(handler, handler)
66+
67+
self._verify_is_callable(handler)
68+
self._verify_accept_kwargs(handler)
69+
register_method(event_name, handler, unique_id, unique_id_uses_count)

0 commit comments

Comments
 (0)