Skip to content

Commit

Permalink
AioAWSResponse and AioAWSRequest (#934)
Browse files Browse the repository at this point in the history
  • Loading branch information
thehesiod authored May 5, 2022
1 parent 4694ba4 commit 0c6c571
Show file tree
Hide file tree
Showing 22 changed files with 879 additions and 264 deletions.
6 changes: 6 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
Changes
-------
2.3.0 (2022-05-05)
^^^^^^^^^^^^^^^^^^
* fix encoding issue by swapping to AioAWSResponse and AioAWSRequest to behave more
like botocore
* fix exception mappings

2.2.0 (2022-03-16)
^^^^^^^^^^^^^^^^^^
* remove deprecated APIs
Expand Down
2 changes: 1 addition & 1 deletion aiobotocore/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.2.0'
__version__ = '2.3.0'
65 changes: 0 additions & 65 deletions aiobotocore/_endpoint_helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import aiohttp.http_exceptions
from aiohttp.client_reqrep import ClientResponse
import asyncio
import botocore.retryhandler
import wrapt
Expand Down Expand Up @@ -33,67 +32,3 @@ class _IOBaseWrapper(wrapt.ObjectProxy):
def close(self):
    """Deliberately do nothing.

    This stream should not be closed by aiohttp (same behaviour as 1.x),
    so closing the wrapper is a no-op.
    """
    return None


# This is similar to botocore.response.StreamingBody
class ClientResponseContentProxy(wrapt.ObjectProxy):
    """Proxy around the content stream of an HTTP response.

    Allows the "Body" of a response to be handed around without closing
    the response itself. Attribute access falls through to the wrapped
    content stream; ``url``, ``close()`` and the async context manager
    protocol are forwarded to the owning response instead.
    """

    def __init__(self, response):
        # Wrap the content stream of the underlying (un-proxied) response.
        content_stream = response.__wrapped__.content
        super().__init__(content_stream)
        # Keep a handle on the owning response proxy for url/close/context
        # management.
        self._self_response = response

    # Note: there is intentionally no __del__ method here. ClientResponse
    # has its own __del__ which warns the user if they did not close/release
    # the response explicitly. A release here would mean reading all the
    # unread data (which could be very large), and a close would mean being
    # unable to re-use the connection, so the user MUST choose. The default
    # is to warn + close.
    async def __aenter__(self):
        owner = self._self_response
        await owner.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        owner = self._self_response
        return await owner.__aexit__(exc_type, exc_val, exc_tb)

    @property
    def url(self):
        """URL of the owning response."""
        return self._self_response.url

    def close(self):
        """Close the owning response."""
        self._self_response.close()


class ClientResponseProxy(wrapt.ObjectProxy):
    """Proxy for an aiohttp ``ClientResponse`` that exposes the attribute
    names botocore expects from its own HTTP library (``status_code``,
    ``content``, ``raw``)."""

    def __init__(self, *args, **kwargs):
        wrapped_response = ClientResponse(*args, **kwargs)
        super().__init__(wrapped_response)

        # this matches ClientResponse._body
        self._self_body = None

    @property
    def status_code(self):
        """Alias for the wrapped response's ``status``."""
        return self.status

    @status_code.setter
    def status_code(self, value):
        # botocore tries to set this, see:
        # https://github.com/aio-libs/aiobotocore/issues/190
        # Luckily status is an attribute we can set
        self.status = value

    @property
    def content(self):
        """Body cached by the last ``read()`` call, or ``None``."""
        return self._self_body

    @property
    def raw(self):
        """A fresh content-stream proxy bound to this response."""
        return ClientResponseContentProxy(self)

    async def read(self):
        """Read the wrapped response's body, cache it and return it."""
        body = await self.__wrapped__.read()
        self._self_body = body
        return body
16 changes: 16 additions & 0 deletions aiobotocore/_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import inspect


async def resolve_awaitable(obj):
    """Return ``obj`` itself, or its awaited result when it is awaitable.

    Lets callers treat synchronous values and coroutines/futures uniformly.
    """
    return (await obj) if inspect.isawaitable(obj) else obj


async def async_any(items):
    """Async counterpart of the built-in ``any``.

    Iterates ``items`` in order, resolving each one through
    ``resolve_awaitable``, and returns ``True`` as soon as one resolved
    value is truthy. Returns ``False`` when none is (or ``items`` is empty).
    """
    for candidate in items:
        resolved = await resolve_awaitable(candidate)
        if resolved:
            return True

    return False
30 changes: 30 additions & 0 deletions aiobotocore/awsrequest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from botocore.awsrequest import AWSResponse
import botocore.utils


class AioAWSResponse(AWSResponse):
    """``AWSResponse`` variant whose ``content`` and ``text`` properties
    return awaitables instead of plain values."""

    async def _content_prop(self):
        """Content of the response as bytes."""
        if self._content is not None:
            return self._content

        # NOTE: this will cache the data in self.raw
        body = await self.raw.read()
        self._content = body or bytes()
        return self._content

    @property
    def content(self):
        """Awaitable resolving to the response body as bytes."""
        return self._content_prop()

    async def _text_prop(self):
        """Content of the response decoded to text.

        Uses the encoding found in the response headers, falling back to
        UTF-8 when the headers do not provide one.
        """
        charset = botocore.utils.get_encoding_from_headers(self.headers)
        payload = await self.content
        return payload.decode(charset or 'utf-8')

    @property
    def text(self):
        """Awaitable resolving to the response body as text."""
        return self._text_prop()
37 changes: 37 additions & 0 deletions aiobotocore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from .discovery import AioEndpointDiscoveryManager, AioEndpointDiscoveryHandler
from .retries import adaptive
from . import waiter
from .retries import standard


history_recorder = get_global_history_recorder()

Expand Down Expand Up @@ -124,11 +126,46 @@ def _register_retries(self, client):
elif retry_mode == 'legacy':
self._register_legacy_retries(client)

def _register_v2_standard_retries(self, client):
    """Register the ``standard`` retry handler on ``client``.

    ``max_attempts`` is only forwarded when the client's retry config
    carries a ``total_max_attempts`` value; otherwise the handler's own
    default applies.
    """
    handler_kwargs = {'client': client}
    total_attempts = client.meta.config.retries.get('total_max_attempts')
    if total_attempts is not None:
        handler_kwargs['max_attempts'] = total_attempts
    standard.register_retry_handler(**handler_kwargs)

def _register_v2_adaptive_retries(self, client):
    """Register the ``adaptive`` retry handler on ``client``.

    See the comment in `_register_retries`. The `adaptive` module used
    here is the aiobotocore reimplementation.
    """
    adaptive.register_retry_handler(client)

def _register_legacy_retries(self, client):
    """Build the legacy retry handler for ``client``'s service and hook it
    onto the service's ``needs-retry`` event.

    Does nothing when the loader returns no ``_retry`` data.
    """
    service_model = client.meta.service_model
    endpoint_prefix = service_model.endpoint_prefix
    service_event_name = service_model.service_id.hyphenize()

    # First, we load the entire retry config for all services,
    # then pull out just the information we need.
    original_config = self._loader.load_data('_retry')
    if not original_config:
        return

    retries = self._transform_legacy_retries(client.meta.config.retries)
    retry_config = self._retry_config_translator.build_retry_config(
        endpoint_prefix,
        original_config.get('retry', {}),
        original_config.get('definitions', {}),
        retries,
    )

    logger.debug("Registering retry handlers for service: %s",
                 service_model.service_name)
    handler = self._retry_handler_factory.create_retry_handler(
        retry_config, endpoint_prefix)
    client.meta.events.register(
        'needs-retry.%s' % service_event_name,
        handler,
        unique_id='retry-config-%s' % service_event_name,
    )

def _register_s3_events(self, client, endpoint_bridge, endpoint_url,
client_config, scoped_config):
if client.meta.service_model.service_name != 's3':
Expand Down
22 changes: 7 additions & 15 deletions aiobotocore/endpoint.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import aiohttp
import asyncio

import aiohttp.http_exceptions
from botocore.endpoint import EndpointCreator, Endpoint, DEFAULT_TIMEOUT, \
MAX_POOL_CONNECTIONS, logger, history_recorder, create_request_object, \
is_valid_ipv6_endpoint_url, is_valid_endpoint_url, handle_checksum_body
from botocore.exceptions import ConnectionClosedError
is_valid_ipv6_endpoint_url, is_valid_endpoint_url, HTTPClientError
from botocore.hooks import first_non_none_response
from urllib3.response import HTTPHeaderDict

from aiobotocore.httpsession import AIOHTTPSession
from aiobotocore.response import StreamingBody
from aiobotocore._endpoint_helpers import ClientResponseProxy # noqa: F401, E501 lgtm [py/unused-import]
from aiobotocore.httpchecksum import handle_checksum_body


async def convert_to_response_dict(http_response, operation_model):
Expand All @@ -37,21 +34,21 @@ async def convert_to_response_dict(http_response, operation_model):
# aiohttp's CIMultiDict camel cases the headers :(
'headers': HTTPHeaderDict(
{k.decode('utf-8').lower(): v.decode('utf-8')
for k, v in http_response.raw_headers}),
for k, v in http_response.raw.raw_headers}),
'status_code': http_response.status_code,
'context': {
'operation_name': operation_model.name,
}
}
if response_dict['status_code'] >= 300:
response_dict['body'] = await http_response.read()
response_dict['body'] = await http_response.content
elif operation_model.has_event_stream_output:
response_dict['body'] = http_response.raw
elif operation_model.has_streaming_output:
length = response_dict['headers'].get('content-length')
response_dict['body'] = StreamingBody(http_response.raw, length)
else:
response_dict['body'] = await http_response.read()
response_dict['body'] = await http_response.content
return response_dict


Expand Down Expand Up @@ -150,13 +147,8 @@ async def _do_get_response(self, request, operation_model, context):
http_response = first_non_none_response(responses)
if http_response is None:
http_response = await self._send(request)
except aiohttp.ClientConnectionError as e:
e.request = request # botocore expects the request property
except HTTPClientError as e:
return None, e
except aiohttp.http_exceptions.BadStatusLine:
better_exception = ConnectionClosedError(
endpoint_url=request.url, request=request)
return None, better_exception
except Exception as e:
logger.debug("Exception received when sending HTTP request.",
exc_info=True)
Expand All @@ -165,7 +157,7 @@ async def _do_get_response(self, request, operation_model, context):
# This returns the http_response and the parsed_data.
response_dict = await convert_to_response_dict(http_response,
operation_model)
handle_checksum_body(
await handle_checksum_body(
http_response, response_dict, context, operation_model,
)

Expand Down
66 changes: 65 additions & 1 deletion aiobotocore/handlers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,51 @@
from botocore.handlers import _get_presigned_url_source_and_destination_regions, \
_get_cross_region_presigned_url
_get_cross_region_presigned_url, ETree, logger, XMLParseError


async def check_for_200_error(response, **kwargs):
    """Rewrite a 200 response that actually carries an error into a 500.

    ``response`` is either ``None`` or an ``(http_response, parsed)`` pair.
    When the body looks like an embedded error, ``http_response.status_code``
    is set to 500 in place; nothing is returned.
    """
    # From: http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectCOPY.html
    # There are two opportunities for a copy request to return an error. One
    # can occur when Amazon S3 receives the copy request and the other can
    # occur while Amazon S3 is copying the files. If the error occurs before
    # the copy operation starts, you receive a standard Amazon S3 error. If the
    # error occurs during the copy operation, the error response is embedded in
    # the 200 OK response. This means that a 200 OK response can contain either
    # a success or an error. Make sure to design your application to parse the
    # contents of the response and handle it appropriately.
    #
    # So this handler checks for this case. Even though the server sends a
    # 200 response, conceptually this should be handled exactly like a
    # 500 response (with respect to raising exceptions, retries, etc.)
    # We're connected *before* all the other retry logic handlers, so as long
    # as we switch the error code to 500, we'll retry the error as expected.
    if response is None:
        # A None response can happen if an exception is raised while
        # trying to retrieve the response. See Endpoint._get_response().
        return

    http_response, parsed = response
    embedded_error = await _looks_like_special_case_error(http_response)
    if not embedded_error:
        return

    logger.debug("Error found for response with 200 status code, "
                 "errors: %s, changing status code to "
                 "500.", parsed)
    http_response.status_code = 500


async def _looks_like_special_case_error(http_response):
    """Tell whether a 200 response body is really an error document.

    Returns ``True`` when the status code is 200 and the awaited body either
    fails to parse as XML or has ``Error`` as its root tag; ``False``
    otherwise. The body is only read for 200 responses.
    """
    if http_response.status_code != 200:
        return False

    try:
        xml_parser = ETree.XMLParser(
            target=ETree.TreeBuilder(),
            encoding='utf-8')
        xml_parser.feed(await http_response.content)
        document_root = xml_parser.close()
    except XMLParseError:
        # In cases of network disruptions, we may end up with a partial
        # streamed response from S3. We need to treat these cases as
        # 500 Service Errors and try again.
        return True

    return document_root.tag == 'Error'


async def inject_presigned_url_ec2(params, request_signer, model, **kwargs):
Expand Down Expand Up @@ -36,3 +82,21 @@ async def inject_presigned_url_rds(params, request_signer, model, **kwargs):
url = await _get_cross_region_presigned_url(
request_signer, params, model, src, dest)
params['body']['PreSignedUrl'] = url


async def parse_get_bucket_location(parsed, http_response, **kwargs):
    """Fill ``parsed['LocationConstraint']`` from the raw XML body.

    Returns without touching ``parsed`` when ``http_response.raw`` is
    ``None``. Otherwise the awaited body is parsed as XML and the text of
    the root element is stored under ``'LocationConstraint'``.
    """
    # s3.GetBucketLocation cannot be modeled properly. To
    # account for this we just manually parse the XML document.
    # The "parsed" passed in only has the ResponseMetadata
    # filled out. This handler will fill in the LocationConstraint
    # value.
    if http_response.raw is None:
        return

    response_body = await http_response.content
    xml_parser = ETree.XMLParser(target=ETree.TreeBuilder(), encoding='utf-8')
    xml_parser.feed(response_body)
    parsed['LocationConstraint'] = xml_parser.close().text
42 changes: 35 additions & 7 deletions aiobotocore/hooks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
import asyncio

from botocore.hooks import HierarchicalEmitter, logger
from botocore.handlers import \
inject_presigned_url_rds as boto_inject_presigned_url_rds, \
inject_presigned_url_ec2 as boto_inject_presigned_url_ec2, \
parse_get_bucket_location as boto_parse_get_bucket_location, \
check_for_200_error as boto_check_for_200_error
from botocore.signers import \
add_generate_presigned_url as boto_add_generate_presigned_url, \
add_generate_presigned_post as boto_add_generate_presigned_post, \
add_generate_db_auth_token as boto_add_generate_db_auth_token

from ._helpers import resolve_awaitable
from .signers import add_generate_presigned_url, add_generate_presigned_post, \
add_generate_db_auth_token
from .handlers import inject_presigned_url_ec2, inject_presigned_url_rds, \
parse_get_bucket_location, check_for_200_error


# Maps botocore's handlers (keys) to their aiobotocore counterparts (values).
# The emitter's `_verify_and_register` looks every handler up here and
# registers the mapped replacement when one exists.
_HANDLER_MAPPING = {
    # handlers module
    boto_inject_presigned_url_ec2: inject_presigned_url_ec2,
    boto_inject_presigned_url_rds: inject_presigned_url_rds,
    # signers module
    boto_add_generate_presigned_url: add_generate_presigned_url,
    boto_add_generate_presigned_post: add_generate_presigned_post,
    boto_add_generate_db_auth_token: add_generate_db_auth_token,
    # handlers module
    boto_parse_get_bucket_location: parse_get_bucket_location,
    boto_check_for_200_error: check_for_200_error
}


class AioHierarchicalEmitter(HierarchicalEmitter):
Expand All @@ -23,11 +47,7 @@ async def _emit(self, event_name, kwargs, stop_on_response=False):
logger.debug('Event %s: calling handler %s', event_name, handler)

# Await the handler if it's a coroutine.
if asyncio.iscoroutinefunction(handler):
response = await handler(**kwargs)
else:
response = handler(**kwargs)

response = await resolve_awaitable(handler(**kwargs))
responses.append((handler, response))
if stop_on_response and response is not None:
return responses
Expand All @@ -39,3 +59,11 @@ async def emit_until_response(self, event_name, **kwargs):
return responses[-1]
else:
return None, None

def _verify_and_register(self, event_name, handler, unique_id,
                         register_method, unique_id_uses_count):
    """Swap in a mapped replacement for ``handler``, validate it, register it.

    ``handler`` is looked up in ``_HANDLER_MAPPING``; when it is a key
    there, the mapped value is registered in its place. The resulting
    handler is checked with ``_verify_is_callable`` and
    ``_verify_accept_kwargs`` and then passed to ``register_method``
    together with ``event_name``, ``unique_id`` and
    ``unique_id_uses_count``.
    """
    try:
        handler = _HANDLER_MAPPING.get(handler, handler)
    except TypeError:
        # Unhashable objects (e.g. callable instances that define __eq__
        # without __hash__) cannot be dictionary keys, so they can never
        # have a replacement. Keep the handler as given rather than
        # failing the lookup, and let the verification below decide
        # whether it is acceptable.
        pass

    self._verify_is_callable(handler)
    self._verify_accept_kwargs(handler)
    register_method(event_name, handler, unique_id, unique_id_uses_count)
Loading

0 comments on commit 0c6c571

Please sign in to comment.