# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import base64
import datetime
import json
import weakref

import botocore
import botocore.auth
from botocore.awsrequest import create_request_object, prepare_request_dict
from botocore.compat import OrderedDict
from botocore.exceptions import (
    UnknownClientMethodError,
    UnknownSignatureVersionError,
    UnsupportedSignatureVersionError,
)
from botocore.utils import ArnParser, datetime2timestamp

# Keep these imported. There's pre-existing code that uses them.
from botocore.utils import fix_s3_host  # noqa


class RequestSigner:
    """
    An object to sign requests before they go out over the wire using
    one of the authentication mechanisms defined in ``auth.py``. This
    class fires two events scoped to a service and operation name:

    * choose-signer: Allows overriding the auth signer name.
    * before-sign: Allows mutating the request before signing.

    Together these events allow for customization of the request
    signing pipeline, including overrides, request path manipulation,
    and disabling signing per operation.

    :type service_id: botocore.model.ServiceId
    :param service_id: The service id for the service, e.g. ``S3``

    :type region_name: string
    :param region_name: Name of the service region, e.g. ``us-east-1``

    :type signing_name: string
    :param signing_name: Service signing name. This is usually the
        same as the service name, but can differ. E.g.
        ``emr`` vs. ``elasticmapreduce``.

    :type signature_version: string
    :param signature_version: Signature name like ``v4``.

    :type credentials: :py:class:`~botocore.credentials.Credentials`
    :param credentials: User credentials with which to sign requests.

    :type event_emitter: :py:class:`~botocore.hooks.BaseEventHooks`
    :param event_emitter: Extension mechanism to fire events.
    """

    def __init__(
        self,
        service_id,
        region_name,
        signing_name,
        signature_version,
        credentials,
        event_emitter,
        auth_token=None,
    ):
        self._region_name = region_name
        self._signing_name = signing_name
        self._signature_version = signature_version
        self._credentials = credentials
        self._auth_token = auth_token
        self._service_id = service_id

        # We need weakref to prevent leaking memory in Python 2.6 on Linux 2.6
        self._event_emitter = weakref.proxy(event_emitter)

    @property
    def region_name(self):
        return self._region_name

    @property
    def signature_version(self):
        return self._signature_version

    @property
    def signing_name(self):
        return self._signing_name

    def handler(self, operation_name=None, request=None, **kwargs):
        # This is typically hooked up to the "request-created" event
        # from a client's event emitter. When a new request is created
        # this method is invoked to sign the request.
        # Don't call this method directly.
        return self.sign(operation_name, request)

    def sign(
        self,
        operation_name,
        request,
        region_name=None,
        signing_type='standard',
        expires_in=None,
        signing_name=None,
    ):
        """Sign a request before it goes out over the wire.

        :type operation_name: string
        :param operation_name: The name of the current operation, e.g.
            ``ListBuckets``.

        :type request: AWSRequest
        :param request: The request object to be sent over the wire.

        :type region_name: str
        :param region_name: The region to sign the request for.

        :type signing_type: str
        :param signing_type: The type of signing to perform. This can be one
            of three possible values:

            * 'standard'     - This should be used for most requests.
            * 'presign-url'  - This should be used when pre-signing a
              request.
            * 'presign-post' - This should be used when pre-signing an S3
              post.

        :type expires_in: int
        :param expires_in: The number of seconds the presigned url is valid
            for. This parameter is only valid for signing type 'presign-url'.

        :type signing_name: str
        :param signing_name: The name to use for the service when signing.
        """
        explicit_region_name = region_name
        if region_name is None:
            region_name = self._region_name

        if signing_name is None:
            signing_name = self._signing_name

        signature_version = self._choose_signer(
            operation_name, signing_type, request.context
        )

        # Allow mutating request before signing
        self._event_emitter.emit(
            'before-sign.{}.{}'.format(
                self._service_id.hyphenize(), operation_name
            ),
            request=request,
            signing_name=signing_name,
            region_name=self._region_name,
            signature_version=signature_version,
            request_signer=self,
            operation_name=operation_name,
        )

        if signature_version != botocore.UNSIGNED:
            kwargs = {
                'signing_name': signing_name,
                'region_name': region_name,
                'signature_version': signature_version,
            }
            if expires_in is not None:
                kwargs['expires'] = expires_in
            signing_context = request.context.get('signing', {})
            if not explicit_region_name and signing_context.get('region'):
                kwargs['region_name'] = signing_context['region']
            if signing_context.get('signing_name'):
                kwargs['signing_name'] = signing_context['signing_name']
            if signing_context.get('identity_cache') is not None:
                self._resolve_identity_cache(
                    kwargs,
                    signing_context['identity_cache'],
                    signing_context['cache_key'],
                )
            try:
                auth = self.get_auth_instance(**kwargs)
            except UnknownSignatureVersionError as e:
                if signing_type != 'standard':
                    raise UnsupportedSignatureVersionError(
                        signature_version=signature_version
                    )
                else:
                    raise e

            auth.add_auth(request)

    def _resolve_identity_cache(self, kwargs, cache, cache_key):
        kwargs['identity_cache'] = cache
        kwargs['cache_key'] = cache_key

    def _choose_signer(self, operation_name, signing_type, context):
        """
        Allow setting the signature version via the choose-signer event.
        A value of `botocore.UNSIGNED` means no signing will be performed.

        :param operation_name: The operation to sign.
        :param signing_type: The type of signing that the signer is to be
            used for.
        :return: The signature version to sign with.
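
        For example, a handler registered for this event can disable
        signing entirely by returning ``botocore.UNSIGNED`` (the handler
        and registration below are illustrative of typical client-side
        usage, not something defined in this module)::

            def disable_signing(**kwargs):
                return botocore.UNSIGNED

            client.meta.events.register(
                'choose-signer.s3.GetObject', disable_signing
            )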
""" signing_type_suffix_map = { 'presign-post': '-presign-post', 'presign-url': '-query', } suffix = signing_type_suffix_map.get(signing_type, '') # operation specific signing context takes precedent over client-level # defaults signature_version = context.get('auth_type') or self._signature_version signing = context.get('signing', {}) signing_name = signing.get('signing_name', self._signing_name) region_name = signing.get('region', self._region_name) if ( signature_version is not botocore.UNSIGNED and not signature_version.endswith(suffix) ): signature_version += suffix handler, response = self._event_emitter.emit_until_response( 'choose-signer.{}.{}'.format( self._service_id.hyphenize(), operation_name ), signing_name=signing_name, region_name=region_name, signature_version=signature_version, context=context, ) if response is not None: signature_version = response # The suffix needs to be checked again in case we get an improper # signature version from choose-signer. if ( signature_version is not botocore.UNSIGNED and not signature_version.endswith(suffix) ): signature_version += suffix return signature_version def get_auth_instance( self, signing_name, region_name, signature_version=None, **kwargs ): """ Get an auth instance which can be used to sign a request using the given signature version. :type signing_name: string :param signing_name: Service signing name. This is usually the same as the service name, but can differ. E.g. ``emr`` vs. ``elasticmapreduce``. :type region_name: string :param region_name: Name of the service region, e.g. ``us-east-1`` :type signature_version: string :param signature_version: Signature name like ``v4``. :rtype: :py:class:`~botocore.auth.BaseSigner` :return: Auth instance to sign a request. """ if signature_version is None: signature_version = self._signature_version cls = botocore.auth.AUTH_TYPE_MAPS.get(signature_version) if cls is None: raise UnknownSignatureVersionError( signature_version=signature_version ) if cls.REQUIRES_TOKEN is True: frozen_token = None if self._auth_token is not None: frozen_token = self._auth_token.get_frozen_token() auth = cls(frozen_token) return auth credentials = self._credentials if getattr(cls, "REQUIRES_IDENTITY_CACHE", None) is True: cache = kwargs["identity_cache"] key = kwargs["cache_key"] credentials = cache.get_credentials(key) del kwargs["cache_key"] # If there's no credentials provided (i.e credentials is None), # then we'll pass a value of "None" over to the auth classes, # which already handle the cases where no credentials have # been provided. frozen_credentials = None if credentials is not None: frozen_credentials = credentials.get_frozen_credentials() kwargs['credentials'] = frozen_credentials if cls.REQUIRES_REGION: if self._region_name is None: raise botocore.exceptions.NoRegionError() kwargs['region_name'] = region_name kwargs['service_name'] = signing_name auth = cls(**kwargs) return auth # Alias get_auth for backwards compatibility. get_auth = get_auth_instance def generate_presigned_url( self, request_dict, operation_name, expires_in=3600, region_name=None, signing_name=None, ): """Generates a presigned url :type request_dict: dict :param request_dict: The prepared request dictionary returned by ``botocore.awsrequest.prepare_request_dict()`` :type operation_name: str :param operation_name: The operation being signed. :type expires_in: int :param expires_in: The number of seconds the presigned url is valid for. 
            By default it expires in an hour (3600 seconds).

        :type region_name: string
        :param region_name: The region name to sign the presigned url.

        :type signing_name: str
        :param signing_name: The name to use for the service when signing.

        :returns: The presigned url
        """
        request = create_request_object(request_dict)
        self.sign(
            operation_name,
            request,
            region_name,
            'presign-url',
            expires_in,
            signing_name,
        )

        request.prepare()
        return request.url


class CloudFrontSigner:
    '''A signer to create a signed CloudFront URL.

    First you create a cloudfront signer based on a normalized RSA signer::

        import rsa
        def rsa_signer(message):
            private_key = open('private_key.pem', 'r').read()
            return rsa.sign(
                message,
                rsa.PrivateKey.load_pkcs1(private_key.encode('utf8')),
                'SHA-1')  # CloudFront requires SHA-1 hash
        cf_signer = CloudFrontSigner(key_id, rsa_signer)

    To sign with a canned policy::

        signed_url = cf_signer.generate_presigned_url(
            url, date_less_than=datetime(2015, 12, 1))

    To sign with a custom policy::

        signed_url = cf_signer.generate_presigned_url(url, policy=my_policy)
    '''

    def __init__(self, key_id, rsa_signer):
        """Create a CloudFrontSigner.

        :type key_id: str
        :param key_id: The CloudFront Key Pair ID

        :type rsa_signer: callable
        :param rsa_signer: An RSA signer.
            Its only input parameter will be the message to be signed,
            and its output will be the signed content as a binary string.
            The hash algorithm needed by CloudFront is SHA-1.
        """
        self.key_id = key_id
        self.rsa_signer = rsa_signer

    def generate_presigned_url(self, url, date_less_than=None, policy=None):
        """Creates a signed CloudFront URL based on given parameters.

        :type url: str
        :param url: The URL of the protected object

        :type date_less_than: datetime
        :param date_less_than: The URL will expire after that date and time

        :type policy: str
        :param policy: The custom policy, possibly built by self.build_policy()

        :rtype: str
        :return: The signed URL.
        """
        both_args_supplied = date_less_than is not None and policy is not None
        neither_arg_supplied = date_less_than is None and policy is None
        if both_args_supplied or neither_arg_supplied:
            e = 'Need to provide either date_less_than or policy, but not both'
            raise ValueError(e)
        if date_less_than is not None:
            # We still need to build a canned policy for signing purposes
            policy = self.build_policy(url, date_less_than)
        if isinstance(policy, str):
            policy = policy.encode('utf8')
        if date_less_than is not None:
            params = ['Expires=%s' % int(datetime2timestamp(date_less_than))]
        else:
            params = [
                'Policy=%s' % self._url_b64encode(policy).decode('utf8')
            ]
        signature = self.rsa_signer(policy)
        params.extend(
            [
                f"Signature={self._url_b64encode(signature).decode('utf8')}",
                f"Key-Pair-Id={self.key_id}",
            ]
        )
        return self._build_url(url, params)

    def _build_url(self, base_url, extra_params):
        separator = '&' if '?' in base_url else '?'
        return base_url + separator + '&'.join(extra_params)

    def build_policy(
        self, resource, date_less_than, date_greater_than=None, ip_address=None
    ):
        """A helper to build policy.

        :type resource: str
        :param resource: The URL or the stream filename of the protected object

        :type date_less_than: datetime
        :param date_less_than: The URL will expire after the time has passed

        :type date_greater_than: datetime
        :param date_greater_than: The URL will not be valid until this time

        :type ip_address: str
        :param ip_address: Use 'x.x.x.x' for an IP, or 'x.x.x.x/x' for a subnet

        :rtype: str
        :return: The policy in a compact string.
        """
        # Note:
        # 1. Order in canned policy is significant.
        #    Special care has been taken to ensure the output will match the
        #    order defined by the document. There is also a test case to
        #    ensure that order.
        #    SEE: http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/private-content-creating-signed-url-canned-policy.html#private-content-canned-policy-creating-policy-statement
        # 2. Although the order in a custom policy is not required by
        #    CloudFront, we still use OrderedDict internally to ensure the
        #    result is stable and also matches the canned policy requirement.
        #    SEE: http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/private-content-creating-signed-url-custom-policy.html
        moment = int(datetime2timestamp(date_less_than))
        condition = OrderedDict({"DateLessThan": {"AWS:EpochTime": moment}})
        if ip_address:
            if '/' not in ip_address:
                ip_address += '/32'
            condition["IpAddress"] = {"AWS:SourceIp": ip_address}
        if date_greater_than:
            moment = int(datetime2timestamp(date_greater_than))
            condition["DateGreaterThan"] = {"AWS:EpochTime": moment}
        ordered_payload = [('Resource', resource), ('Condition', condition)]
        custom_policy = {"Statement": [OrderedDict(ordered_payload)]}
        return json.dumps(custom_policy, separators=(',', ':'))

    def _url_b64encode(self, data):
        # Required by CloudFront. See also:
        # http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/private-content-linux-openssl.html
        return (
            base64.b64encode(data)
            .replace(b'+', b'-')
            .replace(b'=', b'_')
            .replace(b'/', b'~')
        )


def add_generate_db_auth_token(class_attributes, **kwargs):
    class_attributes['generate_db_auth_token'] = generate_db_auth_token


def generate_db_auth_token(self, DBHostname, Port, DBUsername, Region=None):
    """Generates an auth token used to connect to a db with IAM credentials.

    :type DBHostname: str
    :param DBHostname: The hostname of the database to connect to.

    :type Port: int
    :param Port: The port number the database is listening on.

    :type DBUsername: str
    :param DBUsername: The username to log in as.

    :type Region: str
    :param Region: The region the database is in. If None, the client
        region will be used.

    :return: A presigned url which can be used as an auth token.
    """
    region = Region
    if region is None:
        region = self.meta.region_name

    params = {
        'Action': 'connect',
        'DBUser': DBUsername,
    }

    request_dict = {
        'url_path': '/',
        'query_string': '',
        'headers': {},
        'body': params,
        'method': 'GET',
    }

    # RDS requires that the scheme not be set when sent over. This can cause
    # issues when signing because the Python url parsing libraries follow
    # RFC 1808 closely, which states that a netloc must be introduced by `//`.
    # Otherwise the url is presumed to be relative, and thus the whole
    # netloc would be treated as a path component. To work around this we
    # introduce https here and remove it once we're done processing it.
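    # Note: the fixed 900-second expiry passed below matches the documented
    # lifetime of RDS IAM authentication tokens (15 minutes).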
    scheme = 'https://'
    endpoint_url = f'{scheme}{DBHostname}:{Port}'
    prepare_request_dict(request_dict, endpoint_url)
    presigned_url = self._request_signer.generate_presigned_url(
        operation_name='connect',
        request_dict=request_dict,
        region_name=region,
        expires_in=900,
        signing_name='rds-db',
    )
    return presigned_url[len(scheme) :]


class S3PostPresigner:
    def __init__(self, request_signer):
        self._request_signer = request_signer

    def generate_presigned_post(
        self,
        request_dict,
        fields=None,
        conditions=None,
        expires_in=3600,
        region_name=None,
    ):
        """Generates the url and the form fields used for a presigned s3 post

        :type request_dict: dict
        :param request_dict: The prepared request dictionary returned by
            ``botocore.awsrequest.prepare_request_dict()``

        :type fields: dict
        :param fields: A dictionary of prefilled form fields to build on top
            of.

        :type conditions: list
        :param conditions: A list of conditions to include in the policy. Each
            element can be either a list or a structure. For example:

            [
                {"acl": "public-read"},
                {"bucket": "mybucket"},
                ["starts-with", "$key", "mykey"]
            ]

        :type expires_in: int
        :param expires_in: The number of seconds the presigned post is valid
            for.

        :type region_name: string
        :param region_name: The region name to sign the presigned post to.

        :rtype: dict
        :returns: A dictionary with two elements: ``url`` and ``fields``.
            Url is the url to post to. Fields is a dictionary filled with
            the form fields and respective values to use when submitting the
            post. For example:

            {'url': 'https://mybucket.s3.amazonaws.com',
             'fields': {'acl': 'public-read',
                        'key': 'mykey',
                        'signature': 'mysignature',
                        'policy': 'mybase64 encoded policy'}
            }
        """
        if fields is None:
            fields = {}

        if conditions is None:
            conditions = []

        # Create the policy for the post.
        policy = {}

        # Create an expiration date for the policy
        datetime_now = datetime.datetime.utcnow()
        expire_date = datetime_now + datetime.timedelta(seconds=expires_in)
        policy['expiration'] = expire_date.strftime(botocore.auth.ISO8601)

        # Append all of the conditions that the user supplied.
        policy['conditions'] = []
        for condition in conditions:
            policy['conditions'].append(condition)

        # Store the policy and the fields in the request for signing
        request = create_request_object(request_dict)
        request.context['s3-presign-post-fields'] = fields
        request.context['s3-presign-post-policy'] = policy

        self._request_signer.sign(
            'PutObject', request, region_name, 'presign-post'
        )
        # Return the url and the fields for the form to post.
        return {'url': request.url, 'fields': fields}


def add_generate_presigned_url(class_attributes, **kwargs):
    class_attributes['generate_presigned_url'] = generate_presigned_url


def generate_presigned_url(
    self, ClientMethod, Params=None, ExpiresIn=3600, HttpMethod=None
):
    """Generate a presigned url given a client, its method, and arguments

    :type ClientMethod: string
    :param ClientMethod: The client method to presign for

    :type Params: dict
    :param Params: The parameters normally passed to ``ClientMethod``.

    :type ExpiresIn: int
    :param ExpiresIn: The number of seconds the presigned url is valid
        for. By default it expires in an hour (3600 seconds)

    :type HttpMethod: string
    :param HttpMethod: The http method to use on the generated url. By
        default, the http method is whatever is used in the method's model.
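
    A minimal usage sketch (assuming an S3 client created elsewhere; the
    bucket and key names below are illustrative)::

        url = client.generate_presigned_url(
            'get_object',
            Params={'Bucket': 'amzn-s3-demo-bucket', 'Key': 'my-key'},
            ExpiresIn=300,
        )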

    :returns: The presigned url
    """
    client_method = ClientMethod
    params = Params
    if params is None:
        params = {}
    expires_in = ExpiresIn
    http_method = HttpMethod
    context = {
        'is_presign_request': True,
        'use_global_endpoint': _should_use_global_endpoint(self),
    }

    request_signer = self._request_signer

    try:
        operation_name = self._PY_TO_OP_NAME[client_method]
    except KeyError:
        raise UnknownClientMethodError(method_name=client_method)

    operation_model = self.meta.service_model.operation_model(operation_name)
    params = self._emit_api_params(
        api_params=params,
        operation_model=operation_model,
        context=context,
    )
    bucket_is_arn = ArnParser.is_arn(params.get('Bucket', ''))
    (
        endpoint_url,
        additional_headers,
        properties,
    ) = self._resolve_endpoint_ruleset(
        operation_model,
        params,
        context,
        ignore_signing_region=(not bucket_is_arn),
    )

    request_dict = self._convert_to_request_dict(
        api_params=params,
        operation_model=operation_model,
        endpoint_url=endpoint_url,
        context=context,
        headers=additional_headers,
        set_user_agent_header=False,
    )

    # Switch out the http method if user specified it.
    if http_method is not None:
        request_dict['method'] = http_method

    # Generate the presigned url.
    return request_signer.generate_presigned_url(
        request_dict=request_dict,
        expires_in=expires_in,
        operation_name=operation_name,
    )


def add_generate_presigned_post(class_attributes, **kwargs):
    class_attributes['generate_presigned_post'] = generate_presigned_post


def generate_presigned_post(
    self, Bucket, Key, Fields=None, Conditions=None, ExpiresIn=3600
):
    """Builds the url and the form fields used for a presigned s3 post

    :type Bucket: string
    :param Bucket: The name of the bucket to presign the post to. Note that
        bucket related conditions should not be included in the
        ``conditions`` parameter.

    :type Key: string
    :param Key: Key name, optionally add ${filename} to the end to
        attach the submitted filename. Note that key related conditions and
        fields are filled out for you and should not be included in the
        ``Fields`` or ``Conditions`` parameter.

    :type Fields: dict
    :param Fields: A dictionary of prefilled form fields to build on top of.
        Elements that may be included are acl, Cache-Control, Content-Type,
        Content-Disposition, Content-Encoding, Expires,
        success_action_redirect, redirect, success_action_status,
        and x-amz-meta-.

        Note that if a particular element is included in the fields
        dictionary it will not be automatically added to the conditions
        list. You must specify a condition for the element as well.

    :type Conditions: list
    :param Conditions: A list of conditions to include in the policy. Each
        element can be either a list or a structure. For example:

        [
            {"acl": "public-read"},
            ["content-length-range", 2, 5],
            ["starts-with", "$success_action_redirect", ""]
        ]

        Conditions that are included may pertain to acl, content-length-range,
        Cache-Control, Content-Type, Content-Disposition, Content-Encoding,
        Expires, success_action_redirect, redirect, success_action_status,
        and/or x-amz-meta-.

        Note that if you include a condition, you must specify a valid value
        in the fields dictionary as well. A value will not be added
        automatically to the fields dictionary based on the conditions.

    :type ExpiresIn: int
    :param ExpiresIn: The number of seconds the presigned post is valid
        for.

    :rtype: dict
    :returns: A dictionary with two elements: ``url`` and ``fields``.
        Url is the url to post to. Fields is a dictionary filled with
        the form fields and respective values to use when submitting the
        post.

        For example:

        {'url': 'https://mybucket.s3.amazonaws.com',
         'fields': {'acl': 'public-read',
                    'key': 'mykey',
                    'signature': 'mysignature',
                    'policy': 'mybase64 encoded policy'}
        }
    """
    bucket = Bucket
    key = Key
    fields = Fields
    conditions = Conditions
    expires_in = ExpiresIn

    if fields is None:
        fields = {}
    else:
        fields = fields.copy()

    if conditions is None:
        conditions = []

    context = {
        'is_presign_request': True,
        'use_global_endpoint': _should_use_global_endpoint(self),
    }

    post_presigner = S3PostPresigner(self._request_signer)

    # We choose the CreateBucket operation model because its url gets
    # serialized to what a presign post requires.
    operation_model = self.meta.service_model.operation_model('CreateBucket')
    params = self._emit_api_params(
        api_params={'Bucket': bucket},
        operation_model=operation_model,
        context=context,
    )
    bucket_is_arn = ArnParser.is_arn(params.get('Bucket', ''))
    (
        endpoint_url,
        additional_headers,
        properties,
    ) = self._resolve_endpoint_ruleset(
        operation_model,
        params,
        context,
        ignore_signing_region=(not bucket_is_arn),
    )

    request_dict = self._convert_to_request_dict(
        api_params=params,
        operation_model=operation_model,
        endpoint_url=endpoint_url,
        context=context,
        headers=additional_headers,
        set_user_agent_header=False,
    )

    # Append the bucket name to the list of conditions.
    conditions.append({'bucket': bucket})

    # If the key ends with filename, the only constraint that can be
    # imposed is if it starts with the specified prefix.
    if key.endswith('${filename}'):
        conditions.append(["starts-with", '$key', key[: -len('${filename}')]])
    else:
        conditions.append({'key': key})

    # Add the key to the fields.
    fields['key'] = key

    return post_presigner.generate_presigned_post(
        request_dict=request_dict,
        fields=fields,
        conditions=conditions,
        expires_in=expires_in,
    )


def _should_use_global_endpoint(client):
    if client.meta.partition != 'aws':
        return False
    s3_config = client.meta.config.s3
    if s3_config:
        if s3_config.get('use_dualstack_endpoint', False):
            return False
        if (
            s3_config.get('us_east_1_regional_endpoint') == 'regional'
            and client.meta.config.region_name == 'us-east-1'
        ):
            return False
        if s3_config.get('addressing_style') == 'virtual':
            return False
    return True
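

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of this module's public surface):
# the presign helpers above are normally reached through a client rather than
# called directly. The bucket and key names below are placeholders, and valid
# credentials are assumed to be configured in the environment.
#
#     import botocore.session
#
#     client = botocore.session.get_session().create_client(
#         's3', region_name='us-east-1'
#     )
#     url = client.generate_presigned_url(
#         'get_object',
#         Params={'Bucket': 'amzn-s3-demo-bucket', 'Key': 'my-key'},
#         ExpiresIn=300,
#     )
#     post = client.generate_presigned_post(
#         Bucket='amzn-s3-demo-bucket',
#         Key='uploads/${filename}',
#         ExpiresIn=600,
#     )
#     # post['url'] is the form action; post['fields'] holds the form inputs.
# ---------------------------------------------------------------------------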