You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

127 lines
4.3 KiB

6 months ago
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""
NOTE: All functions in this module are considered private and are
subject to abrupt breaking changes. Please do not use them directly.
"""
import io
import logging
from gzip import GzipFile
from gzip import compress as gzip_compress
from botocore.compat import urlencode
from botocore.utils import determine_content_length
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def maybe_compress_request(config, request_dict, operation_model):
    """Compress the request body in place using the first usable encoding.

    Nothing happens unless ``_should_compress_request`` approves the
    request. Otherwise the encodings listed under
    ``operation_model.request_compression['encodings']`` are tried in
    order: the first one with an entry in ``COMPRESSION_MAPPING`` replaces
    ``request_dict['body']`` with its compressed form and is recorded in
    the ``Content-Encoding`` header. Encodings without a registered
    encoder are logged and skipped.

    :param config: Client configuration consulted for compression settings.
    :param request_dict: Request dictionary; its ``'body'`` and
        ``'headers'`` entries may be modified.
    :param operation_model: Operation model providing
        ``request_compression``.
    :return: ``None``; the request is updated in place.
    """
    if not _should_compress_request(config, request_dict, operation_model):
        return

    for encoding in operation_model.request_compression['encodings']:
        compress_fn = COMPRESSION_MAPPING.get(encoding)
        if compress_fn is None:
            logger.debug('Unsupported compression encoding: %s', encoding)
            continue

        logger.debug('Compressing request with %s encoding.', encoding)
        request_dict['body'] = compress_fn(request_dict['body'])
        _set_compression_header(request_dict['headers'], encoding)
        # Only the first supported encoding is applied.
        return
def _should_compress_request(config, request_dict, operation_model):
    """Decide whether the request body should be compressed.

    Compression is considered only when it is not disabled in ``config``,
    the signature version is not ``'v2'``, and the operation models
    request compression. Beyond that:

    * a body of an unsupported type is never compressed;
    * a streaming input is compressed unless its metadata contains
      ``'requiresLength'``;
    * any other body is compressed when its size is at least
      ``config.request_min_compression_size_bytes``.

    Note that the type check may rewrite ``request_dict['body']`` (see
    ``_is_compressible_type``).

    :return: ``True`` if the body should be compressed, else ``False``.
    """
    compression_allowed = (
        config.disable_request_compression is not True
        and config.signature_version != 'v2'
        and operation_model.request_compression is not None
    )
    if not compression_allowed:
        return False

    if not _is_compressible_type(request_dict):
        logger.debug(
            'Body type %s does not support compression.',
            type(request_dict['body']),
        )
        return False

    if operation_model.has_streaming_input:
        metadata = operation_model.get_streaming_input().metadata
        return 'requiresLength' not in metadata

    size = _get_body_size(request_dict['body'])
    return config.request_min_compression_size_bytes <= size
def _is_compressible_type(request_dict):
    """Report whether ``request_dict['body']`` is of a compressible type.

    A ``dict`` body is first URL-encoded to UTF-8 bytes and written back
    into ``request_dict['body']``, so this check can modify the request.

    :param request_dict: Request dictionary holding the ``'body'`` entry.
    :return: ``True`` for ``str``, ``bytes`` and ``bytearray`` bodies and
        for any object exposing a ``read`` attribute; ``False`` otherwise.
    """
    payload = request_dict['body']
    if isinstance(payload, dict):
        # Coerce dict to a format compatible with compression.
        form_encoded = urlencode(payload, doseq=True, encoding='utf-8')
        payload = form_encoded.encode('utf-8')
        request_dict['body'] = payload

    if isinstance(payload, (str, bytes, bytearray)):
        return True
    return hasattr(payload, 'read')
def _get_body_size(body):
    """Return the length of ``body`` as given by ``determine_content_length``.

    When the length cannot be determined (the helper returns ``None``),
    a debug message is logged and ``0`` is returned instead.

    :param body: The request body to measure.
    :return: The determined length, or ``0`` if it is unknown.
    """
    length = determine_content_length(body)
    if length is not None:
        return length

    logger.debug(
        'Unable to get length of the request body: %s. '
        'Skipping compression.',
        body,
    )
    return 0
def _gzip_compress_body(body):
    """Gzip-compress a request body.

    :param body: ``str`` (encoded as UTF-8 before compression), ``bytes``,
        ``bytearray``, or an object with a ``read`` method.
    :return: Compressed ``bytes`` for string/bytes-like input; for
        readable objects, whatever ``_gzip_compress_fileobj`` returns.
        ``None`` for any other input.

    If a readable body also has ``seek`` and ``tell``, its position is
    restored to where it was before compression consumed it.
    """
    if isinstance(body, str):
        return gzip_compress(body.encode('utf-8'))
    if isinstance(body, (bytes, bytearray)):
        return gzip_compress(body)
    if not hasattr(body, 'read'):
        return None

    can_rewind = hasattr(body, 'seek') and hasattr(body, 'tell')
    if not can_rewind:
        return _gzip_compress_fileobj(body)

    start_offset = body.tell()
    compressed = _gzip_compress_fileobj(body)
    # Put the caller's stream back where it started.
    body.seek(start_offset)
    return compressed
def _gzip_compress_fileobj(body):
    """Gzip the remaining contents of a readable object into memory.

    ``body`` is read in 8192-unit chunks until ``read`` returns an empty
    value; ``str`` chunks are encoded as UTF-8 before being written.

    :param body: Object with a ``read(size)`` method.
    :return: An ``io.BytesIO`` holding the gzip data, positioned at 0.
    """
    buffer = io.BytesIO()
    with GzipFile(fileobj=buffer, mode='wb') as gzip_writer:
        chunk = body.read(8192)
        while chunk:
            if isinstance(chunk, str):
                chunk = chunk.encode('utf-8')
            gzip_writer.write(chunk)
            chunk = body.read(8192)
    # Rewind so consumers read the compressed data from the beginning.
    buffer.seek(0)
    return buffer
def _set_compression_header(headers, encoding):
    """Record ``encoding`` in the ``Content-Encoding`` header.

    If the header is absent (``headers.get`` returns ``None``) it is set
    to ``encoding``; otherwise ``encoding`` is appended to the existing
    value after a comma.

    :param headers: Mapping of request headers, modified in place.
    :param encoding: Name of the encoding that was applied.
    """
    existing = headers.get('Content-Encoding')
    headers['Content-Encoding'] = (
        encoding if existing is None else f'{existing},{encoding}'
    )
# Maps an encoding name to the callable that compresses a request body
# with that encoding.
COMPRESSION_MAPPING = {
    'gzip': _gzip_compress_body,
}

Powered by BW's shoe-string budget.