python-botocore/tests/functional/test_credentials.py

1083 lines
40 KiB
Python
Raw Normal View History

# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
2022-12-12 17:14:19 +01:00
import json
import math
import os
import shutil
import sys
import tempfile
import threading
import time
import uuid
from datetime import datetime, timedelta, timezone

import pytest
from dateutil.tz import tzlocal

from botocore import UNSIGNED
from botocore.config import Config
from botocore.credentials import (
    AssumeRoleProvider,
    CanonicalNameCredentialSourcer,
    ContainerProvider,
    Credentials,
    DeferredRefreshableCredentials,
    EnvProvider,
    InstanceMetadataProvider,
    JSONFileCache,
    ProfileProviderBuilder,
    ReadOnlyCredentials,
    create_credential_resolver,
)
from botocore.exceptions import (
    CredentialRetrievalError,
    InfiniteLoopConfigError,
    InvalidConfigError,
)
from botocore.session import Session
from botocore.stub import Stubber
from botocore.tokens import SSOTokenProvider
from botocore.utils import datetime2timestamp
from tests import (
    BaseEnvVar,
    IntegerRefresher,
    SessionHTTPStubber,
    StubbedSession,
    mock,
    random_chars,
    temporary_file,
    unittest,
)
2022-12-12 17:14:19 +01:00
# Reference instants used by the SSO token/credential fixtures in this module.
# They are timezone-aware (UTC) so that ``strftime(...'Z')`` and
# ``.timestamp()`` describe the same instant: ``.timestamp()`` interprets a
# naive datetime as *local* time, and ``datetime.utcnow()`` is deprecated
# since Python 3.12.
TIME_IN_ONE_HOUR = datetime.now(timezone.utc) + timedelta(hours=1)
TIME_IN_SIX_MONTHS = datetime.now(timezone.utc) + timedelta(hours=4320)
class TestCredentialRefreshRaces(unittest.TestCase):
    """Hammer refreshable credentials from many threads at once."""

    def assert_consistent_credentials_seen(self, creds, func):
        """Run ``func`` in 20 threads and verify every collected triple.

        :param creds: The credentials object under test. It is accepted for
            call-site readability only; the check works purely on what the
            threads append to the shared list.
        :param func: Thread target. It receives the shared ``collected``
            list and is expected to append
            ``(access_key, secret_key, token)`` tuples to it.
        """
        collected = []
        self._run_threads(20, func, collected)
        # NOTE: the loop variable is deliberately not called ``creds`` so it
        # does not shadow the parameter of the same name.
        for seen in collected:
            # During testing, the refresher uses its current
            # refresh count as the values for the access, secret, and
            # token value.  This means that at any given point in time,
            # the credentials should be something like:
            #
            # ReadOnlyCredentials('1', '1', '1')
            # ReadOnlyCredentials('2', '2', '2')
            # ...
            # ReadOnlyCredentials('30', '30', '30')
            #
            # This makes it really easy to verify we see a consistent
            # set of credentials from the same time period.  We just
            # check if all the credential values are the same.  If
            # we ever see something like:
            #
            # ReadOnlyCredentials('1', '2', '1')
            #
            # We fail.  This is because we're using the access_key
            # from the first refresh ('1'), the secret key from
            # the second refresh ('2'), and the token from the
            # first refresh ('1').
            self.assertTrue(seen[0] == seen[1] == seen[2], seen)

    def assert_non_none_retrieved_credentials(self, func):
        """Run ``func`` in 50 threads and verify nothing collected is None.

        :param func: Thread target that appends to the shared list it is
            handed.
        """
        collected = []
        self._run_threads(50, func, collected)
        for cred in collected:
            self.assertIsNotNone(cred)

    def _run_threads(self, num_threads, func, collected):
        """Start ``num_threads`` threads running ``func(collected)``; join all.

        All threads are started before any is joined so that they actually
        overlap.
        """
        threads = [
            threading.Thread(target=func, args=(collected,))
            for _ in range(num_threads)
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    def test_has_no_race_conditions(self):
        creds = IntegerRefresher(
            creds_last_for=2, advisory_refresh=1, mandatory_refresh=0
        )

        def _run_in_thread(collected):
            for _ in range(4000):
                frozen = creds.get_frozen_credentials()
                collected.append(
                    (frozen.access_key, frozen.secret_key, frozen.token)
                )

        start = time.time()
        self.assert_consistent_credentials_seen(creds, _run_in_thread)
        end = time.time()
        # creds_last_for = 2 seconds (from above)
        # So, for example, if execution time took 6.1 seconds, then
        # we should see a maximum number of refreshes being (6 / 2.0) + 1 = 4
        max_calls_allowed = math.ceil((end - start) / 2.0) + 1
        self.assertLessEqual(
            creds.refresh_counter,
            max_calls_allowed,
            "Too many cred refreshes, max: %s, actual: %s, "
            "time_delta: %.4f"
            % (max_calls_allowed, creds.refresh_counter, (end - start)),
        )

    def test_no_race_for_immediate_advisory_expiration(self):
        creds = IntegerRefresher(
            creds_last_for=1, advisory_refresh=1, mandatory_refresh=0
        )

        def _run_in_thread(collected):
            for _ in range(100):
                frozen = creds.get_frozen_credentials()
                collected.append(
                    (frozen.access_key, frozen.secret_key, frozen.token)
                )

        self.assert_consistent_credentials_seen(creds, _run_in_thread)

    def test_no_race_for_initial_refresh_of_deferred_refreshable(self):
        def get_credentials():
            expiry_time = (
                datetime.now(tzlocal()) + timedelta(hours=24)
            ).isoformat()
            return {
                'access_key': 'my-access-key',
                'secret_key': 'my-secret-key',
                'token': 'my-token',
                'expiry_time': expiry_time,
            }

        deferred_creds = DeferredRefreshableCredentials(
            get_credentials, 'fixed'
        )

        def _run_in_thread(collected):
            frozen = deferred_creds.get_frozen_credentials()
            collected.append(frozen)

        self.assert_non_none_retrieved_credentials(_run_in_thread)
2018-01-15 17:34:17 +01:00
2019-08-03 07:08:36 +02:00
class BaseAssumeRoleTest(BaseEnvVar):
    """Shared fixture: a throwaway config file plus assume-role helpers."""

    def setUp(self):
        """Create a temp dir and point the AWS config env vars at it."""
        super().setUp()
        self.tempdir = tempfile.mkdtemp()
        self.config_file = os.path.join(self.tempdir, 'config')
        self.environ['AWS_CONFIG_FILE'] = self.config_file
        # A random name is used for the shared credentials file path.
        self.environ['AWS_SHARED_CREDENTIALS_FILE'] = str(uuid.uuid4())

    def tearDown(self):
        """Remove the temp dir, then run the parent teardown."""
        shutil.rmtree(self.tempdir)
        super().tearDown()

    def some_future_time(self):
        """Return an aware local datetime 24 hours from now."""
        return datetime.now(tzlocal()) + timedelta(hours=24)

    def create_assume_role_response(self, credentials, expiration=None):
        """Build an AssumeRole-style response dict around ``credentials``.

        :param credentials: Object exposing ``access_key``, ``secret_key``
            and ``token``.
        :param expiration: Value for ``Expiration``; defaults to
            ``some_future_time()`` when ``None``.
        """
        expires = self.some_future_time() if expiration is None else expiration
        return {
            'Credentials': {
                'AccessKeyId': credentials.access_key,
                'SecretAccessKey': credentials.secret_key,
                'SessionToken': credentials.token,
                'Expiration': expires,
            },
            'AssumedRoleUser': {
                'AssumedRoleId': 'myroleid',
                'Arn': 'arn:aws:iam::1234567890:user/myuser',
            },
        }

    def create_random_credentials(self):
        """Return ``Credentials`` whose three parts are random 'fake-' strings."""
        access_key, secret_key, token = (
            'fake-%s' % random_chars(length) for length in (15, 35, 45)
        )
        return Credentials(access_key, secret_key, token)

    def assert_creds_equal(self, c1, c2):
        """Assert both credentials are equal once frozen.

        Arguments that are not already ``ReadOnlyCredentials`` are frozen
        via ``get_frozen_credentials()`` before comparison.
        """
        first, second = (
            candidate
            if isinstance(candidate, ReadOnlyCredentials)
            else candidate.get_frozen_credentials()
            for candidate in (c1, c2)
        )
        self.assertEqual(first, second)

    def write_config(self, config):
        """Overwrite the temporary config file with ``config``."""
        with open(self.config_file, 'w') as config_fh:
            config_fh.write(config)
class TestAssumeRole(BaseAssumeRoleTest):
    """Functional tests for assume-role resolution driven by profiles."""

    def setUp(self):
        """Install mocked providers and locate the credential process script."""
        super().setUp()
        self.environ['AWS_ACCESS_KEY_ID'] = 'access_key'
        self.environ['AWS_SECRET_ACCESS_KEY'] = 'secret_key'

        self.metadata_provider = self.mock_provider(InstanceMetadataProvider)
        self.env_provider = self.mock_provider(EnvProvider)
        self.container_provider = self.mock_provider(ContainerProvider)
        self.mock_client_creator = mock.Mock(spec=Session.create_client)
        self.actual_client_region = None

        script_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            'utils',
            'credentialprocess.py',
        )
        self.credential_process = '{} {}'.format(sys.executable, script_path)

    def mock_provider(self, provider_cls):
        """Return a mock of ``provider_cls`` whose ``load()`` yields None.

        ``METHOD`` and ``CANONICAL_NAME`` are copied from the real class.
        """
        provider = mock.Mock(spec=provider_cls)
        provider.load.return_value = None
        provider.METHOD = provider_cls.METHOD
        provider.CANONICAL_NAME = provider_cls.CANONICAL_NAME
        return provider

    def create_session(self, profile=None):
        """Build a stubbed session whose providers are swapped for mocks.

        :returns: ``(session, stubber)`` where ``stubber`` is the activated
            STS stubber of the session.
        """
        session = StubbedSession(profile=profile)

        # We have to set bogus credentials here or otherwise we'll trigger
        # an early credential chain resolution.
        sts_client = session.create_client(
            'sts',
            aws_access_key_id='spam',
            aws_secret_access_key='eggs',
        )
        self.mock_client_creator.return_value = sts_client

        credential_sourcer = CanonicalNameCredentialSourcer(
            [
                self.env_provider,
                self.container_provider,
                self.metadata_provider,
            ]
        )
        profile_provider_builder = ProfileProviderBuilder(
            session,
            sso_token_cache=JSONFileCache(self.tempdir),
        )
        assume_role_provider = AssumeRoleProvider(
            load_config=lambda: session.full_config,
            client_creator=self.mock_client_creator,
            cache={},
            profile_name=profile,
            credential_sourcer=credential_sourcer,
            profile_provider_builder=profile_provider_builder,
        )
        stubber = session.stub('sts')
        stubber.activate()

        resolver = session.get_component('credential_provider')
        available_methods = [p.METHOD for p in resolver.providers]
        replacements = {
            'env': self.env_provider,
            'iam-role': self.metadata_provider,
            'container-role': self.container_provider,
            'assume-role': assume_role_provider,
        }
        for method, replacement in replacements.items():
            if method not in available_methods:
                # The provider isn't in the session
                continue
            resolver.providers[available_methods.index(method)] = replacement

        session.register_component('credential_provider', resolver)
        return session, stubber

    def test_assume_role(self):
        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'source_profile = B\n\n'
            '[profile B]\n'
            'aws_access_key_id = abc123\n'
            'aws_secret_access_key = def456\n'
        )
        self.write_config(profile_config)

        role_creds = self.create_random_credentials()
        sts_response = self.create_assume_role_response(role_creds)
        session, stubber = self.create_session(profile='A')
        stubber.add_response('assume_role', sts_response)

        resolved = session.get_credentials()
        self.assert_creds_equal(resolved, role_creds)
        stubber.assert_no_pending_responses()

    def test_environment_credential_source(self):
        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'credential_source = Environment\n'
        )
        self.write_config(profile_config)

        self.env_provider.load.return_value = self.create_random_credentials()

        role_creds = self.create_random_credentials()
        sts_response = self.create_assume_role_response(role_creds)
        session, stubber = self.create_session(profile='A')
        stubber.add_response('assume_role', sts_response)

        resolved = session.get_credentials()
        self.assert_creds_equal(resolved, role_creds)
        stubber.assert_no_pending_responses()
        self.assertEqual(self.env_provider.load.call_count, 1)

    def test_instance_metadata_credential_source(self):
        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'credential_source = Ec2InstanceMetadata\n'
        )
        self.write_config(profile_config)

        self.metadata_provider.load.return_value = (
            self.create_random_credentials()
        )

        role_creds = self.create_random_credentials()
        sts_response = self.create_assume_role_response(role_creds)
        session, stubber = self.create_session(profile='A')
        stubber.add_response('assume_role', sts_response)

        resolved = session.get_credentials()
        self.assert_creds_equal(resolved, role_creds)
        stubber.assert_no_pending_responses()
        self.assertEqual(self.metadata_provider.load.call_count, 1)

    def test_container_credential_source(self):
        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'credential_source = EcsContainer\n'
        )
        self.write_config(profile_config)

        self.container_provider.load.return_value = (
            self.create_random_credentials()
        )

        role_creds = self.create_random_credentials()
        sts_response = self.create_assume_role_response(role_creds)
        session, stubber = self.create_session(profile='A')
        stubber.add_response('assume_role', sts_response)

        resolved = session.get_credentials()
        self.assert_creds_equal(resolved, role_creds)
        stubber.assert_no_pending_responses()
        self.assertEqual(self.container_provider.load.call_count, 1)

    def test_invalid_credential_source(self):
        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'credential_source = CustomInvalidProvider\n'
        )
        self.write_config(profile_config)

        with self.assertRaises(InvalidConfigError):
            session, _ = self.create_session(profile='A')
            session.get_credentials()

    def test_misconfigured_source_profile(self):
        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'source_profile = B\n'
            '[profile B]\n'
            'region = us-west-2\n'
        )
        self.write_config(profile_config)

        with self.assertRaises(InvalidConfigError):
            session, _ = self.create_session(profile='A')
            session.get_credentials().get_frozen_credentials()

    def test_recursive_assume_role(self):
        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'source_profile = B\n\n'
            '[profile B]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleB\n'
            'source_profile = C\n\n'
            '[profile C]\n'
            'aws_access_key_id = abc123\n'
            'aws_secret_access_key = def456\n'
        )
        self.write_config(profile_config)

        profile_b_creds = self.create_random_credentials()
        profile_b_response = self.create_assume_role_response(profile_b_creds)
        profile_a_creds = self.create_random_credentials()
        profile_a_response = self.create_assume_role_response(profile_a_creds)

        session, stubber = self.create_session(profile='A')
        # Responses are queued in the order B, then A.
        for queued_response in (profile_b_response, profile_a_response):
            stubber.add_response('assume_role', queued_response)

        resolved = session.get_credentials()
        self.assert_creds_equal(resolved, profile_a_creds)
        stubber.assert_no_pending_responses()

    def test_recursive_assume_role_stops_at_static_creds(self):
        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'source_profile = B\n\n'
            '[profile B]\n'
            'aws_access_key_id = abc123\n'
            'aws_secret_access_key = def456\n'
            'role_arn = arn:aws:iam::123456789:role/RoleB\n'
            'source_profile = C\n\n'
            '[profile C]\n'
            'aws_access_key_id = abc123\n'
            'aws_secret_access_key = def456\n'
        )
        self.write_config(profile_config)

        profile_a_creds = self.create_random_credentials()
        profile_a_response = self.create_assume_role_response(profile_a_creds)
        session, stubber = self.create_session(profile='A')
        # Only a single assume_role response is queued for this chain.
        stubber.add_response('assume_role', profile_a_response)

        resolved = session.get_credentials()
        self.assert_creds_equal(resolved, profile_a_creds)
        stubber.assert_no_pending_responses()

    def test_infinitely_recursive_assume_role(self):
        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'source_profile = A\n'
        )
        self.write_config(profile_config)

        with self.assertRaises(InfiniteLoopConfigError):
            session, _ = self.create_session(profile='A')
            session.get_credentials()

    def test_process_source_profile(self):
        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'source_profile = B\n'
            '[profile B]\n'
            'credential_process = %s\n' % self.credential_process
        )
        self.write_config(profile_config)

        role_creds = self.create_random_credentials()
        sts_response = self.create_assume_role_response(role_creds)
        session, stubber = self.create_session(profile='A')
        stubber.add_response('assume_role', sts_response)

        resolved = session.get_credentials()
        self.assert_creds_equal(resolved, role_creds)
        stubber.assert_no_pending_responses()

        # Assert that the client was created with the credentials from the
        # credential process.
        self.assertEqual(self.mock_client_creator.call_count, 1)
        _, client_kwargs = self.mock_client_creator.call_args_list[0]
        self.assertEqual(
            client_kwargs,
            {
                'aws_access_key_id': 'spam',
                'aws_secret_access_key': 'eggs',
                'aws_session_token': None,
            },
        )

    def test_web_identity_source_profile(self):
        token_path = os.path.join(self.tempdir, 'token')
        with open(token_path, 'w') as token_file:
            token_file.write('a.token')

        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'source_profile = B\n'
            '[profile B]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleB\n'
            'web_identity_token_file = %s\n' % token_path
        )
        self.write_config(profile_config)

        session, stubber = self.create_session(profile='A')

        identity_creds = self.create_random_credentials()
        identity_response = self.create_assume_role_response(identity_creds)
        stubber.add_response(
            'assume_role_with_web_identity',
            identity_response,
        )

        role_creds = self.create_random_credentials()
        assume_role_response = self.create_assume_role_response(role_creds)
        stubber.add_response('assume_role', assume_role_response)

        resolved = session.get_credentials()
        self.assert_creds_equal(resolved, role_creds)
        stubber.assert_no_pending_responses()

        # Assert that the client was created with the credentials from the
        # assume role with web identity call.
        self.assertEqual(self.mock_client_creator.call_count, 1)
        _, client_kwargs = self.mock_client_creator.call_args_list[0]
        self.assertEqual(
            client_kwargs,
            {
                'aws_access_key_id': identity_creds.access_key,
                'aws_secret_access_key': identity_creds.secret_key,
                'aws_session_token': identity_creds.token,
            },
        )

    def test_web_identity_source_profile_ignores_env_vars(self):
        token_path = os.path.join(self.tempdir, 'token')
        with open(token_path, 'w') as token_file:
            token_file.write('a.token')
        self.environ['AWS_ROLE_ARN'] = 'arn:aws:iam::123456789:role/RoleB'

        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'source_profile = B\n'
            '[profile B]\n'
            'web_identity_token_file = %s\n' % token_path
        )
        self.write_config(profile_config)

        session, _ = self.create_session(profile='A')
        # The config is split between the profile and the env, we
        # should only be looking at the profile so this should raise
        # a configuration error.
        with self.assertRaises(InvalidConfigError):
            session.get_credentials()

    def test_sso_source_profile(self):
        # NOTE(review): presumably the cache key the SSO provider derives
        # for the start URL configured below -- confirm against the provider.
        token_cache_key = 'f395038c92f1828cbb3991d2d6152d326b895606'
        cached_token = {
            'accessToken': 'a.token',
            'expiresAt': self.some_future_time(),
        }
        temp_cache = JSONFileCache(self.tempdir)
        temp_cache[token_cache_key] = cached_token

        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'source_profile = B\n'
            '[profile B]\n'
            'sso_region = us-east-1\n'
            'sso_start_url = https://test.url/start\n'
            'sso_role_name = SSORole\n'
            'sso_account_id = 1234567890\n'
        )
        self.write_config(profile_config)

        session, sts_stubber = self.create_session(profile='A')
        client_config = Config(
            region_name='us-east-1',
            signature_version=UNSIGNED,
        )
        sso_stubber = session.stub('sso', config=client_config)
        sso_stubber.activate()

        # The expiration needs to be in milliseconds
        expiration = datetime2timestamp(self.some_future_time()) * 1000
        sso_role_creds = self.create_random_credentials()
        sso_role_response = {
            'roleCredentials': {
                'accessKeyId': sso_role_creds.access_key,
                'secretAccessKey': sso_role_creds.secret_key,
                'sessionToken': sso_role_creds.token,
                'expiration': int(expiration),
            }
        }
        sso_stubber.add_response('get_role_credentials', sso_role_response)

        role_creds = self.create_random_credentials()
        assume_role_response = self.create_assume_role_response(role_creds)
        sts_stubber.add_response('assume_role', assume_role_response)

        resolved = session.get_credentials()
        self.assert_creds_equal(resolved, role_creds)
        sts_stubber.assert_no_pending_responses()

        # Assert that the client was created with the credentials from the
        # SSO get role credentials response
        self.assertEqual(self.mock_client_creator.call_count, 1)
        _, client_kwargs = self.mock_client_creator.call_args_list[0]
        self.assertEqual(
            client_kwargs,
            {
                'aws_access_key_id': sso_role_creds.access_key,
                'aws_secret_access_key': sso_role_creds.secret_key,
                'aws_session_token': sso_role_creds.token,
            },
        )

    def test_web_identity_credential_source_ignores_env_vars(self):
        token_path = os.path.join(self.tempdir, 'token')
        with open(token_path, 'w') as token_file:
            token_file.write('a.token')
        self.environ['AWS_ROLE_ARN'] = 'arn:aws:iam::123456789:role/RoleB'
        self.environ['AWS_WEB_IDENTITY_TOKEN_FILE'] = token_path

        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'credential_source = Environment\n'
        )
        self.write_config(profile_config)

        session, _ = self.create_session(profile='A')
        # We should not get credentials from web-identity configured in the
        # environment when the Environment credential_source is set.
        # There are no Environment credentials, so this should raise a
        # retrieval error.
        with self.assertRaises(CredentialRetrievalError):
            session.get_credentials()

    def test_self_referential_profile(self):
        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'source_profile = A\n'
            'aws_access_key_id = abc123\n'
            'aws_secret_access_key = def456\n'
        )
        self.write_config(profile_config)

        role_creds = self.create_random_credentials()
        sts_response = self.create_assume_role_response(role_creds)
        session, stubber = self.create_session(profile='A')
        stubber.add_response('assume_role', sts_response)

        resolved = session.get_credentials()
        self.assert_creds_equal(resolved, role_creds)
        stubber.assert_no_pending_responses()

    def create_stubbed_sts_client(self, session):
        """Return a ``create_client`` wrapper that stubs ``assume_role``.

        Each client produced by the wrapper gets one stubbed ``assume_role``
        response, and its region is recorded on
        ``self.actual_client_region``.

        :returns: ``(wrapper, expected_creds)``.
        """
        expected_creds = self.create_random_credentials()
        real_create_client = session.create_client

        def create_client_sts_stub(service, *args, **kwargs):
            client = real_create_client(service, *args, **kwargs)
            client_stubber = Stubber(client)
            response = self.create_assume_role_response(expected_creds)
            self.actual_client_region = client.meta.region_name
            client_stubber.add_response('assume_role', response)
            client_stubber.activate()
            return client

        return create_client_sts_stub, expected_creds

    def test_assume_role_uses_correct_region(self):
        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'source_profile = B\n\n'
            '[profile B]\n'
            'aws_access_key_id = abc123\n'
            'aws_secret_access_key = def456\n'
        )
        self.write_config(profile_config)

        session = Session(profile='A')
        # Verify that when we configure the session with a specific region
        # that we use that region when creating the sts client.
        session.set_config_variable('region', 'cn-north-1')

        create_client, expected_creds = self.create_stubbed_sts_client(session)
        session.create_client = create_client

        provider = create_credential_resolver(session).get_provider(
            'assume-role'
        )
        loaded = provider.load()
        self.assert_creds_equal(loaded, expected_creds)
        self.assertEqual(self.actual_client_region, 'cn-north-1')
2019-08-03 07:08:36 +02:00
class TestAssumeRoleWithWebIdentity(BaseAssumeRoleTest):
    """Tests for web-identity role assumption from profile or environment."""

    def setUp(self):
        """Write a default token file into the temporary directory."""
        super().setUp()
        self.token_file = os.path.join(self.tempdir, 'token.jwt')
        self.write_token('totally.a.token')

    def write_token(self, token, path=None):
        """Write ``token`` to ``path`` (default: ``self.token_file``)."""
        target = self.token_file if path is None else path
        with open(target, 'w') as token_fh:
            token_fh.write(token)

    def assert_session_credentials(self, expected_params, **kwargs):
        """Resolve credentials and check the stubbed STS call parameters.

        :param expected_params: Parameters the stubbed
            ``assume_role_with_web_identity`` call is registered with.
        :param kwargs: Forwarded to ``StubbedSession``.
        """
        role_creds = self.create_random_credentials()
        sts_response = self.create_assume_role_response(role_creds)

        session = StubbedSession(**kwargs)
        stubber = session.stub('sts')
        stubber.add_response(
            'assume_role_with_web_identity', sts_response, expected_params
        )
        stubber.activate()

        resolved = session.get_credentials()
        self.assert_creds_equal(resolved, role_creds)
        stubber.assert_no_pending_responses()

    def test_assume_role(self):
        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'role_session_name = sname\n'
            'web_identity_token_file = %s\n'
        ) % self.token_file
        self.write_config(profile_config)

        self.assert_session_credentials(
            {
                'RoleArn': 'arn:aws:iam::123456789:role/RoleA',
                'RoleSessionName': 'sname',
                'WebIdentityToken': 'totally.a.token',
            },
            profile='A',
        )

    def test_assume_role_env_vars(self):
        profile_config = '[profile B]\n' 'region = us-west-2\n'
        self.write_config(profile_config)

        self.environ['AWS_ROLE_ARN'] = 'arn:aws:iam::123456789:role/RoleB'
        self.environ['AWS_WEB_IDENTITY_TOKEN_FILE'] = self.token_file
        self.environ['AWS_ROLE_SESSION_NAME'] = 'bname'

        self.assert_session_credentials(
            {
                'RoleArn': 'arn:aws:iam::123456789:role/RoleB',
                'RoleSessionName': 'bname',
                'WebIdentityToken': 'totally.a.token',
            }
        )

    def test_assume_role_env_vars_do_not_take_precedence(self):
        profile_config = (
            '[profile A]\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'role_session_name = aname\n'
            'web_identity_token_file = %s\n'
        ) % self.token_file
        self.write_config(profile_config)

        # The environment points at a different role, session name and
        # token file than the profile does.
        other_token_path = os.path.join(self.tempdir, str(uuid.uuid4()))
        self.write_token('totally.different.token', path=other_token_path)
        self.environ['AWS_ROLE_ARN'] = 'arn:aws:iam::123456789:role/RoleC'
        self.environ['AWS_WEB_IDENTITY_TOKEN_FILE'] = other_token_path
        self.environ['AWS_ROLE_SESSION_NAME'] = 'cname'

        self.assert_session_credentials(
            {
                'RoleArn': 'arn:aws:iam::123456789:role/RoleA',
                'RoleSessionName': 'aname',
                'WebIdentityToken': 'totally.a.token',
            },
            profile='A',
        )
2018-01-15 17:34:17 +01:00
class TestProcessProvider(unittest.TestCase):
    """Tests for credentials obtained through ``credential_process``."""

    def setUp(self):
        """Build the helper command line and patch ``os.environ`` with a copy."""
        script_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            'utils',
            'credentialprocess.py',
        )
        self.credential_process = '{} {}'.format(sys.executable, script_path)
        self.environ = os.environ.copy()
        self.environ_patch = mock.patch('os.environ', self.environ)
        self.environ_patch.start()

    def tearDown(self):
        """Undo the ``os.environ`` patch."""
        self.environ_patch.stop()

    def test_credential_process(self):
        template = '[profile processcreds]\n' 'credential_process = %s\n'
        profile_config = template % self.credential_process
        with temporary_file('w') as config_fh:
            config_fh.write(profile_config)
            config_fh.flush()
            self.environ['AWS_CONFIG_FILE'] = config_fh.name

            credentials = Session(profile='processcreds').get_credentials()
            self.assertEqual(credentials.access_key, 'spam')
            self.assertEqual(credentials.secret_key, 'eggs')

    def test_credential_process_returns_error(self):
        template = (
            '[profile processcreds]\n'
            'credential_process = %s --raise-error\n'
        )
        profile_config = template % self.credential_process
        with temporary_file('w') as config_fh:
            config_fh.write(profile_config)
            config_fh.flush()
            self.environ['AWS_CONFIG_FILE'] = config_fh.name

            session = Session(profile='processcreds')

            # This regex validates that there is no substring: b'
            # The reason why we want to validate that is that we want to
            # make sure that stderr is actually decoded so that in
            # exceptional cases the error is properly formatted.
            # As for how the regex works:
            # `(?!b').` is a negative lookahead followed by `.`, so it
            # matches any single character that is not the start of the
            # pattern `b'`. `((?!b').)*` does that zero or more times. The
            # final pattern adds `^` and `$` to anchor the beginning and end
            # of the string so we can know the whole string is consumed.
            # Finally `(?s)` at the beginning makes dots match newlines so
            # we can handle a multi-line string.
            no_bytes_repr = r"(?s)^((?!b').)*$"
            with self.assertRaisesRegex(
                CredentialRetrievalError, no_bytes_repr
            ):
                session.get_credentials()
2019-11-18 09:46:14 +01:00
class TestSTSRegional(BaseAssumeRoleTest):
    """Check which STS endpoint URL is used when assuming a role."""

    def add_assume_role_http_response(self, stubber):
        """Queue an ``AssumeRole`` XML response on ``stubber``."""
        payload = self._get_assume_role_body('AssumeRole')
        stubber.add_response(body=payload)

    def add_assume_role_with_web_identity_http_response(self, stubber):
        """Queue an ``AssumeRoleWithWebIdentity`` XML response on ``stubber``."""
        payload = self._get_assume_role_body('AssumeRoleWithWebIdentity')
        stubber.add_response(body=payload)

    def _get_assume_role_body(self, method_name):
        """Return the UTF-8 encoded XML body for ``method_name``.

        The ``Expiration`` element is filled from ``some_future_time()``.
        """
        template = (
            '<{method_name}Response>'
            ' <{method_name}Result>'
            ' <AssumedRoleUser>'
            ' <Arn>arn:aws:sts::0123456:user</Arn>'
            ' <AssumedRoleId>AKID:mysession-1567020004</AssumedRoleId>'
            ' </AssumedRoleUser>'
            ' <Credentials>'
            ' <AccessKeyId>AccessKey</AccessKeyId>'
            ' <SecretAccessKey>SecretKey</SecretAccessKey>'
            ' <SessionToken>SessionToken</SessionToken>'
            ' <Expiration>{expiration}</Expiration>'
            ' </Credentials>'
            ' </{method_name}Result>'
            '</{method_name}Response>'
        )
        rendered = template.format(
            method_name=method_name, expiration=self.some_future_time()
        )
        return rendered.encode('utf-8')

    def make_stubbed_client_call_to_region(self, session, stubber, region):
        """Create an EC2 client in ``region`` and make one stubbed call."""
        ec2_client = session.create_client('ec2', region_name=region)
        stubber.add_response(body=b'<DescribeRegionsResponse/>')
        ec2_client.describe_regions()

    def test_assume_role_uses_same_region_as_client(self):
        profile_config = (
            '[profile A]\n'
            'sts_regional_endpoints = regional\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'source_profile = B\n\n'
            '[profile B]\n'
            'aws_access_key_id = abc123\n'
            'aws_secret_access_key = def456\n'
        )
        self.write_config(profile_config)

        session = Session(profile='A')
        with SessionHTTPStubber(session) as stubber:
            self.add_assume_role_http_response(stubber)
            # Make an arbitrary client and API call as we are really only
            # looking to make sure the STS assume role call uses the correct
            # endpoint.
            self.make_stubbed_client_call_to_region(
                session, stubber, 'us-west-2'
            )
            first_request_url = stubber.requests[0].url
            self.assertEqual(
                first_request_url, 'https://sts.us-west-2.amazonaws.com/'
            )

    def test_assume_role_web_identity_uses_same_region_as_client(self):
        token_file = os.path.join(self.tempdir, 'token.jwt')
        with open(token_file, 'w') as token_fh:
            token_fh.write('some-token')

        profile_config = (
            '[profile A]\n'
            'sts_regional_endpoints = regional\n'
            'role_arn = arn:aws:iam::123456789:role/RoleA\n'
            'web_identity_token_file = %s\n'
            'source_profile = B\n\n'
            '[profile B]\n'
            'aws_access_key_id = abc123\n'
            'aws_secret_access_key = def456\n' % token_file
        )
        self.write_config(profile_config)

        session = Session(profile='A')
        with SessionHTTPStubber(session) as stubber:
            self.add_assume_role_with_web_identity_http_response(stubber)
            # Make an arbitrary client and API call as we are really only
            # looking to make sure the STS assume role call uses the correct
            # endpoint.
            self.make_stubbed_client_call_to_region(
                session, stubber, 'us-west-2'
            )
            first_request_url = stubber.requests[0].url
            self.assertEqual(
                first_request_url, 'https://sts.us-west-2.amazonaws.com/'
            )
2022-12-12 17:14:19 +01:00
class MockCache:
    """Stand-in for JSONFileCache that never reads or writes files."""

    def __init__(self, working_dir=None, dumps_func=None):
        # Both arguments are only stored; nothing in this class uses them.
        self.working_dir, self.dumps_func = working_dir, dumps_func

    def __contains__(self, cache_key):
        # Every key is reported as present.
        return True

    def __getitem__(self, cache_key):
        # The same token document is returned regardless of ``cache_key``.
        expires_at = TIME_IN_ONE_HOUR.strftime('%Y-%m-%dT%H:%M:%SZ')
        registration_expires_at = TIME_IN_SIX_MONTHS.strftime(
            '%Y-%m-%dT%H:%M:%SZ'
        )
        return {
            "startUrl": "https://test.awsapps.com/start",
            "region": "us-east-1",
            "accessToken": "access-token",
            "expiresAt": expires_at,
            "expiresIn": 3600,
            "clientId": "client-12345",
            "clientSecret": "client-secret",
            "registrationExpiresAt": registration_expires_at,
            "refreshToken": "refresh-here",
        }

    def __delitem__(self, cache_key):
        # Deleting is a no-op.
        return None
class SSOSessionTest(BaseEnvVar):
    """Tests for profiles that reference an ``[sso-session ...]`` section."""

    def setUp(self):
        """Create a temp config file and the credential values to stub."""
        super().setUp()
        self.tempdir = tempfile.mkdtemp()
        self.config_file = os.path.join(self.tempdir, 'config')
        self.environ['AWS_CONFIG_FILE'] = self.config_file
        self.access_key_id = 'ASIA123456ABCDEFG'
        self.secret_access_key = 'secret-key'
        self.session_token = 'session-token'

    def tearDown(self):
        """Remove the temp dir, then run the parent teardown."""
        shutil.rmtree(self.tempdir)
        super().tearDown()

    def write_config(self, config):
        """Overwrite the temporary config file with ``config``."""
        with open(self.config_file, 'w') as f:
            f.write(config)

    def test_token_chosen_from_provider(self):
        profile = (
            '[profile sso-test]\n'
            'region = us-east-1\n'
            'sso_session = sso-test-session\n'
            'sso_account_id = 12345678901234\n'
            'sso_role_name = ViewOnlyAccess\n'
            '\n'
            '[sso-session sso-test-session]\n'
            'sso_region = us-east-1\n'
            'sso_start_url = https://test.awsapps.com/start\n'
            'sso_registration_scopes = sso:account:access\n'
        )
        self.write_config(profile)

        session = Session(profile='sso-test')
        with SessionHTTPStubber(session) as stubber:
            # First response answers the SSO credential request, the second
            # (empty) one answers the S3 call made below.
            self.add_credential_response(stubber)
            stubber.add_response()
            with mock.patch.object(
                SSOTokenProvider, 'DEFAULT_CACHE_CLS', MockCache
            ):
                c = session.create_client('s3')
                c.list_buckets()

            self.assert_valid_sso_call(
                stubber.requests[0],
                (
                    'https://portal.sso.us-east-1.amazonaws.com/federation/credentials'
                    '?role_name=ViewOnlyAccess&account_id=12345678901234'
                ),
                b'access-token',
            )
            self.assert_credentials_used(
                stubber.requests[1],
                self.access_key_id.encode('utf-8'),
                self.session_token.encode('utf-8'),
            )

    def test_mismatched_session_values(self):
        profile = (
            '[profile sso-test]\n'
            'region = us-east-1\n'
            'sso_session = sso-test-session\n'
            'sso_start_url = https://test2.awsapps.com/start\n'
            'sso_account_id = 12345678901234\n'
            'sso_role_name = ViewOnlyAccess\n'
            '\n'
            '[sso-session sso-test-session]\n'
            'sso_region = us-east-1\n'
            'sso_start_url = https://test.awsapps.com/start\n'
            'sso_registration_scopes = sso:account:access\n'
        )
        self.write_config(profile)

        session = Session(profile='sso-test')
        with pytest.raises(InvalidConfigError):
            c = session.create_client('s3')
            c.list_buckets()

    def test_missing_sso_session(self):
        profile = (
            '[profile sso-test]\n'
            'region = us-east-1\n'
            'sso_session = sso-test-session\n'
            'sso_start_url = https://test2.awsapps.com/start\n'
            'sso_account_id = 12345678901234\n'
            'sso_role_name = ViewOnlyAccess\n'
            '\n'
        )
        self.write_config(profile)

        session = Session(profile='sso-test')
        with pytest.raises(InvalidConfigError):
            c = session.create_client('s3')
            c.list_buckets()

    def assert_valid_sso_call(self, request, url, access_token):
        """Check the request URL and its SSO bearer-token header."""
        assert request.url == url
        assert 'x-amz-sso_bearer_token' in request.headers
        assert request.headers['x-amz-sso_bearer_token'] == access_token

    def assert_credentials_used(self, request, access_key, session_token):
        """Check the access key and session token appear in the headers."""
        assert access_key in request.headers.get('Authorization')
        assert request.headers.get('X-Amz-Security-Token') == session_token

    def add_credential_response(self, stubber):
        """Queue a JSON ``roleCredentials`` response on ``stubber``.

        The expiration is sent as integer epoch milliseconds.
        ``datetime2timestamp`` is used rather than ``datetime.timestamp()``
        because the latter interprets a naive datetime as local time, which
        would shift the expiry by the host's UTC offset and could hand out
        credentials that already look expired.
        """
        expiration_ms = int(datetime2timestamp(TIME_IN_ONE_HOUR) * 1000)
        response = {
            'roleCredentials': {
                'accessKeyId': self.access_key_id,
                'secretAccessKey': self.secret_access_key,
                'sessionToken': self.session_token,
                'expiration': expiration_ms,
            }
        }
        stubber.add_response(body=json.dumps(response).encode('utf-8'))