python-botocore/tests/unit/test_credentials.py
2017-06-27 18:52:19 +09:00

1466 lines
58 KiB
Python

# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
# Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from datetime import datetime, timedelta
import mock
import os
from dateutil.tz import tzlocal, tzutc
from botocore import credentials
from botocore.utils import ContainerMetadataFetcher
from botocore.credentials import EnvProvider, create_assume_role_refresher
import botocore.exceptions
import botocore.session
from tests import unittest, BaseEnvVar, IntegerRefresher
# Passed to session to keep it from finding default config file
TESTENVVARS = {'config_file': (None, 'AWS_CONFIG_FILE', None)}
raw_metadata = {
'foobar': {
'Code': 'Success',
'LastUpdated': '2012-12-03T14:38:21Z',
'AccessKeyId': 'foo',
'SecretAccessKey': 'bar',
'Token': 'foobar',
'Expiration': '2012-12-03T20:48:03Z',
'Type': 'AWS-HMAC'
}
}
post_processed_metadata = {
'role_name': 'foobar',
'access_key': raw_metadata['foobar']['AccessKeyId'],
'secret_key': raw_metadata['foobar']['SecretAccessKey'],
'token': raw_metadata['foobar']['Token'],
'expiry_time': raw_metadata['foobar']['Expiration'],
}
def path(filename):
return os.path.join(os.path.dirname(__file__), 'cfg', filename)
class TestCredentials(BaseEnvVar):
def _ensure_credential_is_normalized_as_unicode(self, access, secret):
c = credentials.Credentials(access, secret)
self.assertTrue(isinstance(c.access_key, type(u'u')))
self.assertTrue(isinstance(c.secret_key, type(u'u')))
def test_detect_nonascii_character(self):
self._ensure_credential_is_normalized_as_unicode(
'foo\xe2\x80\x99', 'bar\xe2\x80\x99')
def test_unicode_input(self):
self._ensure_credential_is_normalized_as_unicode(
u'foo', u'bar')
class TestRefreshableCredentials(TestCredentials):
def setUp(self):
super(TestRefreshableCredentials, self).setUp()
self.refresher = mock.Mock()
self.future_time = datetime.now(tzlocal()) + timedelta(hours=24)
self.expiry_time = \
datetime.now(tzlocal()) - timedelta(minutes=30)
self.metadata = {
'access_key': 'NEW-ACCESS',
'secret_key': 'NEW-SECRET',
'token': 'NEW-TOKEN',
'expiry_time': self.future_time.isoformat(),
'role_name': 'rolename',
}
self.refresher.return_value = self.metadata
self.mock_time = mock.Mock()
self.creds = credentials.RefreshableCredentials(
'ORIGINAL-ACCESS', 'ORIGINAL-SECRET', 'ORIGINAL-TOKEN',
self.expiry_time, self.refresher, 'iam-role',
time_fetcher=self.mock_time
)
def test_refresh_needed(self):
# The expiry time was set for 30 minutes ago, so if we
# say the current time is utcnow(), then we should need
# a refresh.
self.mock_time.return_value = datetime.now(tzlocal())
self.assertTrue(self.creds.refresh_needed())
# We should refresh creds, if we try to access "access_key"
# or any of the cred vars.
self.assertEqual(self.creds.access_key, 'NEW-ACCESS')
self.assertEqual(self.creds.secret_key, 'NEW-SECRET')
self.assertEqual(self.creds.token, 'NEW-TOKEN')
def test_no_refresh_needed(self):
# The expiry time was 30 minutes ago, let's say it's an hour
# ago currently. That would mean we don't need a refresh.
self.mock_time.return_value = (
datetime.now(tzlocal()) - timedelta(minutes=60))
self.assertTrue(not self.creds.refresh_needed())
self.assertEqual(self.creds.access_key, 'ORIGINAL-ACCESS')
self.assertEqual(self.creds.secret_key, 'ORIGINAL-SECRET')
self.assertEqual(self.creds.token, 'ORIGINAL-TOKEN')
def test_get_credentials_set(self):
# We need to return a consistent set of credentials to use during the
# signing process.
self.mock_time.return_value = (
datetime.now(tzlocal()) - timedelta(minutes=60))
self.assertTrue(not self.creds.refresh_needed())
credential_set = self.creds.get_frozen_credentials()
self.assertEqual(credential_set.access_key, 'ORIGINAL-ACCESS')
self.assertEqual(credential_set.secret_key, 'ORIGINAL-SECRET')
self.assertEqual(credential_set.token, 'ORIGINAL-TOKEN')
class TestEnvVar(BaseEnvVar):
def test_envvars_are_found_no_token(self):
environ = {
'AWS_ACCESS_KEY_ID': 'foo',
'AWS_SECRET_ACCESS_KEY': 'bar',
}
provider = credentials.EnvProvider(environ)
creds = provider.load()
self.assertIsNotNone(creds)
self.assertEqual(creds.access_key, 'foo')
self.assertEqual(creds.secret_key, 'bar')
self.assertEqual(creds.method, 'env')
def test_envvars_found_with_security_token(self):
environ = {
'AWS_ACCESS_KEY_ID': 'foo',
'AWS_SECRET_ACCESS_KEY': 'bar',
'AWS_SECURITY_TOKEN': 'baz',
}
provider = credentials.EnvProvider(environ)
creds = provider.load()
self.assertIsNotNone(creds)
self.assertEqual(creds.access_key, 'foo')
self.assertEqual(creds.secret_key, 'bar')
self.assertEqual(creds.token, 'baz')
self.assertEqual(creds.method, 'env')
def test_envvars_found_with_session_token(self):
environ = {
'AWS_ACCESS_KEY_ID': 'foo',
'AWS_SECRET_ACCESS_KEY': 'bar',
'AWS_SESSION_TOKEN': 'baz',
}
provider = credentials.EnvProvider(environ)
creds = provider.load()
self.assertIsNotNone(creds)
self.assertEqual(creds.access_key, 'foo')
self.assertEqual(creds.secret_key, 'bar')
self.assertEqual(creds.token, 'baz')
self.assertEqual(creds.method, 'env')
def test_envvars_not_found(self):
provider = credentials.EnvProvider(environ={})
creds = provider.load()
self.assertIsNone(creds)
def test_can_override_env_var_mapping(self):
# We can change the env var provider to
# use our specified env var names.
environ = {
'FOO_ACCESS_KEY': 'foo',
'FOO_SECRET_KEY': 'bar',
'FOO_SESSION_TOKEN': 'baz',
}
mapping = {
'access_key': 'FOO_ACCESS_KEY',
'secret_key': 'FOO_SECRET_KEY',
'token': 'FOO_SESSION_TOKEN',
}
provider = credentials.EnvProvider(
environ, mapping
)
creds = provider.load()
self.assertEqual(creds.access_key, 'foo')
self.assertEqual(creds.secret_key, 'bar')
self.assertEqual(creds.token, 'baz')
def test_can_override_partial_env_var_mapping(self):
# Only changing the access key mapping.
# The other 2 use the default values of
# AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN
# use our specified env var names.
environ = {
'FOO_ACCESS_KEY': 'foo',
'AWS_SECRET_ACCESS_KEY': 'bar',
'AWS_SESSION_TOKEN': 'baz',
}
provider = credentials.EnvProvider(
environ, {'access_key': 'FOO_ACCESS_KEY'}
)
creds = provider.load()
self.assertEqual(creds.access_key, 'foo')
self.assertEqual(creds.secret_key, 'bar')
self.assertEqual(creds.token, 'baz')
def test_can_override_expiry_env_var_mapping(self):
expiry_time = datetime.now(tzlocal()) - timedelta(hours=1)
environ = {
'AWS_ACCESS_KEY_ID': 'foo',
'AWS_SECRET_ACCESS_KEY': 'bar',
'AWS_SESSION_TOKEN': 'baz',
'FOO_EXPIRY': expiry_time.isoformat(),
}
provider = credentials.EnvProvider(
environ, {'expiry_time': 'FOO_EXPIRY'}
)
creds = provider.load()
# Since the credentials are expired, we'll trigger a refresh whenever
# we try to access them. Since the environment credentials are still
# expired, this will raise an error.
error_message = (
"Credentials were refreshed, but the refreshed credentials are "
"still expired."
)
with self.assertRaisesRegexp(RuntimeError, error_message):
creds.get_frozen_credentials()
def test_partial_creds_is_an_error(self):
# If the user provides an access key, they must also
# provide a secret key. Not doing so will generate an
# error.
environ = {
'AWS_ACCESS_KEY_ID': 'foo',
# Missing the AWS_SECRET_ACCESS_KEY
}
provider = credentials.EnvProvider(environ)
with self.assertRaises(botocore.exceptions.PartialCredentialsError):
provider.load()
def test_missing_access_key_id_raises_error(self):
expiry_time = datetime.now(tzlocal()) - timedelta(hours=1)
environ = {
'AWS_ACCESS_KEY_ID': 'foo',
'AWS_SECRET_ACCESS_KEY': 'bar',
'AWS_CREDENTIAL_EXPIRATION': expiry_time.isoformat(),
}
provider = credentials.EnvProvider(environ)
creds = provider.load()
del environ['AWS_ACCESS_KEY_ID']
# Since the credentials are expired, we'll trigger a refresh
# whenever we try to access them. At that refresh time, the relevant
# environment variables are incomplete, so an error will be raised.
with self.assertRaises(botocore.exceptions.PartialCredentialsError):
creds.get_frozen_credentials()
def test_credentials_refresh(self):
# First initialize the credentials with an expired credential set.
expiry_time = datetime.now(tzlocal()) - timedelta(hours=1)
environ = {
'AWS_ACCESS_KEY_ID': 'foo',
'AWS_SECRET_ACCESS_KEY': 'bar',
'AWS_SESSION_TOKEN': 'baz',
'AWS_CREDENTIAL_EXPIRATION': expiry_time.isoformat(),
}
provider = credentials.EnvProvider(environ)
creds = provider.load()
self.assertIsInstance(creds, credentials.RefreshableCredentials)
# Since the credentials are expired, we'll trigger a refresh whenever
# we try to access them. But at this point the environment hasn't been
# updated, so when it refreshes it will trigger an exception because
# the new creds are still expired.
error_message = (
"Credentials were refreshed, but the refreshed credentials are "
"still expired."
)
with self.assertRaisesRegexp(RuntimeError, error_message):
creds.get_frozen_credentials()
# Now we update the environment with non-expired credentials,
# so when we access the creds it will refresh and grab the new ones.
expiry_time = datetime.now(tzlocal()) + timedelta(hours=1)
environ.update({
'AWS_ACCESS_KEY_ID': 'bin',
'AWS_SECRET_ACCESS_KEY': 'bam',
'AWS_SESSION_TOKEN': 'biz',
'AWS_CREDENTIAL_EXPIRATION': expiry_time.isoformat(),
})
frozen = creds.get_frozen_credentials()
self.assertEqual(frozen.access_key, 'bin')
self.assertEqual(frozen.secret_key, 'bam')
self.assertEqual(frozen.token, 'biz')
def test_credentials_only_refresh_when_needed(self):
expiry_time = datetime.now(tzlocal()) + timedelta(hours=2)
environ = {
'AWS_ACCESS_KEY_ID': 'foo',
'AWS_SECRET_ACCESS_KEY': 'bar',
'AWS_SESSION_TOKEN': 'baz',
'AWS_CREDENTIAL_EXPIRATION': expiry_time.isoformat(),
}
provider = credentials.EnvProvider(environ)
# Perform the initial credential load
creds = provider.load()
# Now that the initial load has been performed, we go ahead and
# change the environment. If the credentials were expired,
# they would immediately refresh upon access and we'd get the new
# ones. Since they've got plenty of time, they shouldn't refresh.
expiry_time = datetime.now(tzlocal()) + timedelta(hours=3)
environ.update({
'AWS_ACCESS_KEY_ID': 'bin',
'AWS_SECRET_ACCESS_KEY': 'bam',
'AWS_SESSION_TOKEN': 'biz',
'AWS_CREDENTIAL_EXPIRATION': expiry_time.isoformat(),
})
frozen = creds.get_frozen_credentials()
self.assertEqual(frozen.access_key, 'foo')
self.assertEqual(frozen.secret_key, 'bar')
self.assertEqual(frozen.token, 'baz')
def test_credentials_not_refreshable_if_no_expiry_present(self):
environ = {
'AWS_ACCESS_KEY_ID': 'foo',
'AWS_SECRET_ACCESS_KEY': 'bar',
'AWS_SESSION_TOKEN': 'baz',
}
provider = credentials.EnvProvider(environ)
creds = provider.load()
self.assertNotIsInstance(creds, credentials.RefreshableCredentials)
self.assertIsInstance(creds, credentials.Credentials)
def test_credentials_do_not_become_refreshable(self):
environ = {
'AWS_ACCESS_KEY_ID': 'foo',
'AWS_SECRET_ACCESS_KEY': 'bar',
'AWS_SESSION_TOKEN': 'baz',
}
provider = credentials.EnvProvider(environ)
creds = provider.load()
frozen = creds.get_frozen_credentials()
self.assertEqual(frozen.access_key, 'foo')
self.assertEqual(frozen.secret_key, 'bar')
self.assertEqual(frozen.token, 'baz')
expiry_time = datetime.now(tzlocal()) - timedelta(hours=1)
environ.update({
'AWS_ACCESS_KEY_ID': 'bin',
'AWS_SECRET_ACCESS_KEY': 'bam',
'AWS_SESSION_TOKEN': 'biz',
'AWS_CREDENTIAL_EXPIRATION': expiry_time.isoformat(),
})
frozen = creds.get_frozen_credentials()
self.assertEqual(frozen.access_key, 'foo')
self.assertEqual(frozen.secret_key, 'bar')
self.assertEqual(frozen.token, 'baz')
self.assertNotIsInstance(creds, credentials.RefreshableCredentials)
def test_credentials_throw_error_if_expiry_goes_away(self):
expiry_time = datetime.now(tzlocal()) - timedelta(hours=1)
environ = {
'AWS_ACCESS_KEY_ID': 'foo',
'AWS_SECRET_ACCESS_KEY': 'bar',
'AWS_CREDENTIAL_EXPIRATION': expiry_time.isoformat(),
}
provider = credentials.EnvProvider(environ)
creds = provider.load()
del environ['AWS_CREDENTIAL_EXPIRATION']
with self.assertRaises(credentials.PartialCredentialsError):
creds.get_frozen_credentials()
class TestSharedCredentialsProvider(BaseEnvVar):
def setUp(self):
super(TestSharedCredentialsProvider, self).setUp()
self.ini_parser = mock.Mock()
def test_credential_file_exists_default_profile(self):
self.ini_parser.return_value = {
'default': {
'aws_access_key_id': 'foo',
'aws_secret_access_key': 'bar',
}
}
provider = credentials.SharedCredentialProvider(
creds_filename='~/.aws/creds', profile_name='default',
ini_parser=self.ini_parser)
creds = provider.load()
self.assertIsNotNone(creds)
self.assertEqual(creds.access_key, 'foo')
self.assertEqual(creds.secret_key, 'bar')
self.assertIsNone(creds.token)
self.assertEqual(creds.method, 'shared-credentials-file')
def test_partial_creds_raise_error(self):
self.ini_parser.return_value = {
'default': {
'aws_access_key_id': 'foo',
# Missing 'aws_secret_access_key'.
}
}
provider = credentials.SharedCredentialProvider(
creds_filename='~/.aws/creds', profile_name='default',
ini_parser=self.ini_parser)
with self.assertRaises(botocore.exceptions.PartialCredentialsError):
provider.load()
def test_credentials_file_exists_with_session_token(self):
self.ini_parser.return_value = {
'default': {
'aws_access_key_id': 'foo',
'aws_secret_access_key': 'bar',
'aws_session_token': 'baz',
}
}
provider = credentials.SharedCredentialProvider(
creds_filename='~/.aws/creds', profile_name='default',
ini_parser=self.ini_parser)
creds = provider.load()
self.assertIsNotNone(creds)
self.assertEqual(creds.access_key, 'foo')
self.assertEqual(creds.secret_key, 'bar')
self.assertEqual(creds.token, 'baz')
self.assertEqual(creds.method, 'shared-credentials-file')
def test_credentials_file_with_multiple_profiles(self):
self.ini_parser.return_value = {
# Here the user has a 'default' and a 'dev' profile.
'default': {
'aws_access_key_id': 'a',
'aws_secret_access_key': 'b',
'aws_session_token': 'c',
},
'dev': {
'aws_access_key_id': 'd',
'aws_secret_access_key': 'e',
'aws_session_token': 'f',
},
}
# And we specify a profile_name of 'dev'.
provider = credentials.SharedCredentialProvider(
creds_filename='~/.aws/creds', profile_name='dev',
ini_parser=self.ini_parser)
creds = provider.load()
self.assertIsNotNone(creds)
self.assertEqual(creds.access_key, 'd')
self.assertEqual(creds.secret_key, 'e')
self.assertEqual(creds.token, 'f')
self.assertEqual(creds.method, 'shared-credentials-file')
def test_credentials_file_does_not_exist_returns_none(self):
# It's ok if the credentials file does not exist, we should
# just catch the appropriate errors and return None.
self.ini_parser.side_effect = botocore.exceptions.ConfigNotFound(
path='foo')
provider = credentials.SharedCredentialProvider(
creds_filename='~/.aws/creds', profile_name='dev',
ini_parser=self.ini_parser)
creds = provider.load()
self.assertIsNone(creds)
class TestConfigFileProvider(BaseEnvVar):
def setUp(self):
super(TestConfigFileProvider, self).setUp()
profile_config = {
'aws_access_key_id': 'a',
'aws_secret_access_key': 'b',
'aws_session_token': 'c',
# Non creds related configs can be in a session's # config.
'region': 'us-west-2',
'output': 'json',
}
parsed = {'profiles': {'default': profile_config}}
parser = mock.Mock()
parser.return_value = parsed
self.parser = parser
def test_config_file_exists(self):
provider = credentials.ConfigProvider('cli.cfg', 'default',
self.parser)
creds = provider.load()
self.assertIsNotNone(creds)
self.assertEqual(creds.access_key, 'a')
self.assertEqual(creds.secret_key, 'b')
self.assertEqual(creds.token, 'c')
self.assertEqual(creds.method, 'config-file')
def test_config_file_missing_profile_config(self):
# Referring to a profile that's not in the config file
# will result in session.config returning an empty dict.
profile_name = 'NOT-default'
provider = credentials.ConfigProvider('cli.cfg', profile_name,
self.parser)
creds = provider.load()
self.assertIsNone(creds)
def test_config_file_errors_ignored(self):
# We should move on to the next provider if the config file
# can't be found.
self.parser.side_effect = botocore.exceptions.ConfigNotFound(
path='cli.cfg')
provider = credentials.ConfigProvider('cli.cfg', 'default',
self.parser)
creds = provider.load()
self.assertIsNone(creds)
def test_partial_creds_is_error(self):
profile_config = {
'aws_access_key_id': 'a',
# Missing aws_secret_access_key
}
parsed = {'profiles': {'default': profile_config}}
parser = mock.Mock()
parser.return_value = parsed
provider = credentials.ConfigProvider('cli.cfg', 'default', parser)
with self.assertRaises(botocore.exceptions.PartialCredentialsError):
provider.load()
class TestBotoProvider(BaseEnvVar):
def setUp(self):
super(TestBotoProvider, self).setUp()
self.ini_parser = mock.Mock()
def test_boto_config_file_exists_in_home_dir(self):
environ = {}
self.ini_parser.return_value = {
'Credentials': {
# boto's config file does not support a session token
# so we only test for access_key/secret_key.
'aws_access_key_id': 'a',
'aws_secret_access_key': 'b',
}
}
provider = credentials.BotoProvider(environ=environ,
ini_parser=self.ini_parser)
creds = provider.load()
self.assertIsNotNone(creds)
self.assertEqual(creds.access_key, 'a')
self.assertEqual(creds.secret_key, 'b')
self.assertIsNone(creds.token)
self.assertEqual(creds.method, 'boto-config')
def test_env_var_set_for_boto_location(self):
environ = {
'BOTO_CONFIG': 'alternate-config.cfg'
}
self.ini_parser.return_value = {
'Credentials': {
# boto's config file does not support a session token
# so we only test for access_key/secret_key.
'aws_access_key_id': 'a',
'aws_secret_access_key': 'b',
}
}
provider = credentials.BotoProvider(environ=environ,
ini_parser=self.ini_parser)
creds = provider.load()
self.assertIsNotNone(creds)
self.assertEqual(creds.access_key, 'a')
self.assertEqual(creds.secret_key, 'b')
self.assertIsNone(creds.token)
self.assertEqual(creds.method, 'boto-config')
# Assert that the parser was called with the filename specified
# in the env var.
self.ini_parser.assert_called_with('alternate-config.cfg')
def test_no_boto_config_file_exists(self):
self.ini_parser.side_effect = botocore.exceptions.ConfigNotFound(
path='foo')
provider = credentials.BotoProvider(environ={},
ini_parser=self.ini_parser)
creds = provider.load()
self.assertIsNone(creds)
def test_partial_creds_is_error(self):
ini_parser = mock.Mock()
ini_parser.return_value = {
'Credentials': {
'aws_access_key_id': 'a',
# Missing aws_secret_access_key.
}
}
provider = credentials.BotoProvider(environ={},
ini_parser=ini_parser)
with self.assertRaises(botocore.exceptions.PartialCredentialsError):
provider.load()
class TestOriginalEC2Provider(BaseEnvVar):
def test_load_ec2_credentials_file_not_exist(self):
provider = credentials.OriginalEC2Provider(environ={})
creds = provider.load()
self.assertIsNone(creds)
def test_load_ec2_credentials_file_exists(self):
environ = {
'AWS_CREDENTIAL_FILE': 'foo.cfg',
}
parser = mock.Mock()
parser.return_value = {
'AWSAccessKeyId': 'a',
'AWSSecretKey': 'b',
}
provider = credentials.OriginalEC2Provider(environ=environ,
parser=parser)
creds = provider.load()
self.assertIsNotNone(creds)
self.assertEqual(creds.access_key, 'a')
self.assertEqual(creds.secret_key, 'b')
self.assertIsNone(creds.token)
self.assertEqual(creds.method, 'ec2-credentials-file')
class TestInstanceMetadataProvider(BaseEnvVar):
def test_load_from_instance_metadata(self):
timeobj = datetime.now(tzlocal())
timestamp = (timeobj + timedelta(hours=24)).isoformat()
fetcher = mock.Mock()
fetcher.retrieve_iam_role_credentials.return_value = {
'access_key': 'a',
'secret_key': 'b',
'token': 'c',
'expiry_time': timestamp,
'role_name': 'myrole',
}
provider = credentials.InstanceMetadataProvider(
iam_role_fetcher=fetcher)
creds = provider.load()
self.assertIsNotNone(creds)
self.assertEqual(creds.access_key, 'a')
self.assertEqual(creds.secret_key, 'b')
self.assertEqual(creds.token, 'c')
self.assertEqual(creds.method, 'iam-role')
def test_no_role_creds_exist(self):
fetcher = mock.Mock()
fetcher.retrieve_iam_role_credentials.return_value = {}
provider = credentials.InstanceMetadataProvider(
iam_role_fetcher=fetcher)
creds = provider.load()
self.assertIsNone(creds)
fetcher.retrieve_iam_role_credentials.assert_called_with()
class CredentialResolverTest(BaseEnvVar):
def setUp(self):
super(CredentialResolverTest, self).setUp()
self.provider1 = mock.Mock()
self.provider1.METHOD = 'provider1'
self.provider2 = mock.Mock()
self.provider2.METHOD = 'provider2'
self.fake_creds = credentials.Credentials('a', 'b', 'c')
def test_load_credentials_single_provider(self):
self.provider1.load.return_value = self.fake_creds
resolver = credentials.CredentialResolver(providers=[self.provider1])
creds = resolver.load_credentials()
self.assertEqual(creds.access_key, 'a')
self.assertEqual(creds.secret_key, 'b')
self.assertEqual(creds.token, 'c')
def test_get_provider_by_name(self):
resolver = credentials.CredentialResolver(providers=[self.provider1])
result = resolver.get_provider('provider1')
self.assertIs(result, self.provider1)
def test_get_unknown_provider_raises_error(self):
resolver = credentials.CredentialResolver(providers=[self.provider1])
with self.assertRaises(botocore.exceptions.UnknownCredentialError):
resolver.get_provider('unknown-foo')
def test_first_credential_non_none_wins(self):
self.provider1.load.return_value = None
self.provider2.load.return_value = self.fake_creds
resolver = credentials.CredentialResolver(providers=[self.provider1,
self.provider2])
creds = resolver.load_credentials()
self.assertEqual(creds.access_key, 'a')
self.assertEqual(creds.secret_key, 'b')
self.assertEqual(creds.token, 'c')
self.provider1.load.assert_called_with()
self.provider2.load.assert_called_with()
def test_no_creds_loaded(self):
self.provider1.load.return_value = None
self.provider2.load.return_value = None
resolver = credentials.CredentialResolver(providers=[self.provider1,
self.provider2])
creds = resolver.load_credentials()
self.assertIsNone(creds)
def test_inject_additional_providers_after_existing(self):
self.provider1.load.return_value = None
self.provider2.load.return_value = self.fake_creds
resolver = credentials.CredentialResolver(providers=[self.provider1,
self.provider2])
# Now, if we were to call resolver.load() now, provider2 would
# win because it's returning a non None response.
# However we can inject a new provider before provider2 to
# override this process.
# Providers can be added by the METHOD name of each provider.
new_provider = mock.Mock()
new_provider.METHOD = 'new_provider'
new_provider.load.return_value = credentials.Credentials('d', 'e', 'f')
resolver.insert_after('provider1', new_provider)
creds = resolver.load_credentials()
self.assertIsNotNone(creds)
self.assertEqual(creds.access_key, 'd')
self.assertEqual(creds.secret_key, 'e')
self.assertEqual(creds.token, 'f')
# Provider 1 should have been called, but provider2 should
# *not* have been called because new_provider already returned
# a non-None response.
self.provider1.load.assert_called_with()
self.assertTrue(not self.provider2.called)
def test_inject_provider_before_existing(self):
new_provider = mock.Mock()
new_provider.METHOD = 'override'
new_provider.load.return_value = credentials.Credentials('x', 'y', 'z')
resolver = credentials.CredentialResolver(providers=[self.provider1,
self.provider2])
resolver.insert_before(self.provider1.METHOD, new_provider)
creds = resolver.load_credentials()
self.assertEqual(creds.access_key, 'x')
self.assertEqual(creds.secret_key, 'y')
self.assertEqual(creds.token, 'z')
def test_can_remove_providers(self):
self.provider1.load.return_value = credentials.Credentials(
'a', 'b', 'c')
self.provider2.load.return_value = credentials.Credentials(
'd', 'e', 'f')
resolver = credentials.CredentialResolver(providers=[self.provider1,
self.provider2])
resolver.remove('provider1')
creds = resolver.load_credentials()
self.assertIsNotNone(creds)
self.assertEqual(creds.access_key, 'd')
self.assertEqual(creds.secret_key, 'e')
self.assertEqual(creds.token, 'f')
self.assertTrue(not self.provider1.load.called)
self.provider2.load.assert_called_with()
def test_provider_unknown(self):
resolver = credentials.CredentialResolver(providers=[self.provider1,
self.provider2])
# No error is raised if you try to remove an unknown provider.
resolver.remove('providerFOO')
# But an error IS raised if you try to insert after an unknown
# provider.
with self.assertRaises(botocore.exceptions.UnknownCredentialError):
resolver.insert_after('providerFoo', None)
class TestCreateCredentialResolver(BaseEnvVar):
def setUp(self):
super(TestCreateCredentialResolver, self).setUp()
self.session = mock.Mock()
self.session_instance_vars = {
'credentials_file': 'a',
'legacy_config_file': 'b',
'config_file': 'c',
'metadata_service_timeout': 'd',
'metadata_service_num_attempts': 'e',
}
self.fake_env_vars = {}
self.session.get_config_variable = self.fake_get_config_variable
def fake_get_config_variable(self, name, methods=None):
if methods == ('instance',):
return self.session_instance_vars.get(name)
elif methods is not None and 'env' in methods:
return self.fake_env_vars.get(name)
def test_create_credential_resolver(self):
resolver = credentials.create_credential_resolver(self.session)
self.assertIsInstance(resolver, credentials.CredentialResolver)
def test_explicit_profile_ignores_env_provider(self):
self.session_instance_vars['profile'] = 'dev'
resolver = credentials.create_credential_resolver(self.session)
self.assertTrue(
all(not isinstance(p, EnvProvider) for p in resolver.providers))
def test_no_profile_checks_env_provider(self):
# If no profile is provided,
self.session_instance_vars.pop('profile', None)
resolver = credentials.create_credential_resolver(self.session)
# Then an EnvProvider should be part of our credential lookup chain.
self.assertTrue(
any(isinstance(p, EnvProvider) for p in resolver.providers))
def test_env_provider_added_if_profile_from_env_set(self):
self.fake_env_vars['profile'] = 'profile-from-env'
resolver = credentials.create_credential_resolver(self.session)
self.assertTrue(
any(isinstance(p, EnvProvider) for p in resolver.providers))
class TestAssumeRoleCredentialProvider(unittest.TestCase):
maxDiff = None
def setUp(self):
self.fake_config = {
'profiles': {
'development': {
'role_arn': 'myrole',
'source_profile': 'longterm',
},
'longterm': {
'aws_access_key_id': 'akid',
'aws_secret_access_key': 'skid',
}
}
}
def create_config_loader(self, with_config=None):
if with_config is None:
with_config = self.fake_config
load_config = mock.Mock()
load_config.return_value = with_config
return load_config
def create_client_creator(self, with_response):
# Create a mock sts client that returns a specific response
# for assume_role.
client = mock.Mock()
if isinstance(with_response, list):
client.assume_role.side_effect = with_response
else:
client.assume_role.return_value = with_response
return mock.Mock(return_value=client)
def some_future_time(self):
timeobj = datetime.now(tzlocal())
return timeobj + timedelta(hours=24)
def test_assume_role_with_no_cache(self):
response = {
'Credentials': {
'AccessKeyId': 'foo',
'SecretAccessKey': 'bar',
'SessionToken': 'baz',
'Expiration': self.some_future_time().isoformat()
},
}
client_creator = self.create_client_creator(with_response=response)
provider = credentials.AssumeRoleProvider(
self.create_config_loader(),
client_creator, cache={}, profile_name='development')
creds = provider.load()
self.assertEqual(creds.access_key, 'foo')
self.assertEqual(creds.secret_key, 'bar')
self.assertEqual(creds.token, 'baz')
def test_assume_role_with_datetime(self):
response = {
'Credentials': {
'AccessKeyId': 'foo',
'SecretAccessKey': 'bar',
'SessionToken': 'baz',
# Note the lack of isoformat(), we're using
# a datetime.datetime type. This will ensure
# we test both parsing as well as serializing
# from a given datetime because the credentials
# are immediately expired.
'Expiration': datetime.now(tzlocal()) + timedelta(hours=20)
},
}
client_creator = self.create_client_creator(with_response=response)
provider = credentials.AssumeRoleProvider(
self.create_config_loader(),
client_creator, cache={}, profile_name='development')
creds = provider.load()
self.assertEqual(creds.access_key, 'foo')
self.assertEqual(creds.secret_key, 'bar')
self.assertEqual(creds.token, 'baz')
def test_assume_role_refresher_serializes_datetime(self):
client = mock.Mock()
time_zone = tzutc()
expiration = datetime(
year=2016, month=11, day=6, hour=1, minute=30, tzinfo=time_zone)
client.assume_role.return_value = {
'Credentials': {
'AccessKeyId': 'foo',
'SecretAccessKey': 'bar',
'SessionToken': 'baz',
'Expiration': expiration,
}
}
refresh = create_assume_role_refresher(client, {})
expiry_time = refresh()['expiry_time']
self.assertEqual(expiry_time, '2016-11-06T01:30:00UTC')
def test_assume_role_retrieves_from_cache(self):
date_in_future = datetime.utcnow() + timedelta(seconds=1000)
utc_timestamp = date_in_future.isoformat() + 'Z'
self.fake_config['profiles']['development']['role_arn'] = 'myrole'
cache = {
'development--myrole': {
'Credentials': {
'AccessKeyId': 'foo-cached',
'SecretAccessKey': 'bar-cached',
'SessionToken': 'baz-cached',
'Expiration': utc_timestamp,
}
}
}
provider = credentials.AssumeRoleProvider(
self.create_config_loader(), mock.Mock(),
cache=cache, profile_name='development')
creds = provider.load()
self.assertEqual(creds.access_key, 'foo-cached')
self.assertEqual(creds.secret_key, 'bar-cached')
self.assertEqual(creds.token, 'baz-cached')
def test_cache_key_is_windows_safe(self):
response = {
'Credentials': {
'AccessKeyId': 'foo',
'SecretAccessKey': 'bar',
'SessionToken': 'baz',
'Expiration': datetime.now(tzlocal()).isoformat()
},
}
cache = {}
self.fake_config['profiles']['development']['role_arn'] = (
'arn:aws:iam::foo-role')
client_creator = self.create_client_creator(with_response=response)
provider = credentials.AssumeRoleProvider(
self.create_config_loader(),
client_creator, cache=cache, profile_name='development')
provider.load()
# On windows, you cannot use a a ':' in the filename, so
# we need to do some small transformations on the filename
# to replace any ':' that come up.
self.assertEqual(cache['development--arn_aws_iam__foo-role'],
response)
def test_cache_key_with_role_session_name(self):
response = {
'Credentials': {
'AccessKeyId': 'foo',
'SecretAccessKey': 'bar',
'SessionToken': 'baz',
'Expiration': datetime.now(tzlocal()).isoformat()
},
}
cache = {}
self.fake_config['profiles']['development']['role_arn'] = (
'arn:aws:iam::foo-role')
self.fake_config['profiles']['development']['role_session_name'] = (
'foo_role_session_name')
client_creator = self.create_client_creator(with_response=response)
provider = credentials.AssumeRoleProvider(
self.create_config_loader(),
client_creator, cache=cache, profile_name='development')
provider.load()
self.assertEqual(
cache['development--arn_aws_iam__foo-role--foo_role_session_name'],
response)
def test_assume_role_in_cache_but_expired(self):
expired_creds = datetime.now(tzlocal())
valid_creds = expired_creds + timedelta(hours=1)
response = {
'Credentials': {
'AccessKeyId': 'foo',
'SecretAccessKey': 'bar',
'SessionToken': 'baz',
'Expiration': valid_creds,
},
}
client_creator = self.create_client_creator(with_response=response)
cache = {
'development--myrole': {
'Credentials': {
'AccessKeyId': 'foo-cached',
'SecretAccessKey': 'bar-cached',
'SessionToken': 'baz-cached',
'Expiration': expired_creds,
}
}
}
provider = credentials.AssumeRoleProvider(
self.create_config_loader(), client_creator,
cache=cache, profile_name='development')
creds = provider.load()
self.assertEqual(creds.access_key, 'foo')
self.assertEqual(creds.secret_key, 'bar')
self.assertEqual(creds.token, 'baz')
def test_role_session_name_provided(self):
dev_profile = self.fake_config['profiles']['development']
dev_profile['role_session_name'] = 'myname'
response = {
'Credentials': {
'AccessKeyId': 'foo',
'SecretAccessKey': 'bar',
'SessionToken': 'baz',
'Expiration': datetime.now(tzlocal()).isoformat(),
},
}
client_creator = self.create_client_creator(with_response=response)
provider = credentials.AssumeRoleProvider(
self.create_config_loader(),
client_creator, cache={}, profile_name='development')
provider.load()
client = client_creator.return_value
client.assume_role.assert_called_with(
RoleArn='myrole', RoleSessionName='myname')
def test_external_id_provided(self):
self.fake_config['profiles']['development']['external_id'] = 'myid'
response = {
'Credentials': {
'AccessKeyId': 'foo',
'SecretAccessKey': 'bar',
'SessionToken': 'baz',
'Expiration': datetime.now(tzlocal()).isoformat(),
},
}
client_creator = self.create_client_creator(with_response=response)
provider = credentials.AssumeRoleProvider(
self.create_config_loader(),
client_creator, cache={}, profile_name='development')
provider.load()
client = client_creator.return_value
client.assume_role.assert_called_with(
RoleArn='myrole', ExternalId='myid', RoleSessionName=mock.ANY)
def test_assume_role_with_mfa(self):
self.fake_config['profiles']['development']['mfa_serial'] = 'mfa'
response = {
'Credentials': {
'AccessKeyId': 'foo',
'SecretAccessKey': 'bar',
'SessionToken': 'baz',
'Expiration': datetime.now(tzlocal()).isoformat(),
},
}
client_creator = self.create_client_creator(with_response=response)
prompter = mock.Mock(return_value='token-code')
provider = credentials.AssumeRoleProvider(
self.create_config_loader(), client_creator,
cache={}, profile_name='development', prompter=prompter)
provider.load()
client = client_creator.return_value
# In addition to the normal assume role args, we should also
# inject the serial number from the config as well as the
# token code that comes from prompting the user (the prompter
# object).
client.assume_role.assert_called_with(
RoleArn='myrole', RoleSessionName=mock.ANY, SerialNumber='mfa',
TokenCode='token-code')
def test_assume_role_populates_session_name_on_refresh(self):
responses = [{
'Credentials': {
'AccessKeyId': 'foo',
'SecretAccessKey': 'bar',
'SessionToken': 'baz',
# We're creating an expiry time in the past so as
# soon as we try to access the credentials, the
# refresh behavior will be triggered.
'Expiration': (
datetime.now(tzlocal()) -
timedelta(seconds=100)).isoformat(),
},
}, {
'Credentials': {
'AccessKeyId': 'foo',
'SecretAccessKey': 'bar',
'SessionToken': 'baz',
'Expiration': (
datetime.now(tzlocal()) + timedelta(seconds=100)
).isoformat(),
}
}]
client_creator = self.create_client_creator(with_response=responses)
provider = credentials.AssumeRoleProvider(
self.create_config_loader(), client_creator,
cache={}, profile_name='development',
prompter=mock.Mock(return_value='token-code'))
# This will trigger the first assume_role() call. It returns
# credentials that are expired and will trigger a refresh.
creds = provider.load()
# This will trigger the second assume_role() call because
# a refresh is needed.
creds.get_frozen_credentials()
client = client_creator.return_value
assume_role_calls = client.assume_role.call_args_list
self.assertEqual(len(assume_role_calls), 2, assume_role_calls)
# The args should be identical. That is, the second
# assume_role call should have the exact same args as the
# initial assume_role call.
self.assertEqual(assume_role_calls[0], assume_role_calls[1])
def test_assume_role_mfa_cannot_refresh_credentials(self):
# Note: we should look into supporting optional behavior
# in the future that allows for reprompting for credentials.
# But for now, if we get temp creds with MFA then when those
# creds expire, we can't refresh the credentials.
self.fake_config['profiles']['development']['mfa_serial'] = 'mfa'
response = {
'Credentials': {
'AccessKeyId': 'foo',
'SecretAccessKey': 'bar',
'SessionToken': 'baz',
# We're creating an expiry time in the past so as
# soon as we try to access the credentials, the
# refresh behavior will be triggered.
'Expiration': (
datetime.now(tzlocal()) -
timedelta(seconds=100)).isoformat(),
},
}
client_creator = self.create_client_creator(with_response=response)
provider = credentials.AssumeRoleProvider(
self.create_config_loader(), client_creator,
cache={}, profile_name='development',
prompter=mock.Mock(return_value='token-code'))
creds = provider.load()
with self.assertRaises(credentials.RefreshWithMFAUnsupportedError):
# access_key is a property that will refresh credentials
# if they're expired. Because we set the expiry time to
# something in the past, this will trigger the refresh
# behavior, with with MFA will currently raise an exception.
creds.access_key
def test_no_config_is_noop(self):
self.fake_config['profiles']['development'] = {
'aws_access_key_id': 'foo',
'aws_secret_access_key': 'bar',
}
provider = credentials.AssumeRoleProvider(
self.create_config_loader(),
mock.Mock(), cache={}, profile_name='development')
# Because a role_arn was not specified, the AssumeRoleProvider
# is a noop and will not return credentials (which means we
# move on to the next provider).
creds = provider.load()
self.assertIsNone(creds)
def test_source_profile_not_provided(self):
del self.fake_config['profiles']['development']['source_profile']
provider = credentials.AssumeRoleProvider(
self.create_config_loader(),
mock.Mock(), cache={}, profile_name='development')
# source_profile is required, we shoudl get an error.
with self.assertRaises(botocore.exceptions.PartialCredentialsError):
provider.load()
def test_source_profile_does_not_exist(self):
dev_profile = self.fake_config['profiles']['development']
dev_profile['source_profile'] = 'does-not-exist'
provider = credentials.AssumeRoleProvider(
self.create_config_loader(),
mock.Mock(), cache={}, profile_name='development')
# source_profile is required, we shoudl get an error.
with self.assertRaises(botocore.exceptions.InvalidConfigError):
provider.load()
class TestRefreshLogic(unittest.TestCase):
def test_mandatory_refresh_needed(self):
creds = IntegerRefresher(
# These values will immediately trigger
# a manadatory refresh.
creds_last_for=2,
mandatory_refresh=3,
advisory_refresh=3)
temp = creds.get_frozen_credentials()
self.assertEqual(
temp, credentials.ReadOnlyCredentials('1', '1', '1'))
def test_advisory_refresh_needed(self):
creds = IntegerRefresher(
# These values will immediately trigger
# a manadatory refresh.
creds_last_for=4,
mandatory_refresh=2,
advisory_refresh=5)
temp = creds.get_frozen_credentials()
self.assertEqual(
temp, credentials.ReadOnlyCredentials('1', '1', '1'))
def test_refresh_fails_is_not_an_error_during_advisory_period(self):
fail_refresh = mock.Mock(side_effect=Exception("refresh failed"))
creds = IntegerRefresher(
creds_last_for=5,
advisory_refresh=7,
mandatory_refresh=3,
refresh_function=fail_refresh
)
temp = creds.get_frozen_credentials()
# We should have called the refresh function.
self.assertTrue(fail_refresh.called)
# The fail_refresh function will raise an exception.
# Because we're in the advisory period we'll not propogate
# the exception and return the current set of credentials
# (generation '1').
self.assertEqual(
temp, credentials.ReadOnlyCredentials('0', '0', '0'))
def test_exception_propogated_on_error_during_mandatory_period(self):
fail_refresh = mock.Mock(side_effect=Exception("refresh failed"))
creds = IntegerRefresher(
creds_last_for=5,
advisory_refresh=10,
# Note we're in the mandatory period now (5 < 7< 10).
mandatory_refresh=7,
refresh_function=fail_refresh
)
with self.assertRaisesRegexp(Exception, 'refresh failed'):
creds.get_frozen_credentials()
def test_exception_propogated_on_expired_credentials(self):
fail_refresh = mock.Mock(side_effect=Exception("refresh failed"))
creds = IntegerRefresher(
# Setting this to 0 mean the credentials are immediately
# expired.
creds_last_for=0,
advisory_refresh=10,
mandatory_refresh=7,
refresh_function=fail_refresh
)
with self.assertRaisesRegexp(Exception, 'refresh failed'):
# Because credentials are actually expired, any
# failure to refresh should be propagated.
creds.get_frozen_credentials()
def test_refresh_giving_expired_credentials_raises_exception(self):
# This verifies an edge cases where refreshed credentials
# still give expired credentials:
# 1. We see credentials are expired.
# 2. We try to refresh the credentials.
# 3. The "refreshed" credentials are still expired.
#
# In this case, we hard fail and let the user know what
# happened.
creds = IntegerRefresher(
# Negative number indicates that the credentials
# have already been expired for 2 seconds, even
# on refresh.
creds_last_for=-2,
)
err_msg = 'refreshed credentials are still expired'
with self.assertRaisesRegexp(RuntimeError, err_msg):
# Because credentials are actually expired, any
# failure to refresh should be propagated.
creds.get_frozen_credentials()
class TestContainerProvider(BaseEnvVar):
def test_noop_if_env_var_is_not_set(self):
# The 'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI' env var
# is not present as an env var.
environ = {}
provider = credentials.ContainerProvider(environ)
creds = provider.load()
self.assertIsNone(creds)
def full_url(self, url):
return 'http://%s%s' % (ContainerMetadataFetcher.IP_ADDRESS, url)
def create_fetcher(self):
fetcher = mock.Mock(spec=ContainerMetadataFetcher)
fetcher.full_url = self.full_url
return fetcher
def test_retrieve_from_provider_if_env_var_present(self):
environ = {
'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI': '/latest/credentials?id=foo'
}
fetcher = self.create_fetcher()
timeobj = datetime.now(tzlocal())
timestamp = (timeobj + timedelta(hours=24)).isoformat()
fetcher.retrieve_full_uri.return_value = {
"AccessKeyId" : "access_key",
"SecretAccessKey" : "secret_key",
"Token" : "token",
"Expiration" : timestamp,
}
provider = credentials.ContainerProvider(environ, fetcher)
creds = provider.load()
fetcher.retrieve_full_uri.assert_called_with(
self.full_url('/latest/credentials?id=foo'), headers=None)
self.assertEqual(creds.access_key, 'access_key')
self.assertEqual(creds.secret_key, 'secret_key')
self.assertEqual(creds.token, 'token')
self.assertEqual(creds.method, 'container-role')
def test_creds_refresh_when_needed(self):
environ = {
'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI': '/latest/credentials?id=foo'
}
fetcher = mock.Mock(spec=credentials.ContainerMetadataFetcher)
timeobj = datetime.now(tzlocal())
expired_timestamp = (timeobj - timedelta(hours=23)).isoformat()
future_timestamp = (timeobj + timedelta(hours=1)).isoformat()
fetcher.retrieve_full_uri.side_effect = [
{
"AccessKeyId" : "access_key_old",
"SecretAccessKey" : "secret_key_old",
"Token" : "token_old",
"Expiration" : expired_timestamp,
},
{
"AccessKeyId" : "access_key_new",
"SecretAccessKey" : "secret_key_new",
"Token" : "token_new",
"Expiration" : future_timestamp,
}
]
provider = credentials.ContainerProvider(environ, fetcher)
creds = provider.load()
frozen_creds = creds.get_frozen_credentials()
self.assertEqual(frozen_creds.access_key, 'access_key_new')
self.assertEqual(frozen_creds.secret_key, 'secret_key_new')
self.assertEqual(frozen_creds.token, 'token_new')
def test_http_error_propagated(self):
environ = {
'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI': '/latest/credentials?id=foo'
}
fetcher = mock.Mock(spec=credentials.ContainerMetadataFetcher)
timeobj = datetime.now(tzlocal())
expired_timestamp = (timeobj - timedelta(hours=23)).isoformat()
future_timestamp = (timeobj + timedelta(hours=1)).isoformat()
exception = botocore.exceptions.CredentialRetrievalError
fetcher.retrieve_full_uri.side_effect = exception(provider='ecs-role',
error_msg='fake http error')
with self.assertRaises(exception):
provider = credentials.ContainerProvider(environ, fetcher)
creds = provider.load()
def test_http_error_propagated_on_refresh(self):
# We should ensure errors are still propagated even in the
# case of a failed refresh.
environ = {
'AWS_CONTAINER_CREDENTIALS_RELATIVE_URI': '/latest/credentials?id=foo'
}
fetcher = mock.Mock(spec=credentials.ContainerMetadataFetcher)
timeobj = datetime.now(tzlocal())
expired_timestamp = (timeobj - timedelta(hours=23)).isoformat()
http_exception = botocore.exceptions.MetadataRetrievalError
raised_exception = botocore.exceptions.CredentialRetrievalError
fetcher.retrieve_full_uri.side_effect = [
{
"AccessKeyId" : "access_key_old",
"SecretAccessKey" : "secret_key_old",
"Token" : "token_old",
"Expiration" : expired_timestamp,
},
http_exception(error_msg='HTTP connection timeout')
]
provider = credentials.ContainerProvider(environ, fetcher)
# First time works with no issues.
creds = provider.load()
# Second time with a refresh should propagate an error.
with self.assertRaises(raised_exception):
frozen_creds = creds.get_frozen_credentials()
def test_can_use_full_url(self):
environ = {
'AWS_CONTAINER_CREDENTIALS_FULL_URI': 'http://localhost/foo'
}
fetcher = self.create_fetcher()
timeobj = datetime.now(tzlocal())
timestamp = (timeobj + timedelta(hours=24)).isoformat()
fetcher.retrieve_full_uri.return_value = {
"AccessKeyId" : "access_key",
"SecretAccessKey" : "secret_key",
"Token" : "token",
"Expiration" : timestamp,
}
provider = credentials.ContainerProvider(environ, fetcher)
creds = provider.load()
fetcher.retrieve_full_uri.assert_called_with('http://localhost/foo',
headers=None)
self.assertEqual(creds.access_key, 'access_key')
self.assertEqual(creds.secret_key, 'secret_key')
self.assertEqual(creds.token, 'token')
self.assertEqual(creds.method, 'container-role')
def test_can_pass_basic_auth_token(self):
environ = {
'AWS_CONTAINER_CREDENTIALS_FULL_URI': 'http://localhost/foo',
'AWS_CONTAINER_AUTHORIZATION_TOKEN': 'Basic auth-token',
}
fetcher = self.create_fetcher()
timeobj = datetime.now(tzlocal())
timestamp = (timeobj + timedelta(hours=24)).isoformat()
fetcher.retrieve_full_uri.return_value = {
"AccessKeyId" : "access_key",
"SecretAccessKey" : "secret_key",
"Token" : "token",
"Expiration" : timestamp,
}
provider = credentials.ContainerProvider(environ, fetcher)
creds = provider.load()
fetcher.retrieve_full_uri.assert_called_with(
'http://localhost/foo', headers={'Authorization': 'Basic auth-token'})
self.assertEqual(creds.access_key, 'access_key')
self.assertEqual(creds.secret_key, 'secret_key')
self.assertEqual(creds.token, 'token')
self.assertEqual(creds.method, 'container-role')