# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "license" file accompanying this file. This file is # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. import pytest from botocore.session import get_session SERVICE_RENAMES = { # Actual service name we use -> Allowed computed service name. 'alexaforbusiness': 'alexa-for-business', 'apigateway': 'api-gateway', 'application-autoscaling': 'application-auto-scaling', 'appmesh': 'app-mesh', 'autoscaling': 'auto-scaling', 'autoscaling-plans': 'auto-scaling-plans', 'ce': 'cost-explorer', 'cloudhsmv2': 'cloudhsm-v2', 'cloudsearchdomain': 'cloudsearch-domain', 'cognito-idp': 'cognito-identity-provider', 'config': 'config-service', 'cur': 'cost-and-usage-report-service', 'datapipeline': 'data-pipeline', 'directconnect': 'direct-connect', 'devicefarm': 'device-farm', 'discovery': 'application-discovery-service', 'dms': 'database-migration-service', 'ds': 'directory-service', 'dynamodbstreams': 'dynamodb-streams', 'elasticbeanstalk': 'elastic-beanstalk', 'elastictranscoder': 'elastic-transcoder', 'elb': 'elastic-load-balancing', 'elbv2': 'elastic-load-balancing-v2', 'es': 'elasticsearch-service', 'events': 'eventbridge', 'globalaccelerator': 'global-accelerator', 'iot-data': 'iot-data-plane', 'iot-jobs-data': 'iot-jobs-data-plane', 'iot1click-devices': 'iot-1click-devices-service', 'iot1click-projects': 'iot-1click-projects', 'iotevents-data': 'iot-events-data', 'iotevents': 'iot-events', 'iotwireless': 'iot-wireless', 'kinesisanalytics': 'kinesis-analytics', 'kinesisanalyticsv2': 'kinesis-analytics-v2', 'kinesisvideo': 'kinesis-video', 'lex-models': 'lex-model-building-service', 'lexv2-models': 'lex-models-v2', 'lex-runtime': 'lex-runtime-service', 'lexv2-runtime': 'lex-runtime-v2', 'logs': 'cloudwatch-logs', 'machinelearning': 'machine-learning', 'marketplacecommerceanalytics': 'marketplace-commerce-analytics', 'marketplace-entitlement': 'marketplace-entitlement-service', 'meteringmarketplace': 'marketplace-metering', 'mgh': 'migration-hub', 'sms-voice': 'pinpoint-sms-voice', 'resourcegroupstaggingapi': 'resource-groups-tagging-api', 'route53': 'route-53', 'route53domains': 'route-53-domains', 's3control': 's3-control', 'sdb': 'simpledb', 'secretsmanager': 'secrets-manager', 'serverlessrepo': 'serverlessapplicationrepository', 'servicecatalog': 'service-catalog', 'servicecatalog-appregistry': 'service-catalog-appregistry', 'stepfunctions': 'sfn', 'storagegateway': 'storage-gateway', } ENDPOINT_PREFIX_OVERRIDE = { # entry in endpoints.json -> actual endpoint prefix. # The autoscaling-* services actually send requests to the # autoscaling service, but they're exposed as separate clients # in botocore. 'autoscaling-plans': 'autoscaling', 'application-autoscaling': 'autoscaling', # For neptune, we send requests to the RDS endpoint. 'neptune': 'rds', 'docdb': 'rds', # iotevents data endpoints.json and service-2.json don't line up. 'ioteventsdata': 'data.iotevents', 'iotsecuredtunneling': 'api.tunneling.iot', 'iotwireless': 'api.iotwireless', 'data.iot': 'data-ats.iot', } NOT_SUPPORTED_IN_SDK = [ 'mobileanalytics', 'transcribestreaming', ] SESSION = get_session() LOADER = SESSION.get_component('data_loader') AVAILABLE_SERVICES = LOADER.list_available_services('service-2') def _known_endpoint_prefixes(): # The entries in endpoints.json are keyed off of the endpoint # prefix. We don't directly have that data, so we have to load # every service model and look up its endpoint prefix in its # ``metadata`` section. return { SESSION.get_service_model(service_name).endpoint_prefix for service_name in AVAILABLE_SERVICES } def _computed_endpoint_prefixes(): # This verifies client names match up with data from the endpoints.json # file. We want to verify that every entry in the endpoints.json # file corresponds to a client we can construct via # session.create_client(...). # So first we get a list of all the service names in the endpoints # file. endpoints = LOADER.load_data('endpoints') # A service can be in multiple partitions so we're using # a set here to remove dupes. services_in_endpoints_file = set() for partition in endpoints['partitions']: for service in partition['services']: # There are some services we don't support in the SDK # so we don't need to add them to the list of services # we need to check. if service not in NOT_SUPPORTED_IN_SDK: services_in_endpoints_file.add(service) # Now we go through every known endpoint prefix in the endpoints.json # file and ensure it maps to an endpoint prefix we've seen # in a service model. endpoint_prefixes = [] for endpoint_prefix in services_in_endpoints_file: # Check for an override where we know that an entry # in the endpoints.json actually maps to a different endpoint # prefix. endpoint_prefix = ENDPOINT_PREFIX_OVERRIDE.get( endpoint_prefix, endpoint_prefix ) endpoint_prefixes.append(endpoint_prefix) return sorted(endpoint_prefixes) KNOWN_ENDPOINT_PREFIXES = _known_endpoint_prefixes() COMPUTED_ENDPOINT_PREFIXES = _computed_endpoint_prefixes() @pytest.mark.parametrize("endpoint_prefix", COMPUTED_ENDPOINT_PREFIXES) def test_endpoint_matches_service(endpoint_prefix): # We need to cross check all computed endpoints against our # known values in endpoints.json, to ensure everything lines # up correctly. assert endpoint_prefix in KNOWN_ENDPOINT_PREFIXES @pytest.mark.parametrize("service_name", AVAILABLE_SERVICES) def test_service_name_matches_endpoint_prefix(service_name): """Generates tests for each service to verify that the computed service named based on the service id matches the service name used to create a client (i.e the directory name in botocore/data) unless there is an explicit exception. """ service_model = SESSION.get_service_model(service_name) computed_name = service_model.service_id.replace(' ', '-').lower() # Handle known exceptions where we have renamed the service directory # for one reason or another. actual_service_name = SERVICE_RENAMES.get(service_name, service_name) err_msg = ( f"Actual service name `{actual_service_name}` does not match " f"expected service name we computed: `{computed_name}`" ) assert computed_name == actual_service_name, err_msg _S3_ALLOWED_PSEUDO_FIPS_REGIONS = [ 'fips-accesspoint-ca-central-1', 'fips-accesspoint-us-east-1', 'fips-accesspoint-us-east-2', 'fips-accesspoint-us-west-1', 'fips-accesspoint-us-west-2', 'fips-accesspoint-us-gov-east-1', 'fips-accesspoint-us-gov-west-1', 'fips-us-gov-west-1', 'fips-us-gov-east-1', 'fips-ca-central-1', 'fips-us-east-1', 'fips-us-east-2', 'fips-us-west-1', 'fips-us-west-2', ] def _s3_region_names(): endpoints = LOADER.load_data('endpoints') for partition in endpoints['partitions']: s3_service = partition['services'].get('s3', {}) for region_name in s3_service['endpoints']: yield region_name.lower() @pytest.mark.parametrize("region_name", _s3_region_names()) def test_no_s3_fips_regions(region_name): # Fail if additional FIPS pseudo-regions are added to S3. # This may be removed once proper support is implemented for FIPS in S3. if region_name in _S3_ALLOWED_PSEUDO_FIPS_REGIONS: return err_msg = ( 'New S3 FIPS pseudo-region added: "{region_name}". ' 'FIPS has compliancy requirements that may not be met in all cases ' 'for S3 clients due to the custom endpoint resolution and ' 'construction logic.' ) assert 'fips' not in region_name, err_msg