You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1375 lines
50 KiB
1375 lines
50 KiB
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"). You
|
|
# may not use this file except in compliance with the License. A copy of
|
|
# the License is located at
|
|
#
|
|
# http://aws.amazon.com/apache2.0/
|
|
#
|
|
# or in the "license" file accompanying this file. This file is
|
|
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
|
|
# ANY KIND, either express or implied. See the License for the specific
|
|
# language governing permissions and limitations under the License.
|
|
import logging
|
|
|
|
from botocore import waiter, xform_name
|
|
from botocore.args import ClientArgsCreator
|
|
from botocore.auth import AUTH_TYPE_MAPS
|
|
from botocore.awsrequest import prepare_request_dict
|
|
from botocore.compress import maybe_compress_request
|
|
from botocore.config import Config
|
|
from botocore.credentials import RefreshableCredentials
|
|
from botocore.discovery import (
|
|
EndpointDiscoveryHandler,
|
|
EndpointDiscoveryManager,
|
|
block_endpoint_discovery_required_operations,
|
|
)
|
|
from botocore.docs.docstring import ClientMethodDocstring, PaginatorDocstring
|
|
from botocore.exceptions import (
|
|
DataNotFoundError,
|
|
InvalidEndpointDiscoveryConfigurationError,
|
|
OperationNotPageableError,
|
|
UnknownServiceError,
|
|
UnknownSignatureVersionError,
|
|
)
|
|
from botocore.history import get_global_history_recorder
|
|
from botocore.hooks import first_non_none_response
|
|
from botocore.httpchecksum import (
|
|
apply_request_checksum,
|
|
resolve_checksum_context,
|
|
)
|
|
from botocore.model import ServiceModel
|
|
from botocore.paginate import Paginator
|
|
from botocore.retries import adaptive, standard
|
|
from botocore.useragent import UserAgentString
|
|
from botocore.utils import (
|
|
CachedProperty,
|
|
EventbridgeSignerSetter,
|
|
S3ControlArnParamHandlerv2,
|
|
S3ExpressIdentityResolver,
|
|
S3RegionRedirectorv2,
|
|
ensure_boolean,
|
|
get_service_module_name,
|
|
)
|
|
|
|
# Keep these imported. There's pre-existing code that uses:
|
|
# "from botocore.client import UNSIGNED"
|
|
# "from botocore.client import ClientError"
|
|
# etc.
|
|
from botocore.exceptions import ClientError # noqa
|
|
from botocore.utils import S3ArnParamHandler # noqa
|
|
from botocore.utils import S3ControlArnParamHandler # noqa
|
|
from botocore.utils import S3ControlEndpointSetter # noqa
|
|
from botocore.utils import S3EndpointSetter # noqa
|
|
from botocore.utils import S3RegionRedirector # noqa
|
|
from botocore import UNSIGNED # noqa
|
|
|
|
|
|
_LEGACY_SIGNATURE_VERSIONS = frozenset(
|
|
(
|
|
'v2',
|
|
'v3',
|
|
'v3https',
|
|
'v4',
|
|
's3',
|
|
's3v4',
|
|
)
|
|
)
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
history_recorder = get_global_history_recorder()
|
|
|
|
|
|
class ClientCreator:
|
|
"""Creates client objects for a service."""
|
|
|
|
def __init__(
|
|
self,
|
|
loader,
|
|
endpoint_resolver,
|
|
user_agent,
|
|
event_emitter,
|
|
retry_handler_factory,
|
|
retry_config_translator,
|
|
response_parser_factory=None,
|
|
exceptions_factory=None,
|
|
config_store=None,
|
|
user_agent_creator=None,
|
|
):
|
|
self._loader = loader
|
|
self._endpoint_resolver = endpoint_resolver
|
|
self._user_agent = user_agent
|
|
self._event_emitter = event_emitter
|
|
self._retry_handler_factory = retry_handler_factory
|
|
self._retry_config_translator = retry_config_translator
|
|
self._response_parser_factory = response_parser_factory
|
|
self._exceptions_factory = exceptions_factory
|
|
# TODO: Migrate things away from scoped_config in favor of the
|
|
# config_store. The config store can pull things from both the scoped
|
|
# config and environment variables (and potentially more in the
|
|
# future).
|
|
self._config_store = config_store
|
|
self._user_agent_creator = user_agent_creator
|
|
|
|
def create_client(
|
|
self,
|
|
service_name,
|
|
region_name,
|
|
is_secure=True,
|
|
endpoint_url=None,
|
|
verify=None,
|
|
credentials=None,
|
|
scoped_config=None,
|
|
api_version=None,
|
|
client_config=None,
|
|
auth_token=None,
|
|
):
|
|
responses = self._event_emitter.emit(
|
|
'choose-service-name', service_name=service_name
|
|
)
|
|
service_name = first_non_none_response(responses, default=service_name)
|
|
service_model = self._load_service_model(service_name, api_version)
|
|
try:
|
|
endpoints_ruleset_data = self._load_service_endpoints_ruleset(
|
|
service_name, api_version
|
|
)
|
|
partition_data = self._loader.load_data('partitions')
|
|
except UnknownServiceError:
|
|
endpoints_ruleset_data = None
|
|
partition_data = None
|
|
logger.info(
|
|
'No endpoints ruleset found for service %s, falling back to '
|
|
'legacy endpoint routing.',
|
|
service_name,
|
|
)
|
|
|
|
cls = self._create_client_class(service_name, service_model)
|
|
region_name, client_config = self._normalize_fips_region(
|
|
region_name, client_config
|
|
)
|
|
endpoint_bridge = ClientEndpointBridge(
|
|
self._endpoint_resolver,
|
|
scoped_config,
|
|
client_config,
|
|
service_signing_name=service_model.metadata.get('signingName'),
|
|
config_store=self._config_store,
|
|
service_signature_version=service_model.metadata.get(
|
|
'signatureVersion'
|
|
),
|
|
)
|
|
client_args = self._get_client_args(
|
|
service_model,
|
|
region_name,
|
|
is_secure,
|
|
endpoint_url,
|
|
verify,
|
|
credentials,
|
|
scoped_config,
|
|
client_config,
|
|
endpoint_bridge,
|
|
auth_token,
|
|
endpoints_ruleset_data,
|
|
partition_data,
|
|
)
|
|
service_client = cls(**client_args)
|
|
self._register_retries(service_client)
|
|
self._register_s3_events(
|
|
client=service_client,
|
|
endpoint_bridge=None,
|
|
endpoint_url=None,
|
|
client_config=client_config,
|
|
scoped_config=scoped_config,
|
|
)
|
|
self._register_s3express_events(client=service_client)
|
|
self._register_s3_control_events(client=service_client)
|
|
self._register_endpoint_discovery(
|
|
service_client, endpoint_url, client_config
|
|
)
|
|
return service_client
|
|
|
|
def create_client_class(self, service_name, api_version=None):
|
|
service_model = self._load_service_model(service_name, api_version)
|
|
return self._create_client_class(service_name, service_model)
|
|
|
|
def _create_client_class(self, service_name, service_model):
|
|
class_attributes = self._create_methods(service_model)
|
|
py_name_to_operation_name = self._create_name_mapping(service_model)
|
|
class_attributes['_PY_TO_OP_NAME'] = py_name_to_operation_name
|
|
bases = [BaseClient]
|
|
service_id = service_model.service_id.hyphenize()
|
|
self._event_emitter.emit(
|
|
'creating-client-class.%s' % service_id,
|
|
class_attributes=class_attributes,
|
|
base_classes=bases,
|
|
)
|
|
class_name = get_service_module_name(service_model)
|
|
cls = type(str(class_name), tuple(bases), class_attributes)
|
|
return cls
|
|
|
|
def _normalize_fips_region(self, region_name, client_config):
|
|
if region_name is not None:
|
|
normalized_region_name = region_name.replace('fips-', '').replace(
|
|
'-fips', ''
|
|
)
|
|
# If region has been transformed then set flag
|
|
if normalized_region_name != region_name:
|
|
config_use_fips_endpoint = Config(use_fips_endpoint=True)
|
|
if client_config:
|
|
# Keeping endpoint setting client specific
|
|
client_config = client_config.merge(
|
|
config_use_fips_endpoint
|
|
)
|
|
else:
|
|
client_config = config_use_fips_endpoint
|
|
logger.warning(
|
|
'transforming region from %s to %s and setting '
|
|
'use_fips_endpoint to true. client should not '
|
|
'be configured with a fips psuedo region.'
|
|
% (region_name, normalized_region_name)
|
|
)
|
|
region_name = normalized_region_name
|
|
return region_name, client_config
|
|
|
|
def _load_service_model(self, service_name, api_version=None):
|
|
json_model = self._loader.load_service_model(
|
|
service_name, 'service-2', api_version=api_version
|
|
)
|
|
service_model = ServiceModel(json_model, service_name=service_name)
|
|
return service_model
|
|
|
|
def _load_service_endpoints_ruleset(self, service_name, api_version=None):
|
|
return self._loader.load_service_model(
|
|
service_name, 'endpoint-rule-set-1', api_version=api_version
|
|
)
|
|
|
|
def _register_retries(self, client):
|
|
retry_mode = client.meta.config.retries['mode']
|
|
if retry_mode == 'standard':
|
|
self._register_v2_standard_retries(client)
|
|
elif retry_mode == 'adaptive':
|
|
self._register_v2_standard_retries(client)
|
|
self._register_v2_adaptive_retries(client)
|
|
elif retry_mode == 'legacy':
|
|
self._register_legacy_retries(client)
|
|
|
|
def _register_v2_standard_retries(self, client):
|
|
max_attempts = client.meta.config.retries.get('total_max_attempts')
|
|
kwargs = {'client': client}
|
|
if max_attempts is not None:
|
|
kwargs['max_attempts'] = max_attempts
|
|
standard.register_retry_handler(**kwargs)
|
|
|
|
def _register_v2_adaptive_retries(self, client):
|
|
adaptive.register_retry_handler(client)
|
|
|
|
def _register_legacy_retries(self, client):
|
|
endpoint_prefix = client.meta.service_model.endpoint_prefix
|
|
service_id = client.meta.service_model.service_id
|
|
service_event_name = service_id.hyphenize()
|
|
|
|
# First, we load the entire retry config for all services,
|
|
# then pull out just the information we need.
|
|
original_config = self._loader.load_data('_retry')
|
|
if not original_config:
|
|
return
|
|
|
|
retries = self._transform_legacy_retries(client.meta.config.retries)
|
|
retry_config = self._retry_config_translator.build_retry_config(
|
|
endpoint_prefix,
|
|
original_config.get('retry', {}),
|
|
original_config.get('definitions', {}),
|
|
retries,
|
|
)
|
|
|
|
logger.debug(
|
|
"Registering retry handlers for service: %s",
|
|
client.meta.service_model.service_name,
|
|
)
|
|
handler = self._retry_handler_factory.create_retry_handler(
|
|
retry_config, endpoint_prefix
|
|
)
|
|
unique_id = 'retry-config-%s' % service_event_name
|
|
client.meta.events.register(
|
|
f"needs-retry.{service_event_name}", handler, unique_id=unique_id
|
|
)
|
|
|
|
def _transform_legacy_retries(self, retries):
|
|
if retries is None:
|
|
return
|
|
copied_args = retries.copy()
|
|
if 'total_max_attempts' in retries:
|
|
copied_args = retries.copy()
|
|
copied_args['max_attempts'] = (
|
|
copied_args.pop('total_max_attempts') - 1
|
|
)
|
|
return copied_args
|
|
|
|
def _get_retry_mode(self, client, config_store):
|
|
client_retries = client.meta.config.retries
|
|
if (
|
|
client_retries is not None
|
|
and client_retries.get('mode') is not None
|
|
):
|
|
return client_retries['mode']
|
|
return config_store.get_config_variable('retry_mode') or 'legacy'
|
|
|
|
def _register_endpoint_discovery(self, client, endpoint_url, config):
|
|
if endpoint_url is not None:
|
|
# Don't register any handlers in the case of a custom endpoint url
|
|
return
|
|
# Only attach handlers if the service supports discovery
|
|
if client.meta.service_model.endpoint_discovery_operation is None:
|
|
return
|
|
events = client.meta.events
|
|
service_id = client.meta.service_model.service_id.hyphenize()
|
|
enabled = False
|
|
if config and config.endpoint_discovery_enabled is not None:
|
|
enabled = config.endpoint_discovery_enabled
|
|
elif self._config_store:
|
|
enabled = self._config_store.get_config_variable(
|
|
'endpoint_discovery_enabled'
|
|
)
|
|
|
|
enabled = self._normalize_endpoint_discovery_config(enabled)
|
|
if enabled and self._requires_endpoint_discovery(client, enabled):
|
|
discover = enabled is True
|
|
manager = EndpointDiscoveryManager(
|
|
client, always_discover=discover
|
|
)
|
|
handler = EndpointDiscoveryHandler(manager)
|
|
handler.register(events, service_id)
|
|
else:
|
|
events.register(
|
|
'before-parameter-build',
|
|
block_endpoint_discovery_required_operations,
|
|
)
|
|
|
|
def _normalize_endpoint_discovery_config(self, enabled):
|
|
"""Config must either be a boolean-string or string-literal 'auto'"""
|
|
if isinstance(enabled, str):
|
|
enabled = enabled.lower().strip()
|
|
if enabled == 'auto':
|
|
return enabled
|
|
elif enabled in ('true', 'false'):
|
|
return ensure_boolean(enabled)
|
|
elif isinstance(enabled, bool):
|
|
return enabled
|
|
|
|
raise InvalidEndpointDiscoveryConfigurationError(config_value=enabled)
|
|
|
|
def _requires_endpoint_discovery(self, client, enabled):
|
|
if enabled == "auto":
|
|
return client.meta.service_model.endpoint_discovery_required
|
|
return enabled
|
|
|
|
def _register_eventbridge_events(
|
|
self, client, endpoint_bridge, endpoint_url
|
|
):
|
|
if client.meta.service_model.service_name != 'events':
|
|
return
|
|
EventbridgeSignerSetter(
|
|
endpoint_resolver=self._endpoint_resolver,
|
|
region=client.meta.region_name,
|
|
endpoint_url=endpoint_url,
|
|
).register(client.meta.events)
|
|
|
|
def _register_s3express_events(
|
|
self,
|
|
client,
|
|
endpoint_bridge=None,
|
|
endpoint_url=None,
|
|
client_config=None,
|
|
scoped_config=None,
|
|
):
|
|
if client.meta.service_model.service_name != 's3':
|
|
return
|
|
S3ExpressIdentityResolver(client, RefreshableCredentials).register()
|
|
|
|
def _register_s3_events(
|
|
self,
|
|
client,
|
|
endpoint_bridge,
|
|
endpoint_url,
|
|
client_config,
|
|
scoped_config,
|
|
):
|
|
if client.meta.service_model.service_name != 's3':
|
|
return
|
|
S3RegionRedirectorv2(None, client).register()
|
|
self._set_s3_presign_signature_version(
|
|
client.meta, client_config, scoped_config
|
|
)
|
|
|
|
def _register_s3_control_events(
|
|
self,
|
|
client,
|
|
endpoint_bridge=None,
|
|
endpoint_url=None,
|
|
client_config=None,
|
|
scoped_config=None,
|
|
):
|
|
if client.meta.service_model.service_name != 's3control':
|
|
return
|
|
S3ControlArnParamHandlerv2().register(client.meta.events)
|
|
|
|
def _set_s3_presign_signature_version(
|
|
self, client_meta, client_config, scoped_config
|
|
):
|
|
# This will return the manually configured signature version, or None
|
|
# if none was manually set. If a customer manually sets the signature
|
|
# version, we always want to use what they set.
|
|
provided_signature_version = _get_configured_signature_version(
|
|
's3', client_config, scoped_config
|
|
)
|
|
if provided_signature_version is not None:
|
|
return
|
|
|
|
# Check to see if the region is a region that we know about. If we
|
|
# don't know about a region, then we can safely assume it's a new
|
|
# region that is sigv4 only, since all new S3 regions only allow sigv4.
|
|
# The only exception is aws-global. This is a pseudo-region for the
|
|
# global endpoint, we should respect the signature versions it
|
|
# supports, which includes v2.
|
|
regions = self._endpoint_resolver.get_available_endpoints(
|
|
's3', client_meta.partition
|
|
)
|
|
if (
|
|
client_meta.region_name != 'aws-global'
|
|
and client_meta.region_name not in regions
|
|
):
|
|
return
|
|
|
|
# If it is a region we know about, we want to default to sigv2, so here
|
|
# we check to see if it is available.
|
|
endpoint = self._endpoint_resolver.construct_endpoint(
|
|
's3', client_meta.region_name
|
|
)
|
|
signature_versions = endpoint['signatureVersions']
|
|
if 's3' not in signature_versions:
|
|
return
|
|
|
|
# We now know that we're in a known region that supports sigv2 and
|
|
# the customer hasn't set a signature version so we default the
|
|
# signature version to sigv2.
|
|
client_meta.events.register(
|
|
'choose-signer.s3', self._default_s3_presign_to_sigv2
|
|
)
|
|
|
|
def _default_s3_presign_to_sigv2(self, signature_version, **kwargs):
|
|
"""
|
|
Returns the 's3' (sigv2) signer if presigning an s3 request. This is
|
|
intended to be used to set the default signature version for the signer
|
|
to sigv2. Situations where an asymmetric signature is required are the
|
|
exception, for example MRAP needs v4a.
|
|
|
|
:type signature_version: str
|
|
:param signature_version: The current client signature version.
|
|
|
|
:type signing_name: str
|
|
:param signing_name: The signing name of the service.
|
|
|
|
:return: 's3' if the request is an s3 presign request, None otherwise
|
|
"""
|
|
if signature_version.startswith('v4a'):
|
|
return
|
|
|
|
if signature_version.startswith('v4-s3express'):
|
|
return f'{signature_version}'
|
|
|
|
for suffix in ['-query', '-presign-post']:
|
|
if signature_version.endswith(suffix):
|
|
return f's3{suffix}'
|
|
|
|
def _get_client_args(
|
|
self,
|
|
service_model,
|
|
region_name,
|
|
is_secure,
|
|
endpoint_url,
|
|
verify,
|
|
credentials,
|
|
scoped_config,
|
|
client_config,
|
|
endpoint_bridge,
|
|
auth_token,
|
|
endpoints_ruleset_data,
|
|
partition_data,
|
|
):
|
|
args_creator = ClientArgsCreator(
|
|
self._event_emitter,
|
|
self._user_agent,
|
|
self._response_parser_factory,
|
|
self._loader,
|
|
self._exceptions_factory,
|
|
config_store=self._config_store,
|
|
user_agent_creator=self._user_agent_creator,
|
|
)
|
|
return args_creator.get_client_args(
|
|
service_model,
|
|
region_name,
|
|
is_secure,
|
|
endpoint_url,
|
|
verify,
|
|
credentials,
|
|
scoped_config,
|
|
client_config,
|
|
endpoint_bridge,
|
|
auth_token,
|
|
endpoints_ruleset_data,
|
|
partition_data,
|
|
)
|
|
|
|
def _create_methods(self, service_model):
|
|
op_dict = {}
|
|
for operation_name in service_model.operation_names:
|
|
py_operation_name = xform_name(operation_name)
|
|
op_dict[py_operation_name] = self._create_api_method(
|
|
py_operation_name, operation_name, service_model
|
|
)
|
|
return op_dict
|
|
|
|
def _create_name_mapping(self, service_model):
|
|
# py_name -> OperationName, for every operation available
|
|
# for a service.
|
|
mapping = {}
|
|
for operation_name in service_model.operation_names:
|
|
py_operation_name = xform_name(operation_name)
|
|
mapping[py_operation_name] = operation_name
|
|
return mapping
|
|
|
|
def _create_api_method(
|
|
self, py_operation_name, operation_name, service_model
|
|
):
|
|
def _api_call(self, *args, **kwargs):
|
|
# We're accepting *args so that we can give a more helpful
|
|
# error message than TypeError: _api_call takes exactly
|
|
# 1 argument.
|
|
if args:
|
|
raise TypeError(
|
|
f"{py_operation_name}() only accepts keyword arguments."
|
|
)
|
|
# The "self" in this scope is referring to the BaseClient.
|
|
return self._make_api_call(operation_name, kwargs)
|
|
|
|
_api_call.__name__ = str(py_operation_name)
|
|
|
|
# Add the docstring to the client method
|
|
operation_model = service_model.operation_model(operation_name)
|
|
docstring = ClientMethodDocstring(
|
|
operation_model=operation_model,
|
|
method_name=operation_name,
|
|
event_emitter=self._event_emitter,
|
|
method_description=operation_model.documentation,
|
|
example_prefix='response = client.%s' % py_operation_name,
|
|
include_signature=False,
|
|
)
|
|
_api_call.__doc__ = docstring
|
|
return _api_call
|
|
|
|
|
|
class ClientEndpointBridge:
|
|
"""Bridges endpoint data and client creation
|
|
|
|
This class handles taking out the relevant arguments from the endpoint
|
|
resolver and determining which values to use, taking into account any
|
|
client configuration options and scope configuration options.
|
|
|
|
This class also handles determining what, if any, region to use if no
|
|
explicit region setting is provided. For example, Amazon S3 client will
|
|
utilize "us-east-1" by default if no region can be resolved."""
|
|
|
|
DEFAULT_ENDPOINT = '{service}.{region}.amazonaws.com'
|
|
_DUALSTACK_CUSTOMIZED_SERVICES = ['s3', 's3-control']
|
|
|
|
def __init__(
|
|
self,
|
|
endpoint_resolver,
|
|
scoped_config=None,
|
|
client_config=None,
|
|
default_endpoint=None,
|
|
service_signing_name=None,
|
|
config_store=None,
|
|
service_signature_version=None,
|
|
):
|
|
self.service_signing_name = service_signing_name
|
|
self.endpoint_resolver = endpoint_resolver
|
|
self.scoped_config = scoped_config
|
|
self.client_config = client_config
|
|
self.default_endpoint = default_endpoint or self.DEFAULT_ENDPOINT
|
|
self.config_store = config_store
|
|
self.service_signature_version = service_signature_version
|
|
|
|
def resolve(
|
|
self, service_name, region_name=None, endpoint_url=None, is_secure=True
|
|
):
|
|
region_name = self._check_default_region(service_name, region_name)
|
|
use_dualstack_endpoint = self._resolve_use_dualstack_endpoint(
|
|
service_name
|
|
)
|
|
use_fips_endpoint = self._resolve_endpoint_variant_config_var(
|
|
'use_fips_endpoint'
|
|
)
|
|
resolved = self.endpoint_resolver.construct_endpoint(
|
|
service_name,
|
|
region_name,
|
|
use_dualstack_endpoint=use_dualstack_endpoint,
|
|
use_fips_endpoint=use_fips_endpoint,
|
|
)
|
|
|
|
# If we can't resolve the region, we'll attempt to get a global
|
|
# endpoint for non-regionalized services (iam, route53, etc)
|
|
if not resolved:
|
|
# TODO: fallback partition_name should be configurable in the
|
|
# future for users to define as needed.
|
|
resolved = self.endpoint_resolver.construct_endpoint(
|
|
service_name,
|
|
region_name,
|
|
partition_name='aws',
|
|
use_dualstack_endpoint=use_dualstack_endpoint,
|
|
use_fips_endpoint=use_fips_endpoint,
|
|
)
|
|
|
|
if resolved:
|
|
return self._create_endpoint(
|
|
resolved, service_name, region_name, endpoint_url, is_secure
|
|
)
|
|
else:
|
|
return self._assume_endpoint(
|
|
service_name, region_name, endpoint_url, is_secure
|
|
)
|
|
|
|
def resolver_uses_builtin_data(self):
|
|
return self.endpoint_resolver.uses_builtin_data
|
|
|
|
def _check_default_region(self, service_name, region_name):
|
|
if region_name is not None:
|
|
return region_name
|
|
# Use the client_config region if no explicit region was provided.
|
|
if self.client_config and self.client_config.region_name is not None:
|
|
return self.client_config.region_name
|
|
|
|
def _create_endpoint(
|
|
self, resolved, service_name, region_name, endpoint_url, is_secure
|
|
):
|
|
region_name, signing_region = self._pick_region_values(
|
|
resolved, region_name, endpoint_url
|
|
)
|
|
if endpoint_url is None:
|
|
endpoint_url = self._make_url(
|
|
resolved.get('hostname'),
|
|
is_secure,
|
|
resolved.get('protocols', []),
|
|
)
|
|
signature_version = self._resolve_signature_version(
|
|
service_name, resolved
|
|
)
|
|
signing_name = self._resolve_signing_name(service_name, resolved)
|
|
return self._create_result(
|
|
service_name=service_name,
|
|
region_name=region_name,
|
|
signing_region=signing_region,
|
|
signing_name=signing_name,
|
|
endpoint_url=endpoint_url,
|
|
metadata=resolved,
|
|
signature_version=signature_version,
|
|
)
|
|
|
|
def _resolve_endpoint_variant_config_var(self, config_var):
|
|
client_config = self.client_config
|
|
config_val = False
|
|
|
|
# Client configuration arg has precedence
|
|
if client_config and getattr(client_config, config_var) is not None:
|
|
return getattr(client_config, config_var)
|
|
elif self.config_store is not None:
|
|
# Check config store
|
|
config_val = self.config_store.get_config_variable(config_var)
|
|
return config_val
|
|
|
|
def _resolve_use_dualstack_endpoint(self, service_name):
|
|
s3_dualstack_mode = self._is_s3_dualstack_mode(service_name)
|
|
if s3_dualstack_mode is not None:
|
|
return s3_dualstack_mode
|
|
return self._resolve_endpoint_variant_config_var(
|
|
'use_dualstack_endpoint'
|
|
)
|
|
|
|
def _is_s3_dualstack_mode(self, service_name):
|
|
if service_name not in self._DUALSTACK_CUSTOMIZED_SERVICES:
|
|
return None
|
|
# TODO: This normalization logic is duplicated from the
|
|
# ClientArgsCreator class. Consolidate everything to
|
|
# ClientArgsCreator. _resolve_signature_version also has similarly
|
|
# duplicated logic.
|
|
client_config = self.client_config
|
|
if (
|
|
client_config is not None
|
|
and client_config.s3 is not None
|
|
and 'use_dualstack_endpoint' in client_config.s3
|
|
):
|
|
# Client config trumps scoped config.
|
|
return client_config.s3['use_dualstack_endpoint']
|
|
if self.scoped_config is not None:
|
|
enabled = self.scoped_config.get('s3', {}).get(
|
|
'use_dualstack_endpoint'
|
|
)
|
|
if enabled in [True, 'True', 'true']:
|
|
return True
|
|
|
|
def _assume_endpoint(
|
|
self, service_name, region_name, endpoint_url, is_secure
|
|
):
|
|
if endpoint_url is None:
|
|
# Expand the default hostname URI template.
|
|
hostname = self.default_endpoint.format(
|
|
service=service_name, region=region_name
|
|
)
|
|
endpoint_url = self._make_url(
|
|
hostname, is_secure, ['http', 'https']
|
|
)
|
|
logger.debug(
|
|
f'Assuming an endpoint for {service_name}, {region_name}: {endpoint_url}'
|
|
)
|
|
# We still want to allow the user to provide an explicit version.
|
|
signature_version = self._resolve_signature_version(
|
|
service_name, {'signatureVersions': ['v4']}
|
|
)
|
|
signing_name = self._resolve_signing_name(service_name, resolved={})
|
|
return self._create_result(
|
|
service_name=service_name,
|
|
region_name=region_name,
|
|
signing_region=region_name,
|
|
signing_name=signing_name,
|
|
signature_version=signature_version,
|
|
endpoint_url=endpoint_url,
|
|
metadata={},
|
|
)
|
|
|
|
def _create_result(
|
|
self,
|
|
service_name,
|
|
region_name,
|
|
signing_region,
|
|
signing_name,
|
|
endpoint_url,
|
|
signature_version,
|
|
metadata,
|
|
):
|
|
return {
|
|
'service_name': service_name,
|
|
'region_name': region_name,
|
|
'signing_region': signing_region,
|
|
'signing_name': signing_name,
|
|
'endpoint_url': endpoint_url,
|
|
'signature_version': signature_version,
|
|
'metadata': metadata,
|
|
}
|
|
|
|
def _make_url(self, hostname, is_secure, supported_protocols):
|
|
if is_secure and 'https' in supported_protocols:
|
|
scheme = 'https'
|
|
else:
|
|
scheme = 'http'
|
|
return f'{scheme}://{hostname}'
|
|
|
|
def _resolve_signing_name(self, service_name, resolved):
|
|
# CredentialScope overrides everything else.
|
|
if (
|
|
'credentialScope' in resolved
|
|
and 'service' in resolved['credentialScope']
|
|
):
|
|
return resolved['credentialScope']['service']
|
|
# Use the signingName from the model if present.
|
|
if self.service_signing_name:
|
|
return self.service_signing_name
|
|
# Just assume is the same as the service name.
|
|
return service_name
|
|
|
|
def _pick_region_values(self, resolved, region_name, endpoint_url):
|
|
signing_region = region_name
|
|
if endpoint_url is None:
|
|
# Do not use the region name or signing name from the resolved
|
|
# endpoint if the user explicitly provides an endpoint_url. This
|
|
# would happen if we resolve to an endpoint where the service has
|
|
# a "defaults" section that overrides all endpoint with a single
|
|
# hostname and credentialScope. This has been the case historically
|
|
# for how STS has worked. The only way to resolve an STS endpoint
|
|
# was to provide a region_name and an endpoint_url. In that case,
|
|
# we would still resolve an endpoint, but we would not use the
|
|
# resolved endpointName or signingRegion because we want to allow
|
|
# custom endpoints.
|
|
region_name = resolved['endpointName']
|
|
signing_region = region_name
|
|
if (
|
|
'credentialScope' in resolved
|
|
and 'region' in resolved['credentialScope']
|
|
):
|
|
signing_region = resolved['credentialScope']['region']
|
|
return region_name, signing_region
|
|
|
|
def _resolve_signature_version(self, service_name, resolved):
|
|
configured_version = _get_configured_signature_version(
|
|
service_name, self.client_config, self.scoped_config
|
|
)
|
|
if configured_version is not None:
|
|
return configured_version
|
|
|
|
potential_versions = resolved.get('signatureVersions', [])
|
|
if (
|
|
self.service_signature_version is not None
|
|
and self.service_signature_version
|
|
not in _LEGACY_SIGNATURE_VERSIONS
|
|
):
|
|
# Prefer the service model as most specific
|
|
# source of truth for new signature versions.
|
|
potential_versions = [self.service_signature_version]
|
|
|
|
# Pick a signature version from the endpoint metadata if present.
|
|
if 'signatureVersions' in resolved:
|
|
if service_name == 's3':
|
|
return 's3v4'
|
|
if 'v4' in potential_versions:
|
|
return 'v4'
|
|
# Now just iterate over the signature versions in order until we
|
|
# find the first one that is known to Botocore.
|
|
for known in potential_versions:
|
|
if known in AUTH_TYPE_MAPS:
|
|
return known
|
|
raise UnknownSignatureVersionError(
|
|
signature_version=potential_versions
|
|
)
|
|
|
|
|
|
class BaseClient:
|
|
# This is actually reassigned with the py->op_name mapping
|
|
# when the client creator creates the subclass. This value is used
|
|
# because calls such as client.get_paginator('list_objects') use the
|
|
# snake_case name, but we need to know the ListObjects form.
|
|
# xform_name() does the ListObjects->list_objects conversion, but
|
|
# we need the reverse mapping here.
|
|
_PY_TO_OP_NAME = {}
|
|
|
|
def __init__(
|
|
self,
|
|
serializer,
|
|
endpoint,
|
|
response_parser,
|
|
event_emitter,
|
|
request_signer,
|
|
service_model,
|
|
loader,
|
|
client_config,
|
|
partition,
|
|
exceptions_factory,
|
|
endpoint_ruleset_resolver=None,
|
|
user_agent_creator=None,
|
|
):
|
|
self._serializer = serializer
|
|
self._endpoint = endpoint
|
|
self._ruleset_resolver = endpoint_ruleset_resolver
|
|
self._response_parser = response_parser
|
|
self._request_signer = request_signer
|
|
self._cache = {}
|
|
self._loader = loader
|
|
self._client_config = client_config
|
|
self.meta = ClientMeta(
|
|
event_emitter,
|
|
self._client_config,
|
|
endpoint.host,
|
|
service_model,
|
|
self._PY_TO_OP_NAME,
|
|
partition,
|
|
)
|
|
self._exceptions_factory = exceptions_factory
|
|
self._exceptions = None
|
|
self._user_agent_creator = user_agent_creator
|
|
if self._user_agent_creator is None:
|
|
self._user_agent_creator = (
|
|
UserAgentString.from_environment().with_client_config(
|
|
self._client_config
|
|
)
|
|
)
|
|
self._register_handlers()
|
|
|
|
def __getattr__(self, item):
|
|
service_id = self._service_model.service_id.hyphenize()
|
|
event_name = f'getattr.{service_id}.{item}'
|
|
|
|
handler, event_response = self.meta.events.emit_until_response(
|
|
event_name, client=self
|
|
)
|
|
|
|
if event_response is not None:
|
|
return event_response
|
|
|
|
raise AttributeError(
|
|
f"'{self.__class__.__name__}' object has no attribute '{item}'"
|
|
)
|
|
|
|
def close(self):
|
|
"""Closes underlying endpoint connections."""
|
|
self._endpoint.close()
|
|
|
|
def _register_handlers(self):
|
|
# Register the handler required to sign requests.
|
|
service_id = self.meta.service_model.service_id.hyphenize()
|
|
self.meta.events.register(
|
|
f"request-created.{service_id}", self._request_signer.handler
|
|
)
|
|
|
|
@property
|
|
def _service_model(self):
|
|
return self.meta.service_model
|
|
|
|
def _make_api_call(self, operation_name, api_params):
|
|
operation_model = self._service_model.operation_model(operation_name)
|
|
service_name = self._service_model.service_name
|
|
history_recorder.record(
|
|
'API_CALL',
|
|
{
|
|
'service': service_name,
|
|
'operation': operation_name,
|
|
'params': api_params,
|
|
},
|
|
)
|
|
if operation_model.deprecated:
|
|
logger.debug(
|
|
'Warning: %s.%s() is deprecated', service_name, operation_name
|
|
)
|
|
request_context = {
|
|
'client_region': self.meta.region_name,
|
|
'client_config': self.meta.config,
|
|
'has_streaming_input': operation_model.has_streaming_input,
|
|
'auth_type': operation_model.auth_type,
|
|
}
|
|
api_params = self._emit_api_params(
|
|
api_params=api_params,
|
|
operation_model=operation_model,
|
|
context=request_context,
|
|
)
|
|
(
|
|
endpoint_url,
|
|
additional_headers,
|
|
properties,
|
|
) = self._resolve_endpoint_ruleset(
|
|
operation_model, api_params, request_context
|
|
)
|
|
if properties:
|
|
# Pass arbitrary endpoint info with the Request
|
|
# for use during construction.
|
|
request_context['endpoint_properties'] = properties
|
|
request_dict = self._convert_to_request_dict(
|
|
api_params=api_params,
|
|
operation_model=operation_model,
|
|
endpoint_url=endpoint_url,
|
|
context=request_context,
|
|
headers=additional_headers,
|
|
)
|
|
resolve_checksum_context(request_dict, operation_model, api_params)
|
|
|
|
service_id = self._service_model.service_id.hyphenize()
|
|
handler, event_response = self.meta.events.emit_until_response(
|
|
'before-call.{service_id}.{operation_name}'.format(
|
|
service_id=service_id, operation_name=operation_name
|
|
),
|
|
model=operation_model,
|
|
params=request_dict,
|
|
request_signer=self._request_signer,
|
|
context=request_context,
|
|
)
|
|
|
|
if event_response is not None:
|
|
http, parsed_response = event_response
|
|
else:
|
|
maybe_compress_request(
|
|
self.meta.config, request_dict, operation_model
|
|
)
|
|
apply_request_checksum(request_dict)
|
|
http, parsed_response = self._make_request(
|
|
operation_model, request_dict, request_context
|
|
)
|
|
|
|
self.meta.events.emit(
|
|
'after-call.{service_id}.{operation_name}'.format(
|
|
service_id=service_id, operation_name=operation_name
|
|
),
|
|
http_response=http,
|
|
parsed=parsed_response,
|
|
model=operation_model,
|
|
context=request_context,
|
|
)
|
|
|
|
if http.status_code >= 300:
|
|
error_info = parsed_response.get("Error", {})
|
|
error_code = error_info.get("QueryErrorCode") or error_info.get(
|
|
"Code"
|
|
)
|
|
error_class = self.exceptions.from_code(error_code)
|
|
raise error_class(parsed_response, operation_name)
|
|
else:
|
|
return parsed_response
|
|
|
|
def _make_request(self, operation_model, request_dict, request_context):
|
|
try:
|
|
return self._endpoint.make_request(operation_model, request_dict)
|
|
except Exception as e:
|
|
self.meta.events.emit(
|
|
'after-call-error.{service_id}.{operation_name}'.format(
|
|
service_id=self._service_model.service_id.hyphenize(),
|
|
operation_name=operation_model.name,
|
|
),
|
|
exception=e,
|
|
context=request_context,
|
|
)
|
|
raise
|
|
|
|
def _convert_to_request_dict(
|
|
self,
|
|
api_params,
|
|
operation_model,
|
|
endpoint_url,
|
|
context=None,
|
|
headers=None,
|
|
set_user_agent_header=True,
|
|
):
|
|
request_dict = self._serializer.serialize_to_request(
|
|
api_params, operation_model
|
|
)
|
|
if not self._client_config.inject_host_prefix:
|
|
request_dict.pop('host_prefix', None)
|
|
if headers is not None:
|
|
request_dict['headers'].update(headers)
|
|
if set_user_agent_header:
|
|
user_agent = self._user_agent_creator.to_string()
|
|
else:
|
|
user_agent = None
|
|
prepare_request_dict(
|
|
request_dict,
|
|
endpoint_url=endpoint_url,
|
|
user_agent=user_agent,
|
|
context=context,
|
|
)
|
|
return request_dict
|
|
|
|
def _emit_api_params(self, api_params, operation_model, context):
|
|
# Given the API params provided by the user and the operation_model
|
|
# we can serialize the request to a request_dict.
|
|
operation_name = operation_model.name
|
|
|
|
# Emit an event that allows users to modify the parameters at the
|
|
# beginning of the method. It allows handlers to modify existing
|
|
# parameters or return a new set of parameters to use.
|
|
service_id = self._service_model.service_id.hyphenize()
|
|
responses = self.meta.events.emit(
|
|
f'provide-client-params.{service_id}.{operation_name}',
|
|
params=api_params,
|
|
model=operation_model,
|
|
context=context,
|
|
)
|
|
api_params = first_non_none_response(responses, default=api_params)
|
|
|
|
self.meta.events.emit(
|
|
f'before-parameter-build.{service_id}.{operation_name}',
|
|
params=api_params,
|
|
model=operation_model,
|
|
context=context,
|
|
)
|
|
return api_params
|
|
|
|
def _resolve_endpoint_ruleset(
|
|
self,
|
|
operation_model,
|
|
params,
|
|
request_context,
|
|
ignore_signing_region=False,
|
|
):
|
|
"""Returns endpoint URL and list of additional headers returned from
|
|
EndpointRulesetResolver for the given operation and params. If the
|
|
ruleset resolver is not available, for example because the service has
|
|
no endpoints ruleset file, the legacy endpoint resolver's value is
|
|
returned.
|
|
|
|
Use ignore_signing_region for generating presigned URLs or any other
|
|
situation where the signing region information from the ruleset
|
|
resolver should be ignored.
|
|
|
|
Returns tuple of URL and headers dictionary. Additionally, the
|
|
request_context dict is modified in place with any signing information
|
|
returned from the ruleset resolver.
|
|
"""
|
|
if self._ruleset_resolver is None:
|
|
endpoint_url = self.meta.endpoint_url
|
|
additional_headers = {}
|
|
endpoint_properties = {}
|
|
else:
|
|
endpoint_info = self._ruleset_resolver.construct_endpoint(
|
|
operation_model=operation_model,
|
|
call_args=params,
|
|
request_context=request_context,
|
|
)
|
|
endpoint_url = endpoint_info.url
|
|
additional_headers = endpoint_info.headers
|
|
endpoint_properties = endpoint_info.properties
|
|
# If authSchemes is present, overwrite default auth type and
|
|
# signing context derived from service model.
|
|
auth_schemes = endpoint_info.properties.get('authSchemes')
|
|
if auth_schemes is not None:
|
|
auth_info = self._ruleset_resolver.auth_schemes_to_signing_ctx(
|
|
auth_schemes
|
|
)
|
|
auth_type, signing_context = auth_info
|
|
request_context['auth_type'] = auth_type
|
|
if 'region' in signing_context and ignore_signing_region:
|
|
del signing_context['region']
|
|
if 'signing' in request_context:
|
|
request_context['signing'].update(signing_context)
|
|
else:
|
|
request_context['signing'] = signing_context
|
|
|
|
return endpoint_url, additional_headers, endpoint_properties
|
|
|
|
def get_paginator(self, operation_name):
|
|
"""Create a paginator for an operation.
|
|
|
|
:type operation_name: string
|
|
:param operation_name: The operation name. This is the same name
|
|
as the method name on the client. For example, if the
|
|
method name is ``create_foo``, and you'd normally invoke the
|
|
operation as ``client.create_foo(**kwargs)``, if the
|
|
``create_foo`` operation can be paginated, you can use the
|
|
call ``client.get_paginator("create_foo")``.
|
|
|
|
:raise OperationNotPageableError: Raised if the operation is not
|
|
pageable. You can use the ``client.can_paginate`` method to
|
|
check if an operation is pageable.
|
|
|
|
:rtype: ``botocore.paginate.Paginator``
|
|
:return: A paginator object.
|
|
|
|
"""
|
|
if not self.can_paginate(operation_name):
|
|
raise OperationNotPageableError(operation_name=operation_name)
|
|
else:
|
|
actual_operation_name = self._PY_TO_OP_NAME[operation_name]
|
|
|
|
# Create a new paginate method that will serve as a proxy to
|
|
# the underlying Paginator.paginate method. This is needed to
|
|
# attach a docstring to the method.
|
|
def paginate(self, **kwargs):
|
|
return Paginator.paginate(self, **kwargs)
|
|
|
|
paginator_config = self._cache['page_config'][
|
|
actual_operation_name
|
|
]
|
|
# Add the docstring for the paginate method.
|
|
paginate.__doc__ = PaginatorDocstring(
|
|
paginator_name=actual_operation_name,
|
|
event_emitter=self.meta.events,
|
|
service_model=self.meta.service_model,
|
|
paginator_config=paginator_config,
|
|
include_signature=False,
|
|
)
|
|
|
|
# Rename the paginator class based on the type of paginator.
|
|
service_module_name = get_service_module_name(
|
|
self.meta.service_model
|
|
)
|
|
paginator_class_name = (
|
|
f"{service_module_name}.Paginator.{actual_operation_name}"
|
|
)
|
|
|
|
# Create the new paginator class
|
|
documented_paginator_cls = type(
|
|
paginator_class_name, (Paginator,), {'paginate': paginate}
|
|
)
|
|
|
|
operation_model = self._service_model.operation_model(
|
|
actual_operation_name
|
|
)
|
|
paginator = documented_paginator_cls(
|
|
getattr(self, operation_name),
|
|
paginator_config,
|
|
operation_model,
|
|
)
|
|
return paginator
|
|
|
|
def can_paginate(self, operation_name):
|
|
"""Check if an operation can be paginated.
|
|
|
|
:type operation_name: string
|
|
:param operation_name: The operation name. This is the same name
|
|
as the method name on the client. For example, if the
|
|
method name is ``create_foo``, and you'd normally invoke the
|
|
operation as ``client.create_foo(**kwargs)``, if the
|
|
``create_foo`` operation can be paginated, you can use the
|
|
call ``client.get_paginator("create_foo")``.
|
|
|
|
:return: ``True`` if the operation can be paginated,
|
|
``False`` otherwise.
|
|
|
|
"""
|
|
if 'page_config' not in self._cache:
|
|
try:
|
|
page_config = self._loader.load_service_model(
|
|
self._service_model.service_name,
|
|
'paginators-1',
|
|
self._service_model.api_version,
|
|
)['pagination']
|
|
self._cache['page_config'] = page_config
|
|
except DataNotFoundError:
|
|
self._cache['page_config'] = {}
|
|
actual_operation_name = self._PY_TO_OP_NAME[operation_name]
|
|
return actual_operation_name in self._cache['page_config']
|
|
|
|
def _get_waiter_config(self):
|
|
if 'waiter_config' not in self._cache:
|
|
try:
|
|
waiter_config = self._loader.load_service_model(
|
|
self._service_model.service_name,
|
|
'waiters-2',
|
|
self._service_model.api_version,
|
|
)
|
|
self._cache['waiter_config'] = waiter_config
|
|
except DataNotFoundError:
|
|
self._cache['waiter_config'] = {}
|
|
return self._cache['waiter_config']
|
|
|
|
def get_waiter(self, waiter_name):
|
|
"""Returns an object that can wait for some condition.
|
|
|
|
:type waiter_name: str
|
|
:param waiter_name: The name of the waiter to get. See the waiters
|
|
section of the service docs for a list of available waiters.
|
|
|
|
:returns: The specified waiter object.
|
|
:rtype: ``botocore.waiter.Waiter``
|
|
"""
|
|
config = self._get_waiter_config()
|
|
if not config:
|
|
raise ValueError("Waiter does not exist: %s" % waiter_name)
|
|
model = waiter.WaiterModel(config)
|
|
mapping = {}
|
|
for name in model.waiter_names:
|
|
mapping[xform_name(name)] = name
|
|
if waiter_name not in mapping:
|
|
raise ValueError("Waiter does not exist: %s" % waiter_name)
|
|
|
|
return waiter.create_waiter_with_client(
|
|
mapping[waiter_name], model, self
|
|
)
|
|
|
|
@CachedProperty
|
|
def waiter_names(self):
|
|
"""Returns a list of all available waiters."""
|
|
config = self._get_waiter_config()
|
|
if not config:
|
|
return []
|
|
model = waiter.WaiterModel(config)
|
|
# Waiter configs is a dict, we just want the waiter names
|
|
# which are the keys in the dict.
|
|
return [xform_name(name) for name in model.waiter_names]
|
|
|
|
@property
|
|
def exceptions(self):
|
|
if self._exceptions is None:
|
|
self._exceptions = self._load_exceptions()
|
|
return self._exceptions
|
|
|
|
def _load_exceptions(self):
|
|
return self._exceptions_factory.create_client_exceptions(
|
|
self._service_model
|
|
)
|
|
|
|
def _get_credentials(self):
|
|
"""
|
|
This private interface is subject to abrupt breaking changes, including
|
|
removal, in any botocore release.
|
|
"""
|
|
return self._request_signer._credentials
|
|
|
|
|
|
class ClientMeta:
|
|
"""Holds additional client methods.
|
|
|
|
This class holds additional information for clients. It exists for
|
|
two reasons:
|
|
|
|
* To give advanced functionality to clients
|
|
* To namespace additional client attributes from the operation
|
|
names which are mapped to methods at runtime. This avoids
|
|
ever running into collisions with operation names.
|
|
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
events,
|
|
client_config,
|
|
endpoint_url,
|
|
service_model,
|
|
method_to_api_mapping,
|
|
partition,
|
|
):
|
|
self.events = events
|
|
self._client_config = client_config
|
|
self._endpoint_url = endpoint_url
|
|
self._service_model = service_model
|
|
self._method_to_api_mapping = method_to_api_mapping
|
|
self._partition = partition
|
|
|
|
@property
|
|
def service_model(self):
|
|
return self._service_model
|
|
|
|
@property
|
|
def region_name(self):
|
|
return self._client_config.region_name
|
|
|
|
@property
|
|
def endpoint_url(self):
|
|
return self._endpoint_url
|
|
|
|
@property
|
|
def config(self):
|
|
return self._client_config
|
|
|
|
@property
|
|
def method_to_api_mapping(self):
|
|
return self._method_to_api_mapping
|
|
|
|
@property
|
|
def partition(self):
|
|
return self._partition
|
|
|
|
|
|
def _get_configured_signature_version(
|
|
service_name, client_config, scoped_config
|
|
):
|
|
"""
|
|
Gets the manually configured signature version.
|
|
|
|
:returns: the customer configured signature version, or None if no
|
|
signature version was configured.
|
|
"""
|
|
# Client config overrides everything.
|
|
if client_config and client_config.signature_version is not None:
|
|
return client_config.signature_version
|
|
|
|
# Scoped config overrides picking from the endpoint metadata.
|
|
if scoped_config is not None:
|
|
# A given service may have service specific configuration in the
|
|
# config file, so we need to check there as well.
|
|
service_config = scoped_config.get(service_name)
|
|
if service_config is not None and isinstance(service_config, dict):
|
|
version = service_config.get('signature_version')
|
|
if version:
|
|
logger.debug(
|
|
"Switching signature version for service %s "
|
|
"to version %s based on config file override.",
|
|
service_name,
|
|
version,
|
|
)
|
|
return version
|
|
return None
|