# python-botocore/tests/functional/test_endpoints.py
# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest
from botocore.session import get_session

SERVICE_RENAMES = {
# Actual service name we use -> Allowed computed service name.
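    # For example, the model directory (and client name) is 'apigateway',
    # but its service id "API Gateway" computes to 'api-gateway', so the
    # rename is listed below.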
'alexaforbusiness': 'alexa-for-business',
'apigateway': 'api-gateway',
'application-autoscaling': 'application-auto-scaling',
'appmesh': 'app-mesh',
'autoscaling': 'auto-scaling',
'autoscaling-plans': 'auto-scaling-plans',
'ce': 'cost-explorer',
'cloudhsmv2': 'cloudhsm-v2',
'cloudsearchdomain': 'cloudsearch-domain',
'cognito-idp': 'cognito-identity-provider',
'config': 'config-service',
'cur': 'cost-and-usage-report-service',
'datapipeline': 'data-pipeline',
'directconnect': 'direct-connect',
'devicefarm': 'device-farm',
'discovery': 'application-discovery-service',
'dms': 'database-migration-service',
'ds': 'directory-service',
'dynamodbstreams': 'dynamodb-streams',
'elasticbeanstalk': 'elastic-beanstalk',
'elastictranscoder': 'elastic-transcoder',
'elb': 'elastic-load-balancing',
'elbv2': 'elastic-load-balancing-v2',
'es': 'elasticsearch-service',
'events': 'eventbridge',
'globalaccelerator': 'global-accelerator',
'iot-data': 'iot-data-plane',
'iot-jobs-data': 'iot-jobs-data-plane',
'iot1click-devices': 'iot-1click-devices-service',
'iot1click-projects': 'iot-1click-projects',
'iotevents-data': 'iot-events-data',
'iotevents': 'iot-events',
'iotwireless': 'iot-wireless',
'kinesisanalytics': 'kinesis-analytics',
'kinesisanalyticsv2': 'kinesis-analytics-v2',
'kinesisvideo': 'kinesis-video',
'lex-models': 'lex-model-building-service',
'lexv2-models': 'lex-models-v2',
'lex-runtime': 'lex-runtime-service',
'lexv2-runtime': 'lex-runtime-v2',
'logs': 'cloudwatch-logs',
'machinelearning': 'machine-learning',
'marketplacecommerceanalytics': 'marketplace-commerce-analytics',
'marketplace-entitlement': 'marketplace-entitlement-service',
'meteringmarketplace': 'marketplace-metering',
'mgh': 'migration-hub',
'sms-voice': 'pinpoint-sms-voice',
'resourcegroupstaggingapi': 'resource-groups-tagging-api',
'route53': 'route-53',
'route53domains': 'route-53-domains',
's3control': 's3-control',
'sdb': 'simpledb',
'secretsmanager': 'secrets-manager',
'serverlessrepo': 'serverlessapplicationrepository',
'servicecatalog': 'service-catalog',
'servicecatalog-appregistry': 'service-catalog-appregistry',
'stepfunctions': 'sfn',
'storagegateway': 'storage-gateway',
}

ENDPOINT_PREFIX_OVERRIDE = {
# entry in endpoints.json -> actual endpoint prefix.
# The autoscaling-* services actually send requests to the
# autoscaling service, but they're exposed as separate clients
# in botocore.
'autoscaling-plans': 'autoscaling',
'application-autoscaling': 'autoscaling',
    # Neptune and DocumentDB send requests to the RDS endpoint.
    'neptune': 'rds',
    'docdb': 'rds',
    # The iotevents data entries in endpoints.json and service-2.json
    # don't line up.
'ioteventsdata': 'data.iotevents',
'iotsecuredtunneling': 'api.tunneling.iot',
'iotwireless': 'api.iotwireless',
'data.iot': 'data-ats.iot',
}

NOT_SUPPORTED_IN_SDK = [
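    # These appear in endpoints.json, but botocore ships no service model
    # for them, so they are skipped below.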
'mobileanalytics',
'transcribestreaming',
]

SESSION = get_session()
LOADER = SESSION.get_component('data_loader')
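# ``list_available_services('service-2')`` returns the directory names
# under ``botocore/data`` that contain a service-2.json model,
# e.g. 'ec2', 's3'.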
AVAILABLE_SERVICES = LOADER.list_available_services('service-2')


def _known_endpoint_prefixes():
# The entries in endpoints.json are keyed off of the endpoint
# prefix. We don't directly have that data, so we have to load
# every service model and look up its endpoint prefix in its
# ``metadata`` section.
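    # For example, Amazon S3's service-2.json contains (abridged):
    #     "metadata": {"apiVersion": "2006-03-01", "endpointPrefix": "s3"}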
return {
SESSION.get_service_model(service_name).endpoint_prefix
for service_name in AVAILABLE_SERVICES
    }


def _computed_endpoint_prefixes():
# This verifies client names match up with data from the endpoints.json
# file. We want to verify that every entry in the endpoints.json
# file corresponds to a client we can construct via
# session.create_client(...).
# So first we get a list of all the service names in the endpoints
# file.
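    # The endpoints file is shaped roughly like (abridged):
    #     {"partitions": [{"partition": "aws",
    #                      "services": {"s3": {"endpoints": {...}}}}]}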
endpoints = LOADER.load_data('endpoints')
# A service can be in multiple partitions so we're using
# a set here to remove dupes.
services_in_endpoints_file = set()
for partition in endpoints['partitions']:
for service in partition['services']:
# There are some services we don't support in the SDK
# so we don't need to add them to the list of services
# we need to check.
if service not in NOT_SUPPORTED_IN_SDK:
services_in_endpoints_file.add(service)
# Now we go through every known endpoint prefix in the endpoints.json
# file and ensure it maps to an endpoint prefix we've seen
# in a service model.
endpoint_prefixes = []
for endpoint_prefix in services_in_endpoints_file:
# Check for an override where we know that an entry
# in the endpoints.json actually maps to a different endpoint
# prefix.
endpoint_prefix = ENDPOINT_PREFIX_OVERRIDE.get(
endpoint_prefix, endpoint_prefix
)
endpoint_prefixes.append(endpoint_prefix)
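    # Sort so the parametrized test IDs are generated in a stable order.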
    return sorted(endpoint_prefixes)


# Computed at import time so pytest can parametrize over the results.
KNOWN_ENDPOINT_PREFIXES = _known_endpoint_prefixes()
COMPUTED_ENDPOINT_PREFIXES = _computed_endpoint_prefixes()


@pytest.mark.parametrize("endpoint_prefix", COMPUTED_ENDPOINT_PREFIXES)
def test_endpoint_matches_service(endpoint_prefix):
# We need to cross check all computed endpoints against our
# known values in endpoints.json, to ensure everything lines
# up correctly.
    assert endpoint_prefix in KNOWN_ENDPOINT_PREFIXES


@pytest.mark.parametrize("service_name", AVAILABLE_SERVICES)
def test_service_name_matches_endpoint_prefix(service_name):
"""Generates tests for each service to verify that the computed service
named based on the service id matches the service name used to
create a client (i.e the directory name in botocore/data)
unless there is an explicit exception.
"""
service_model = SESSION.get_service_model(service_name)
computed_name = service_model.service_id.replace(' ', '-').lower()
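    # e.g. a service id of "Directory Service" computes to
    # 'directory-service'.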
# Handle known exceptions where we have renamed the service directory
# for one reason or another.
actual_service_name = SERVICE_RENAMES.get(service_name, service_name)
err_msg = (
f"Actual service name `{actual_service_name}` does not match "
f"expected service name we computed: `{computed_name}`"
)
    assert computed_name == actual_service_name, err_msg


_S3_ALLOWED_PSEUDO_FIPS_REGIONS = [
'fips-accesspoint-ca-central-1',
'fips-accesspoint-us-east-1',
'fips-accesspoint-us-east-2',
'fips-accesspoint-us-west-1',
'fips-accesspoint-us-west-2',
'fips-accesspoint-us-gov-east-1',
'fips-accesspoint-us-gov-west-1',
'fips-us-gov-west-1',
'fips-us-gov-east-1',
'fips-ca-central-1',
'fips-us-east-1',
'fips-us-east-2',
'fips-us-west-1',
'fips-us-west-2',
]


def _s3_region_names():
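    # Yields every region key under the ``s3`` service in each partition,
    # including pseudo-regions such as 'fips-us-gov-west-1'.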
endpoints = LOADER.load_data('endpoints')
for partition in endpoints['partitions']:
        s3_service = partition['services'].get('s3', {})
        # ``s3`` may be missing from a partition, so guard the lookup.
        for region_name in s3_service.get('endpoints', []):
            yield region_name.lower()


@pytest.mark.parametrize("region_name", _s3_region_names())
def test_no_s3_fips_regions(region_name):
# Fail if additional FIPS pseudo-regions are added to S3.
# This may be removed once proper support is implemented for FIPS in S3.
if region_name in _S3_ALLOWED_PSEUDO_FIPS_REGIONS:
return
    err_msg = (
        f'New S3 FIPS pseudo-region added: "{region_name}". '
        'FIPS has compliance requirements that may not be met in all cases '
        'for S3 clients due to the custom endpoint resolution and '
        'construction logic.'
    )
assert 'fips' not in region_name, err_msg