Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AioAWSResponse and AioAWSRequest #934

Merged
merged 12 commits into from
May 5, 2022
6 changes: 6 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
Changes
-------
2.3.0 (2022-05-05)
^^^^^^^^^^^^^^^^^^
* fix encoding issue by swapping to AioAWSResponse and AioAWSRequest to behave more
like botocore
* fix exceptions mappings

2.2.0 (2022-03-16)
^^^^^^^^^^^^^^^^^^
* remove deprecated APIs
Expand Down
2 changes: 1 addition & 1 deletion aiobotocore/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.2.0'
__version__ = '2.3.0'
65 changes: 0 additions & 65 deletions aiobotocore/_endpoint_helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import aiohttp.http_exceptions
from aiohttp.client_reqrep import ClientResponse
import asyncio
import botocore.retryhandler
import wrapt
Expand Down Expand Up @@ -33,67 +32,3 @@ class _IOBaseWrapper(wrapt.ObjectProxy):
def close(self):
# this stream should not be closed by aiohttp, like 1.x
pass


# This is similar to botocore.response.StreamingBody
class ClientResponseContentProxy(wrapt.ObjectProxy):
"""Proxy object for content stream of http response. This is here in case
you want to pass around the "Body" of the response without closing the
response itself."""

def __init__(self, response):
super().__init__(response.__wrapped__.content)
self._self_response = response

# Note: we don't have a __del__ method as the ClientResponse has a __del__
# which will warn the user if they didn't close/release the response
# explicitly. A release here would mean reading all the unread data
# (which could be very large), and a close would mean being unable to re-
# use the connection, so the user MUST chose. Default is to warn + close
async def __aenter__(self):
await self._self_response.__aenter__()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
return await self._self_response.__aexit__(exc_type, exc_val, exc_tb)

@property
def url(self):
return self._self_response.url

def close(self):
self._self_response.close()


class ClientResponseProxy(wrapt.ObjectProxy):
"""Proxy object for http response useful for porting from
botocore underlying http library."""

def __init__(self, *args, **kwargs):
super().__init__(ClientResponse(*args, **kwargs))

# this matches ClientResponse._body
self._self_body = None

@property
def status_code(self):
return self.status

@status_code.setter
def status_code(self, value):
# botocore tries to set this, see:
# https://github.com/aio-libs/aiobotocore/issues/190
# Luckily status is an attribute we can set
self.status = value

@property
def content(self):
return self._self_body

@property
def raw(self):
return ClientResponseContentProxy(self)

async def read(self):
self._self_body = await self.__wrapped__.read()
return self._self_body
16 changes: 16 additions & 0 deletions aiobotocore/_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import inspect


async def resolve_awaitable(obj):
if inspect.isawaitable(obj):
return await obj

return obj


async def async_any(items):
for item in items:
if await resolve_awaitable(item):
return True

return False
30 changes: 30 additions & 0 deletions aiobotocore/awsrequest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from botocore.awsrequest import AWSResponse
import botocore.utils


class AioAWSResponse(AWSResponse):
# Unlike AWSResponse, these return awaitables

async def _content_prop(self):
"""Content of the response as bytes."""

if self._content is None:
# NOTE: this will cache the data in self.raw
self._content = await self.raw.read() or bytes()

return self._content

@property
def content(self):
return self._content_prop()

async def _text_prop(self):
encoding = botocore.utils.get_encoding_from_headers(self.headers)
if encoding:
return (await self.content).decode(encoding)
else:
return (await self.content).decode('utf-8')

@property
def text(self):
return self._text_prop()
37 changes: 37 additions & 0 deletions aiobotocore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from .discovery import AioEndpointDiscoveryManager, AioEndpointDiscoveryHandler
from .retries import adaptive
from . import waiter
from .retries import standard


history_recorder = get_global_history_recorder()

Expand Down Expand Up @@ -124,11 +126,46 @@ def _register_retries(self, client):
elif retry_mode == 'legacy':
self._register_legacy_retries(client)

def _register_v2_standard_retries(self, client):
max_attempts = client.meta.config.retries.get('total_max_attempts')
kwargs = {'client': client}
if max_attempts is not None:
kwargs['max_attempts'] = max_attempts
standard.register_retry_handler(**kwargs)

def _register_v2_adaptive_retries(self, client):
# See comment in `_register_retries`.
# Note that this `adaptive` module is an aiobotocore reimplementation.
adaptive.register_retry_handler(client)

def _register_legacy_retries(self, client):
endpoint_prefix = client.meta.service_model.endpoint_prefix
service_id = client.meta.service_model.service_id
service_event_name = service_id.hyphenize()

# First, we load the entire retry config for all services,
# then pull out just the information we need.
original_config = self._loader.load_data('_retry')
if not original_config:
return

retries = self._transform_legacy_retries(client.meta.config.retries)
retry_config = self._retry_config_translator.build_retry_config(
endpoint_prefix, original_config.get('retry', {}),
original_config.get('definitions', {}),
retries
)

logger.debug("Registering retry handlers for service: %s",
client.meta.service_model.service_name)
handler = self._retry_handler_factory.create_retry_handler(
retry_config, endpoint_prefix)
unique_id = 'retry-config-%s' % service_event_name
client.meta.events.register(
'needs-retry.%s' % service_event_name, handler,
unique_id=unique_id
)

def _register_s3_events(self, client, endpoint_bridge, endpoint_url,
client_config, scoped_config):
if client.meta.service_model.service_name != 's3':
Expand Down
22 changes: 7 additions & 15 deletions aiobotocore/endpoint.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import aiohttp
import asyncio

import aiohttp.http_exceptions
from botocore.endpoint import EndpointCreator, Endpoint, DEFAULT_TIMEOUT, \
MAX_POOL_CONNECTIONS, logger, history_recorder, create_request_object, \
is_valid_ipv6_endpoint_url, is_valid_endpoint_url, handle_checksum_body
from botocore.exceptions import ConnectionClosedError
is_valid_ipv6_endpoint_url, is_valid_endpoint_url, HTTPClientError
from botocore.hooks import first_non_none_response
from urllib3.response import HTTPHeaderDict

from aiobotocore.httpsession import AIOHTTPSession
from aiobotocore.response import StreamingBody
from aiobotocore._endpoint_helpers import ClientResponseProxy # noqa: F401, E501 lgtm [py/unused-import]
from aiobotocore.httpchecksum import handle_checksum_body


async def convert_to_response_dict(http_response, operation_model):
Expand All @@ -37,21 +34,21 @@ async def convert_to_response_dict(http_response, operation_model):
# aiohttp's CIMultiDict camel cases the headers :(
'headers': HTTPHeaderDict(
{k.decode('utf-8').lower(): v.decode('utf-8')
for k, v in http_response.raw_headers}),
for k, v in http_response.raw.raw_headers}),
'status_code': http_response.status_code,
'context': {
'operation_name': operation_model.name,
}
}
if response_dict['status_code'] >= 300:
response_dict['body'] = await http_response.read()
response_dict['body'] = await http_response.content
elif operation_model.has_event_stream_output:
response_dict['body'] = http_response.raw
elif operation_model.has_streaming_output:
length = response_dict['headers'].get('content-length')
response_dict['body'] = StreamingBody(http_response.raw, length)
else:
response_dict['body'] = await http_response.read()
response_dict['body'] = await http_response.content
return response_dict


Expand Down Expand Up @@ -150,13 +147,8 @@ async def _do_get_response(self, request, operation_model, context):
http_response = first_non_none_response(responses)
if http_response is None:
http_response = await self._send(request)
except aiohttp.ClientConnectionError as e:
e.request = request # botocore expects the request property
except HTTPClientError as e:
return None, e
except aiohttp.http_exceptions.BadStatusLine:
better_exception = ConnectionClosedError(
endpoint_url=request.url, request=request)
return None, better_exception
except Exception as e:
logger.debug("Exception received when sending HTTP request.",
exc_info=True)
Expand All @@ -165,7 +157,7 @@ async def _do_get_response(self, request, operation_model, context):
# This returns the http_response and the parsed_data.
response_dict = await convert_to_response_dict(http_response,
operation_model)
handle_checksum_body(
await handle_checksum_body(
http_response, response_dict, context, operation_model,
)

Expand Down
66 changes: 65 additions & 1 deletion aiobotocore/handlers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,51 @@
from botocore.handlers import _get_presigned_url_source_and_destination_regions, \
_get_cross_region_presigned_url
_get_cross_region_presigned_url, ETree, logger, XMLParseError


async def check_for_200_error(response, **kwargs):
# From: http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectCOPY.html
# There are two opportunities for a copy request to return an error. One
# can occur when Amazon S3 receives the copy request and the other can
# occur while Amazon S3 is copying the files. If the error occurs before
# the copy operation starts, you receive a standard Amazon S3 error. If the
# error occurs during the copy operation, the error response is embedded in
# the 200 OK response. This means that a 200 OK response can contain either
# a success or an error. Make sure to design your application to parse the
# contents of the response and handle it appropriately.
#
# So this handler checks for this case. Even though the server sends a
# 200 response, conceptually this should be handled exactly like a
# 500 response (with respect to raising exceptions, retries, etc.)
# We're connected *before* all the other retry logic handlers, so as long
# as we switch the error code to 500, we'll retry the error as expected.
if response is None:
# A None response can happen if an exception is raised while
# trying to retrieve the response. See Endpoint._get_response().
return
http_response, parsed = response
if await _looks_like_special_case_error(http_response):
logger.debug("Error found for response with 200 status code, "
"errors: %s, changing status code to "
"500.", parsed)
http_response.status_code = 500


async def _looks_like_special_case_error(http_response):
if http_response.status_code == 200:
try:
parser = ETree.XMLParser(
target=ETree.TreeBuilder(),
encoding='utf-8')
parser.feed(await http_response.content)
root = parser.close()
except XMLParseError:
# In cases of network disruptions, we may end up with a partial
# streamed response from S3. We need to treat these cases as
# 500 Service Errors and try again.
return True
if root.tag == 'Error':
return True
return False


async def inject_presigned_url_ec2(params, request_signer, model, **kwargs):
Expand Down Expand Up @@ -36,3 +82,21 @@ async def inject_presigned_url_rds(params, request_signer, model, **kwargs):
url = await _get_cross_region_presigned_url(
request_signer, params, model, src, dest)
params['body']['PreSignedUrl'] = url


async def parse_get_bucket_location(parsed, http_response, **kwargs):
# s3.GetBucketLocation cannot be modeled properly. To
# account for this we just manually parse the XML document.
# The "parsed" passed in only has the ResponseMetadata
# filled out. This handler will fill in the LocationConstraint
# value.
if http_response.raw is None:
return
response_body = await http_response.content
parser = ETree.XMLParser(
target=ETree.TreeBuilder(),
encoding='utf-8')
parser.feed(response_body)
root = parser.close()
region = root.text
parsed['LocationConstraint'] = region
42 changes: 35 additions & 7 deletions aiobotocore/hooks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
import asyncio

from botocore.hooks import HierarchicalEmitter, logger
from botocore.handlers import \
inject_presigned_url_rds as boto_inject_presigned_url_rds, \
inject_presigned_url_ec2 as boto_inject_presigned_url_ec2, \
parse_get_bucket_location as boto_parse_get_bucket_location, \
check_for_200_error as boto_check_for_200_error
from botocore.signers import \
add_generate_presigned_url as boto_add_generate_presigned_url, \
add_generate_presigned_post as boto_add_generate_presigned_post, \
add_generate_db_auth_token as boto_add_generate_db_auth_token

from ._helpers import resolve_awaitable
from .signers import add_generate_presigned_url, add_generate_presigned_post, \
add_generate_db_auth_token
from .handlers import inject_presigned_url_ec2, inject_presigned_url_rds, \
parse_get_bucket_location, check_for_200_error


_HANDLER_MAPPING = {
boto_inject_presigned_url_ec2: inject_presigned_url_ec2,
boto_inject_presigned_url_rds: inject_presigned_url_rds,
boto_add_generate_presigned_url: add_generate_presigned_url,
boto_add_generate_presigned_post: add_generate_presigned_post,
boto_add_generate_db_auth_token: add_generate_db_auth_token,
boto_parse_get_bucket_location: parse_get_bucket_location,
boto_check_for_200_error: check_for_200_error
}


class AioHierarchicalEmitter(HierarchicalEmitter):
Expand All @@ -23,11 +47,7 @@ async def _emit(self, event_name, kwargs, stop_on_response=False):
logger.debug('Event %s: calling handler %s', event_name, handler)

# Await the handler if its a coroutine.
if asyncio.iscoroutinefunction(handler):
response = await handler(**kwargs)
else:
response = handler(**kwargs)

response = await resolve_awaitable(handler(**kwargs))
responses.append((handler, response))
if stop_on_response and response is not None:
return responses
Expand All @@ -39,3 +59,11 @@ async def emit_until_response(self, event_name, **kwargs):
return responses[-1]
else:
return None, None

def _verify_and_register(self, event_name, handler, unique_id,
register_method, unique_id_uses_count):
handler = _HANDLER_MAPPING.get(handler, handler)

self._verify_is_callable(handler)
self._verify_accept_kwargs(handler)
register_method(event_name, handler, unique_id, unique_id_uses_count)
Loading