# python-botocore/botocore/translate.py
# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Translate the raw json files into python specific descriptions."""
import os
import re
from copy import deepcopy
import jmespath
from botocore.compat import OrderedDict, json
from botocore.utils import set_value_from_jmespath, merge_dicts
from botocore import xform_name


class ModelFiles(object):
"""Container object to hold all the various parsed json files.
Includes:
* The json service description.
* The _retry.json file.
* The <service>.extra.json enhancements file.
* The name of the service.
"""
def __init__(self, model, retry, enhancements, name=''):
self.model = model
self.retry = retry
self.enhancements = enhancements
        self.name = name


def load_model_files(args):
    # Use context managers so the file handles are closed promptly
    # instead of leaking until garbage collection.
    with open(args.modelfile) as f:
        model = json.load(f, object_pairs_hook=OrderedDict)
    with open(args.retry_file) as f:
        retry = json.load(f, object_pairs_hook=OrderedDict)
    enhancements = _load_enhancements_file(args.enhancements_file)
    service_name = os.path.splitext(os.path.basename(args.modelfile))[0]
    return ModelFiles(model, retry, enhancements, name=service_name)


def _load_enhancements_file(file_path):
    if not os.path.isfile(file_path):
        return {}
    with open(file_path) as f:
        return json.load(f, object_pairs_hook=OrderedDict)


def translate(model):
new_model = deepcopy(model.model)
new_model.update(model.enhancements.get('extra', {}))
    new_model.pop('pagination', None)
handle_op_renames(new_model, model.enhancements)
handle_remove_deprecated_params(new_model, model.enhancements)
handle_remove_deprecated_operations(new_model, model.enhancements)
handle_filter_documentation(new_model, model.enhancements)
handle_rename_params(new_model, model.enhancements)
add_pagination_configs(
new_model,
model.enhancements.get('pagination', {}))
add_waiter_configs(
new_model,
model.enhancements.get('waiters', {}))
    # Merge in any per operation overrides defined in the
    # <service>.extra.json file.
    merge_dicts(new_model['operations'],
                model.enhancements.get('operations', {}))
    add_retry_configs(
        new_model, model.retry.get('retry', {}),
        definitions=model.retry.get('definitions', {}))
    return new_model
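

# A minimal usage sketch (hypothetical paths; ``args`` can be any object
# with ``modelfile``, ``retry_file`` and ``enhancements_file`` attributes,
# which is all ``load_model_files`` above relies on):
#
#     args = argparse.Namespace(
#         modelfile='aws/ec2.json',
#         retry_file='aws/_retry.json',
#         enhancements_file='aws/ec2.extra.json')
#     new_model = translate(load_model_files(args))
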
def handle_op_renames(new_model, enhancements):
    # This allows for operations to be renamed. The only transformation
    # implemented is removing part of the operation name, because that's
    # all we currently need.
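    # For example (hypothetical enhancements snippet):
    #
    #     {"transformations": {"operation-name": {"remove": "2012_06_01$"}}}
    #
    # would rename an operation "DescribeInstances2012_06_01" to
    # "DescribeInstances".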
remove = enhancements.get('transformations', {}).get(
'operation-name', {}).get('remove')
if remove is not None:
# We're going to recreate the dictionary because we want to preserve
# the order. This is the only option we have unless we have our own
# custom OrderedDict.
remove_regex = re.compile(remove)
operations = new_model['operations']
new_operation = OrderedDict()
for key in operations:
new_key = remove_regex.sub('', key)
new_operation[new_key] = operations[key]
    new_model['operations'] = new_operation


def handle_remove_deprecated_operations(new_model, enhancements):
    # This removes any operation whose documentation string contains
    # the specified phrase that marks a deprecated operation.
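    # For example (hypothetical enhancements snippet):
    #
    #     {"transformations": {"remove-deprecated-operations":
    #         {"deprecated_keyword": "This operation is deprecated"}}}
    #
    # removes every operation whose documentation contains that phrase.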
keyword = enhancements.get('transformations', {}).get(
'remove-deprecated-operations', {}).get('deprecated_keyword')
remove = []
if keyword is not None:
operations = new_model['operations']
for op_name in operations:
operation = operations[op_name]
if operation:
docs = operation['documentation']
                if docs and keyword in docs:
remove.append(op_name)
for op in remove:
        del new_model['operations'][op]


def handle_remove_deprecated_params(new_model, enhancements):
# This removes any parameter whose documentation string contains
# the specified phrase that marks a deprecated parameter.
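    # For example (hypothetical enhancements snippet):
    #
    #     {"transformations": {"remove-deprecated-params":
    #         {"deprecated_keyword": "Deprecated."}}}
    #
    # drops any input member whose documentation contains that phrase.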
keyword = enhancements.get('transformations', {}).get(
'remove-deprecated-params', {}).get('deprecated_keyword')
if keyword is not None:
operations = new_model['operations']
for op_name in operations:
operation = operations[op_name]
params = operation.get('input', {}).get('members')
if params:
new_params = OrderedDict()
for param_name in params:
param = params[param_name]
docs = param['documentation']
                    if docs and keyword in docs:
continue
new_params[param_name] = param
                operation['input']['members'] = new_params


def _filter_param_doc(param, replacement, regex):
# Recurse into complex parameters looking for documentation.
doc = param.get('documentation')
if doc:
param['documentation'] = regex.sub(replacement, doc)
    if param['type'] == 'structure':
        for member_name in param['members']:
            member = param['members'][member_name]
            _filter_param_doc(member, replacement, regex)
    elif param['type'] == 'map':
        _filter_param_doc(param['keys'], replacement, regex)
        _filter_param_doc(param['members'], replacement, regex)
    elif param['type'] == 'list':
        _filter_param_doc(param['members'], replacement, regex)


def handle_filter_documentation(new_model, enhancements):
    # This provides a way to filter undesirable content (e.g. CDATA)
    # from documentation strings.
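    # For example (hypothetical enhancements snippet):
    #
    #     {"transformations": {"filter-documentation": {"filter":
    #         {"regex": "<!\\[CDATA\\[.*\\]\\]>", "replacement": ""}}}}
    #
    # strips CDATA sections from operation and parameter documentation.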
    doc_filter = enhancements.get('transformations', {}).get(
        'filter-documentation', {}).get('filter')
    if doc_filter is not None:
        filter_regex = re.compile(doc_filter.get('regex', ''), re.DOTALL)
        replacement = doc_filter.get('replacement')
operations = new_model['operations']
for op_name in operations:
operation = operations[op_name]
doc = operation.get('documentation')
if doc:
new_doc = filter_regex.sub(replacement, doc)
operation['documentation'] = new_doc
params = operation.get('input', {}).get('members')
if params:
for param_name in params:
param = params[param_name]
                    _filter_param_doc(param, replacement, filter_regex)


def handle_rename_params(new_model, enhancements):
renames = enhancements.get('transformations', {}).get(
'renames', {})
if not renames:
return
# This is *extremely* specific to botocore's translations, but
# we support a restricted set of argument renames based on a
# jmespath expression.
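    # For example (hypothetical enhancements snippet):
    #
    #     {"renames": {"CreateQueue.input.members.Attribute": "Attributes"}}
    #
    # renames the ``Attribute`` input member of the ``CreateQueue``
    # operation to ``Attributes``.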
for expression, new_value in renames.items():
# First we take everything up until the last dot.
parent_expression, key = expression.rsplit('.', 1)
matched = jmespath.search(parent_expression, new_model['operations'])
current = matched[key]
del matched[key]
        matched[new_value] = current


def resembles_jmespath_exp(value):
    # For now we do a naive check: any value containing a dot is treated
    # as a jmespath expression.
    return '.' in value


def add_pagination_configs(new_model, pagination):
# Adding in pagination configs means copying the config to a top level
# 'pagination' key in the new model, and it also means adding the
# pagination config to each individual operation.
# Also, the input_token needs to be transformed to the python specific
# name, so we're adding a py_input_token (e.g. NextToken -> next_token).
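    # For example (hypothetical pagination entry):
    #
    #     {"ListTables": {"input_token": "ExclusiveStartTableName",
    #                     "output_token": "LastEvaluatedTableName",
    #                     "limit_key": "Limit",
    #                     "result_key": "TableNames"}}
    #
    # gains "py_input_token": "exclusive_start_table_name".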
if pagination:
new_model['pagination'] = pagination
for name in pagination:
config = pagination[name]
_check_known_pagination_keys(config)
if 'py_input_token' not in config:
input_token = config['input_token']
if isinstance(input_token, list):
py_input_token = []
for token in input_token:
py_input_token.append(xform_name(token))
config['py_input_token'] = py_input_token
else:
config['py_input_token'] = xform_name(input_token)
# result_key must be defined
if 'result_key' not in config:
                raise ValueError("Required key 'result_key' is missing "
                                 "from pagination config: %s" % config)
if name not in new_model['operations']:
raise ValueError("Trying to add pagination config for non "
"existent operation: %s" % name)
            # Guard against an entry that exists in the operations dict
            # but is null, before the lookups below index into it.
            operation = new_model['operations'].get(name)
            if operation is None:
                raise ValueError("Tried to add a pagination config for non "
                                 "existent operation '%s'" % name)
# result_key must match a key in the output.
if not isinstance(config['result_key'], list):
result_keys = [config['result_key']]
else:
result_keys = config['result_key']
            if result_keys and not operation.get('output'):
raise ValueError("Trying to add pagination config for an "
"operation with no output members: %s" % name)
for result_key in result_keys:
if resembles_jmespath_exp(result_key):
continue
if result_key not in operation['output']['members']:
raise ValueError("result_key %r is not an output member: %s" %
(result_key,
operation['output']['members'].keys()))
            _check_input_keys_match(config, operation)
            operation['pagination'] = config.copy()


def add_waiter_configs(new_model, waiters):
if waiters:
denormalized = denormalize_waiters(waiters)
# Before adding it to the new model, we need to verify the
# final denormalized model.
        for value in denormalized.values():
            if value['operation'] not in new_model['operations']:
                raise ValueError("Waiter config refers to a non existent "
                                 "operation: %s" % value['operation'])
        new_model['waiters'] = denormalized


def denormalize_waiters(waiters):
# The waiter configuration is normalized to avoid duplication.
# You can inherit defaults, and extend from other definitions.
# We're going to denormalize this so that the implementation for
# consuming waiters is simple.
default = waiters.get('__default__', {})
new_waiters = {}
for key, value in waiters.items():
if key.startswith('__'):
# Keys that start with '__' are considered abstract/internal
# and are only used for inheritance. Because we're going
# to denormalize the configs and perform all the lookups
# during this translation process, the abstract/internal
# configs don't need to make it into the final translated
# config so we can just skip these.
continue
new_waiters[key] = denormalize_single_waiter(value, default, waiters)
    return new_waiters


def denormalize_single_waiter(value, default, waiters):
"""Denormalize a single waiter config.
:param value: The dictionary of a single waiter config, e.g.
the ``InstanceRunning`` or ``TableExists`` config. This
is the config we're going to denormalize.
:param default: The ``__default__`` (if any) configuration.
This is needed to resolve the lookup process.
:param waiters: The full configuration of the waiters.
        This is needed when we have to look up a parent config that the
        current config extends.
:return: The denormalized config.
:rtype: dict
"""
    # First we need to resolve all the keys based on the inheritance
    # hierarchy. The lookup process is: the bottommost (leaf) config is
    # ``value``. From there we look up anything it inherits from
    # (denoted via the ``extends`` key), and we repeat this process
    # recursively until we hit a config that has no ``extends`` key.
    # Finally, if we still haven't found a value, we check the
    # ``__default__`` config.
    # So the first thing we need to do is build the lookup chain that
    # starts with ``value`` and ends with ``__default__``.
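    # For example (hypothetical waiter configs):
    #
    #     waiters = {
    #         '__default__': {'interval': 15, 'max_attempts': 40},
    #         '__InstanceState': {'operation': 'DescribeInstances',
    #                             'acceptor_type': 'output'},
    #         'InstanceRunning': {'extends': '__InstanceState',
    #                             'success_value': 'running'},
    #     }
    #
    # Denormalizing 'InstanceRunning' walks the chain
    # InstanceRunning -> __InstanceState -> __default__ and merges the
    # three configs into one flat dict.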
lookup_chain = [value]
current = value
    while 'extends' in current:
        current = waiters[current['extends']]
        lookup_chain.append(current)
lookup_chain.append(default)
new_waiter = {}
    # Now that we have this lookup chain we can build the entire set
    # of values by starting at the topmost parent config and walking
    # down to the children. At each step the child is merged onto the
    # parent's config items. This is the desired behavior, as a child's
    # values override its parent's. This is what the ``reversed(...)``
    # call is for.
for element in reversed(lookup_chain):
new_waiter.update(element)
# We don't care about 'extends' so we can safely remove that key.
    new_waiter.pop('extends', None)
# Now we need to resolve the success/failure values. We
# want to completely remove the acceptor types.
# The logic here is that if there is no success/failure_* variable
# defined, it inherits this value from the matching acceptor_* variable.
new_waiter['success_type'] = new_waiter.get('success_type',
new_waiter.get('acceptor_type'))
new_waiter['success_path'] = new_waiter.get('success_path',
new_waiter.get('acceptor_path'))
new_waiter['success_value'] = new_waiter.get('success_value',
new_waiter.get('acceptor_value'))
new_waiter['failure_type'] = new_waiter.get('failure_type',
new_waiter.get('acceptor_type'))
new_waiter['failure_path'] = new_waiter.get('failure_path',
new_waiter.get('acceptor_path'))
new_waiter['failure_value'] = new_waiter.get('failure_value',
new_waiter.get('acceptor_value'))
# We can remove acceptor_* vars because they're only used for lookups
# and we've already performed this step in the lines above.
    new_waiter.pop('acceptor_type', None)
    new_waiter.pop('acceptor_path', None)
    new_waiter.pop('acceptor_value', None)
# Remove any keys with a None value.
for key in list(new_waiter.keys()):
if new_waiter[key] is None:
del new_waiter[key]
# Check required keys.
for required in ['operation', 'success_type']:
if required not in new_waiter:
raise ValueError('Missing required waiter configuration '
'value "%s": %s' % (required, new_waiter))
if new_waiter.get(required) is None:
raise ValueError('Required waiter configuration '
'value cannot be None "%s": %s'
% (required, new_waiter))
# Finally, success/failure values can be a scalar or a list. We're going
# to just always make them a list.
    if ('success_value' in new_waiter and
            not isinstance(new_waiter['success_value'], list)):
        new_waiter['success_value'] = [new_waiter['success_value']]
    if ('failure_value' in new_waiter and
            not isinstance(new_waiter['failure_value'], list)):
        new_waiter['failure_value'] = [new_waiter['failure_value']]
_transform_waiter(new_waiter)
    return new_waiter


def _transform_waiter(new_waiter):
# This transforms the waiters into a format that's slightly
# easier to consume.
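    # For example (hypothetical input):
    #
    #     {'operation': 'DescribeTable',
    #      'success_type': 'output', 'success_path': 'Table.TableStatus',
    #      'success_value': ['ACTIVE']}
    #
    # becomes:
    #
    #     {'operation': 'DescribeTable',
    #      'success': {'type': 'output', 'path': 'Table.TableStatus',
    #                  'value': ['ACTIVE']}}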
if 'success_type' in new_waiter:
success = {'type': new_waiter.pop('success_type')}
if 'success_path' in new_waiter:
success['path'] = new_waiter.pop('success_path')
if 'success_value' in new_waiter:
success['value'] = new_waiter.pop('success_value')
new_waiter['success'] = success
if 'failure_type' in new_waiter:
failure = {'type': new_waiter.pop('failure_type')}
if 'failure_path' in new_waiter:
failure['path'] = new_waiter.pop('failure_path')
if 'failure_value' in new_waiter:
failure['value'] = new_waiter.pop('failure_value')
        new_waiter['failure'] = failure


def _check_known_pagination_keys(config):
# Verify that the pagination config only has keys we expect to see.
expected = set(['input_token', 'py_input_token', 'output_token',
'result_key', 'limit_key', 'more_results',
'non_aggregate_keys'])
for key in config:
if key not in expected:
            raise ValueError("Unknown key in pagination config: %s" % key)


def _check_input_keys_match(config, operation):
input_tokens = config['input_token']
if not isinstance(input_tokens, list):
input_tokens = [input_tokens]
valid_input_names = operation['input']['members']
for token in input_tokens:
if token not in valid_input_names:
raise ValueError("input_token refers to a non existent "
"input name for operation %s: %s. "
"Must be one of: %s" % (operation['name'], token,
list(valid_input_names)))
    if 'limit_key' in config and config['limit_key'] not in valid_input_names:
        raise ValueError("limit_key refers to a non existent input name for "
                         "operation %s: %s. Must be one of: %s" % (
                             operation['name'], config['limit_key'],
                             list(valid_input_names)))


def add_retry_configs(new_model, retry_model, definitions):
if not retry_model:
new_model['retry'] = {}
return
# The service specific retry config is keyed off of the endpoint
# prefix as defined in the JSON model.
endpoint_prefix = new_model.get('endpoint_prefix', '')
service_config = retry_model.get(endpoint_prefix, {})
resolve_references(service_config, definitions)
# We want to merge the global defaults with the service specific
# defaults, with the service specific defaults taking precedence.
# So we use the global defaults as the base.
final_retry_config = {'__default__': retry_model.get('__default__', {})}
resolve_references(final_retry_config, definitions)
    # Then merge the service specific config on top.
merge_dicts(final_retry_config, service_config)
new_model['retry'] = final_retry_config
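

# For example (hypothetical _retry.json snippet):
#
#     {"definitions": {"throttling": {"applies_when": {"response": {
#          "service_error_code": "Throttling", "http_status_code": 400}}}},
#      "retry": {"__default__": {"max_attempts": 5},
#                "dynamodb": {"policies": {
#                    "throttled": {"$ref": "throttling"}}}}}
#
# For a model whose endpoint_prefix is "dynamodb", the global __default__
# config is merged with the dynamodb-specific config, after any
# {"$ref": ...} entries are resolved via ``resolve_references`` below.
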
def resolve_references(config, definitions):
"""Recursively replace $ref keys.

    To cut down on duplication, common definitions can be declared
    (and passed in via the ``definitions`` argument) and then
    referenced as ``{"$ref": "name"}``.  When this happens, the
    reference dict is replaced with the corresponding value from the
    ``definitions`` dict.  This is done recursively.
"""
for key, value in config.items():
if isinstance(value, dict):
if len(value) == 1 and list(value.keys())[0] == '$ref':
# Then we need to resolve this reference.
config[key] = definitions[list(value.values())[0]]
else:
resolve_references(value, definitions)
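

# A sketch of how ``resolve_references`` behaves (hypothetical data):
#
#     definitions = {'throttling': {'applies_when': {'response': {
#         'service_error_code': 'Throttling'}}}}
#     config = {'policies': {'throttled': {'$ref': 'throttling'}}}
#     resolve_references(config, definitions)
#
# afterwards the inner {'$ref': 'throttling'} dict has been replaced by
# definitions['throttling'].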