# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "license" file accompanying this file. This file is # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. """ NOTE: All functions in this module are considered private and are subject to abrupt breaking changes. Please do not use them directly. """ import io import logging from gzip import GzipFile from gzip import compress as gzip_compress from botocore.compat import urlencode from botocore.utils import determine_content_length logger = logging.getLogger(__name__) def maybe_compress_request(config, request_dict, operation_model): """Attempt to compress the request body using the modeled encodings.""" if _should_compress_request(config, request_dict, operation_model): for encoding in operation_model.request_compression['encodings']: encoder = COMPRESSION_MAPPING.get(encoding) if encoder is not None: logger.debug('Compressing request with %s encoding.', encoding) request_dict['body'] = encoder(request_dict['body']) _set_compression_header(request_dict['headers'], encoding) return else: logger.debug('Unsupported compression encoding: %s', encoding) def _should_compress_request(config, request_dict, operation_model): if ( config.disable_request_compression is not True and config.signature_version != 'v2' and operation_model.request_compression is not None ): if not _is_compressible_type(request_dict): body_type = type(request_dict['body']) log_msg = 'Body type %s does not support compression.' logger.debug(log_msg, body_type) return False if operation_model.has_streaming_input: streaming_input = operation_model.get_streaming_input() streaming_metadata = streaming_input.metadata return 'requiresLength' not in streaming_metadata body_size = _get_body_size(request_dict['body']) min_size = config.request_min_compression_size_bytes return min_size <= body_size return False def _is_compressible_type(request_dict): body = request_dict['body'] # Coerce dict to a format compatible with compression. if isinstance(body, dict): body = urlencode(body, doseq=True, encoding='utf-8').encode('utf-8') request_dict['body'] = body is_supported_type = isinstance(body, (str, bytes, bytearray)) return is_supported_type or hasattr(body, 'read') def _get_body_size(body): size = determine_content_length(body) if size is None: logger.debug( 'Unable to get length of the request body: %s. ' 'Skipping compression.', body, ) size = 0 return size def _gzip_compress_body(body): if isinstance(body, str): return gzip_compress(body.encode('utf-8')) elif isinstance(body, (bytes, bytearray)): return gzip_compress(body) elif hasattr(body, 'read'): if hasattr(body, 'seek') and hasattr(body, 'tell'): current_position = body.tell() compressed_obj = _gzip_compress_fileobj(body) body.seek(current_position) return compressed_obj return _gzip_compress_fileobj(body) def _gzip_compress_fileobj(body): compressed_obj = io.BytesIO() with GzipFile(fileobj=compressed_obj, mode='wb') as gz: while True: chunk = body.read(8192) if not chunk: break if isinstance(chunk, str): chunk = chunk.encode('utf-8') gz.write(chunk) compressed_obj.seek(0) return compressed_obj def _set_compression_header(headers, encoding): ce_header = headers.get('Content-Encoding') if ce_header is None: headers['Content-Encoding'] = encoding else: headers['Content-Encoding'] = f'{ce_header},{encoding}' COMPRESSION_MAPPING = {'gzip': _gzip_compress_body}