#!/usr/bin/env # Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ # Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "license" file accompanying this file. This file is # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. import base64 import datetime import json import time import botocore.auth import botocore.credentials from botocore.awsrequest import AWSRequest from botocore.compat import HTTPHeaders, parse_qs, six, urlsplit from tests import mock, unittest class BaseTestWithFixedDate(unittest.TestCase): def setUp(self): self.fixed_date = datetime.datetime(2014, 3, 10, 17, 2, 55, 0) self.datetime_patch = mock.patch('botocore.auth.datetime.datetime') self.datetime_mock = self.datetime_patch.start() self.datetime_mock.utcnow.return_value = self.fixed_date self.datetime_mock.strptime.return_value = self.fixed_date def tearDown(self): self.datetime_patch.stop() class TestHMACV1(unittest.TestCase): maxDiff = None def setUp(self): access_key = '44CF9590006BF252F707' secret_key = 'OtxrzxIsfpFjA7SwPzILwy8Bw21TLhquhboDYROV' self.credentials = botocore.credentials.Credentials( access_key, secret_key ) self.hmacv1 = botocore.auth.HmacV1Auth(self.credentials, None, None) self.date_mock = mock.patch('botocore.auth.formatdate') self.formatdate = self.date_mock.start() self.formatdate.return_value = 'Thu, 17 Nov 2005 18:49:58 GMT' def tearDown(self): self.date_mock.stop() def test_put(self): headers = { 'Date': 'Thu, 17 Nov 2005 18:49:58 GMT', 'Content-Md5': 'c8fdb181845a4ca6b8fec737b3581d76', 'Content-Type': 'text/html', 'X-Amz-Meta-Author': 'foo@bar.com', 'X-Amz-Magic': 'abracadabra', } http_headers = HTTPHeaders.from_dict(headers) split = urlsplit('/quotes/nelson') cs = self.hmacv1.canonical_string('PUT', split, http_headers) expected_canonical = ( "PUT\nc8fdb181845a4ca6b8fec737b3581d76\ntext/html\n" "Thu, 17 Nov 2005 18:49:58 GMT\nx-amz-magic:abracadabra\n" "x-amz-meta-author:foo@bar.com\n/quotes/nelson" ) expected_signature = 'jZNOcbfWmD/A/f3hSvVzXZjM2HU=' self.assertEqual(cs, expected_canonical) sig = self.hmacv1.get_signature('PUT', split, http_headers) self.assertEqual(sig, expected_signature) def test_duplicate_headers(self): pairs = [ ('Date', 'Thu, 17 Nov 2005 18:49:58 GMT'), ('Content-Md5', 'c8fdb181845a4ca6b8fec737b3581d76'), ('Content-Type', 'text/html'), ('X-Amz-Meta-Author', 'bar@baz.com'), ('X-Amz-Meta-Author', 'foo@bar.com'), ('X-Amz-Magic', 'abracadabra'), ] http_headers = HTTPHeaders.from_pairs(pairs) split = urlsplit('/quotes/nelson') sig = self.hmacv1.get_signature('PUT', split, http_headers) self.assertEqual(sig, 'kIdMxyiYB+F+83zYGR6sSb3ICcE=') def test_query_string(self): split = urlsplit('/quotes/nelson?uploads') pairs = [('Date', 'Thu, 17 Nov 2005 18:49:58 GMT')] sig = self.hmacv1.get_signature( 'PUT', split, HTTPHeaders.from_pairs(pairs) ) self.assertEqual(sig, 'P7pBz3Z4p3GxysRSJ/gR8nk7D4o=') def test_bucket_operations(self): # Check that the standard operations on buckets that are # specified as query strings end up in the canonical resource. operations = ( 'acl', 'cors', 'lifecycle', 'policy', 'notification', 'logging', 'tagging', 'requestPayment', 'versioning', 'website', 'object-lock', ) for operation in operations: url = '/quotes?%s' % operation split = urlsplit(url) cr = self.hmacv1.canonical_resource(split) self.assertEqual(cr, '/quotes?%s' % operation) def test_sign_with_token(self): credentials = botocore.credentials.Credentials( access_key='foo', secret_key='bar', token='baz' ) auth = botocore.auth.HmacV1Auth(credentials) request = AWSRequest() request.headers['Date'] = 'Thu, 17 Nov 2005 18:49:58 GMT' request.headers['Content-Type'] = 'text/html' request.method = 'PUT' request.url = 'https://s3.amazonaws.com/bucket/key' auth.add_auth(request) self.assertIn('Authorization', request.headers) # We're not actually checking the signature here, we're # just making sure the auth header has the right format. self.assertTrue(request.headers['Authorization'].startswith('AWS ')) def test_resign_with_token(self): credentials = botocore.credentials.Credentials( access_key='foo', secret_key='bar', token='baz' ) auth = botocore.auth.HmacV1Auth(credentials) request = AWSRequest() request.headers['Date'] = 'Thu, 17 Nov 2005 18:49:58 GMT' request.headers['Content-Type'] = 'text/html' request.method = 'PUT' request.url = 'https://s3.amazonaws.com/bucket/key' auth.add_auth(request) original_auth = request.headers['Authorization'] # Resigning the request shouldn't change the authorization # header. We are also ensuring that the date stays the same # because we're mocking out the formatdate() call. There's # another unit test that verifies we use the latest time # when we sign the request. auth.add_auth(request) self.assertEqual( request.headers.get_all('Authorization'), [original_auth] ) def test_resign_uses_most_recent_date(self): dates = [ 'Thu, 17 Nov 2005 18:49:58 GMT', 'Thu, 17 Nov 2014 20:00:00 GMT', ] self.formatdate.side_effect = dates request = AWSRequest() request.headers['Content-Type'] = 'text/html' request.method = 'PUT' request.url = 'https://s3.amazonaws.com/bucket/key' self.hmacv1.add_auth(request) original_date = request.headers['Date'] self.hmacv1.add_auth(request) modified_date = request.headers['Date'] # Each time we sign a request, we make another call to formatdate() # so we should have a different date header each time. self.assertEqual(original_date, dates[0]) self.assertEqual(modified_date, dates[1]) class TestSigV2(unittest.TestCase): maxDiff = None def setUp(self): access_key = 'foo' secret_key = 'bar' self.credentials = botocore.credentials.Credentials( access_key, secret_key ) self.signer = botocore.auth.SigV2Auth(self.credentials) self.time_patcher = mock.patch.object( botocore.auth.time, 'gmtime', mock.Mock(wraps=time.gmtime) ) mocked_time = self.time_patcher.start() mocked_time.return_value = time.struct_time( [2014, 6, 20, 8, 40, 23, 4, 171, 0] ) def tearDown(self): self.time_patcher.stop() def test_put(self): request = mock.Mock() request.url = '/' request.method = 'POST' params = {'Foo': '\u2713'} result = self.signer.calc_signature(request, params) self.assertEqual( result, ('Foo=%E2%9C%93', 'VCtWuwaOL0yMffAT8W4y0AFW3W4KUykBqah9S40rB+Q='), ) def test_fields(self): request = AWSRequest() request.url = '/' request.method = 'POST' request.data = {'Foo': '\u2713'} self.signer.add_auth(request) self.assertEqual(request.data['AWSAccessKeyId'], 'foo') self.assertEqual(request.data['Foo'], '\u2713') self.assertEqual(request.data['Timestamp'], '2014-06-20T08:40:23Z') self.assertEqual( request.data['Signature'], 'Tiecw+t51tok4dTT8B4bg47zxHEM/KcD55f2/x6K22o=', ) self.assertEqual(request.data['SignatureMethod'], 'HmacSHA256') self.assertEqual(request.data['SignatureVersion'], '2') def test_resign(self): # Make sure that resigning after e.g. retries works request = AWSRequest() request.url = '/' request.method = 'POST' params = { 'Foo': '\u2713', 'Signature': 'VCtWuwaOL0yMffAT8W4y0AFW3W4KUykBqah9S40rB+Q=', } result = self.signer.calc_signature(request, params) self.assertEqual( result, ('Foo=%E2%9C%93', 'VCtWuwaOL0yMffAT8W4y0AFW3W4KUykBqah9S40rB+Q='), ) def test_get(self): request = AWSRequest() request.url = '/' request.method = 'GET' request.params = {'Foo': '\u2713'} self.signer.add_auth(request) self.assertEqual(request.params['AWSAccessKeyId'], 'foo') self.assertEqual(request.params['Foo'], '\u2713') self.assertEqual(request.params['Timestamp'], '2014-06-20T08:40:23Z') self.assertEqual( request.params['Signature'], 'Un97klqZCONP65bA1+Iv4H3AcB2I40I4DBvw5ZERFPw=', ) self.assertEqual(request.params['SignatureMethod'], 'HmacSHA256') self.assertEqual(request.params['SignatureVersion'], '2') class TestSigV3(unittest.TestCase): maxDiff = None def setUp(self): self.access_key = 'access_key' self.secret_key = 'secret_key' self.credentials = botocore.credentials.Credentials( self.access_key, self.secret_key ) self.auth = botocore.auth.SigV3Auth(self.credentials) self.date_mock = mock.patch('botocore.auth.formatdate') self.formatdate = self.date_mock.start() self.formatdate.return_value = 'Thu, 17 Nov 2005 18:49:58 GMT' def tearDown(self): self.date_mock.stop() def test_signature_with_date_headers(self): request = AWSRequest() request.headers = {'Date': 'Thu, 17 Nov 2005 18:49:58 GMT'} request.url = 'https://route53.amazonaws.com' self.auth.add_auth(request) self.assertEqual( request.headers['X-Amzn-Authorization'], ( 'AWS3-HTTPS AWSAccessKeyId=access_key,Algorithm=HmacSHA256,' 'Signature=M245fo86nVKI8rLpH4HgWs841sBTUKuwciiTpjMDgPs=' ), ) def test_resign_with_token(self): credentials = botocore.credentials.Credentials( access_key='foo', secret_key='bar', token='baz' ) auth = botocore.auth.SigV3Auth(credentials) request = AWSRequest() request.headers['Date'] = 'Thu, 17 Nov 2005 18:49:58 GMT' request.method = 'PUT' request.url = 'https://route53.amazonaws.com/' auth.add_auth(request) original_auth = request.headers['X-Amzn-Authorization'] # Resigning the request shouldn't change the authorization # header. auth.add_auth(request) self.assertEqual( request.headers.get_all('X-Amzn-Authorization'), [original_auth] ) class TestS3SigV4Auth(BaseTestWithFixedDate): AuthClass = botocore.auth.S3SigV4Auth maxDiff = None def setUp(self): super().setUp() self.credentials = botocore.credentials.Credentials( access_key='foo', secret_key='bar', token='baz' ) self.auth = self.AuthClass(self.credentials, 'ec2', 'eu-central-1') self.request = AWSRequest(data=six.BytesIO(b"foo bar baz")) self.request.method = 'PUT' self.request.url = 'https://s3.eu-central-1.amazonaws.com/' self.client_config = mock.Mock() self.s3_config = {} self.client_config.s3 = self.s3_config self.request.context = {'client_config': self.client_config} def test_resign_with_content_hash(self): self.auth.add_auth(self.request) original_auth = self.request.headers['Authorization'] self.auth.add_auth(self.request) self.assertEqual( self.request.headers.get_all('Authorization'), [original_auth] ) def test_signature_is_not_normalized(self): request = AWSRequest() request.url = 'https://s3.amazonaws.com/bucket/foo/./bar/../bar' request.method = 'GET' credentials = botocore.credentials.Credentials( 'access_key', 'secret_key' ) auth = self.AuthClass(credentials, 's3', 'us-east-1') auth.add_auth(request) self.assertTrue( request.headers['Authorization'].startswith('AWS4-HMAC-SHA256') ) def test_query_string_params_in_urls(self): if not hasattr(self.AuthClass, 'canonical_query_string'): raise unittest.SkipTest( '%s does not expose interim steps' % self.AuthClass.__name__ ) request = AWSRequest() request.url = ( 'https://s3.amazonaws.com/bucket?' 'marker=%C3%A4%C3%B6%C3%BC-01.txt&prefix' ) request.data = {'Action': 'MyOperation'} request.method = 'GET' # Check that the canonical query string is correct formatting # by ensuring that query string paramters that are added to the # canonical query string are correctly formatted. cqs = self.auth.canonical_query_string(request) self.assertEqual('marker=%C3%A4%C3%B6%C3%BC-01.txt&prefix=', cqs) def _test_blacklist_header(self, header, value): request = AWSRequest() request.url = 'https://s3.amazonaws.com/bucket/foo' request.method = 'PUT' request.headers[header] = value credentials = botocore.credentials.Credentials( 'access_key', 'secret_key' ) auth = self.AuthClass(credentials, 's3', 'us-east-1') auth.add_auth(request) self.assertNotIn(header, request.headers['Authorization']) def test_blacklist_expect_headers(self): self._test_blacklist_header('expect', '100-continue') def test_blacklist_trace_id(self): self._test_blacklist_header( 'x-amzn-trace-id', 'Root=foo;Parent=bar;Sampleid=1' ) def test_blacklist_headers(self): self._test_blacklist_header('user-agent', 'botocore/1.4.11') def test_uses_sha256_if_config_value_is_true(self): self.client_config.s3['payload_signing_enabled'] = True self.auth.add_auth(self.request) sha_header = self.request.headers['X-Amz-Content-SHA256'] self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD') def test_does_not_use_sha256_if_config_value_is_false(self): self.client_config.s3['payload_signing_enabled'] = False self.auth.add_auth(self.request) sha_header = self.request.headers['X-Amz-Content-SHA256'] self.assertEqual(sha_header, 'UNSIGNED-PAYLOAD') def test_uses_sha256_if_md5_unset(self): self.request.context['has_streaming_input'] = True self.auth.add_auth(self.request) sha_header = self.request.headers['X-Amz-Content-SHA256'] self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD') def test_uses_sha256_if_not_https(self): self.request.context['has_streaming_input'] = True self.request.headers.add_header('Content-MD5', 'foo') self.request.url = 'http://s3.amazonaws.com/bucket' self.auth.add_auth(self.request) sha_header = self.request.headers['X-Amz-Content-SHA256'] self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD') def test_uses_sha256_if_not_streaming_upload(self): self.request.context['has_streaming_input'] = False self.request.headers.add_header('Content-MD5', 'foo') self.request.url = 'https://s3.amazonaws.com/bucket' self.auth.add_auth(self.request) sha_header = self.request.headers['X-Amz-Content-SHA256'] self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD') def test_does_not_use_sha256_if_md5_set(self): self.request.context['has_streaming_input'] = True self.request.headers.add_header('Content-MD5', 'foo') self.auth.add_auth(self.request) sha_header = self.request.headers['X-Amz-Content-SHA256'] self.assertEqual(sha_header, 'UNSIGNED-PAYLOAD') def test_does_not_use_sha256_if_checksum_set(self): self.request.context['has_streaming_input'] = True self.request.context['checksum'] = { 'request_algorithm': { 'in': 'header', 'name': 'x-amz-checksum-sha256', 'algorithm': 'sha256', } } self.request.headers.add_header('X-Amz-Checksum-sha256', 'foo') self.auth.add_auth(self.request) sha_header = self.request.headers['X-Amz-Content-SHA256'] self.assertEqual(sha_header, 'UNSIGNED-PAYLOAD') def test_does_not_use_sha256_if_context_config_set(self): self.request.context['payload_signing_enabled'] = False self.request.headers.add_header('Content-MD5', 'foo') self.auth.add_auth(self.request) sha_header = self.request.headers['X-Amz-Content-SHA256'] self.assertEqual(sha_header, 'UNSIGNED-PAYLOAD') def test_sha256_if_context_set_on_http(self): self.request.context['payload_signing_enabled'] = False self.request.headers.add_header('Content-MD5', 'foo') self.request.url = 'http://s3.amazonaws.com/bucket' self.auth.add_auth(self.request) sha_header = self.request.headers['X-Amz-Content-SHA256'] self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD') def test_sha256_if_context_set_without_md5(self): self.request.context['payload_signing_enabled'] = False self.request.url = 'https://s3.amazonaws.com/bucket' self.auth.add_auth(self.request) sha_header = self.request.headers['X-Amz-Content-SHA256'] self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD') class TestSigV4(unittest.TestCase): def setUp(self): self.credentials = botocore.credentials.Credentials( access_key='foo', secret_key='bar' ) def create_signer(self, service_name='myservice', region='us-west-2'): auth = botocore.auth.SigV4Auth(self.credentials, service_name, region) return auth def test_canonical_query_string(self): request = AWSRequest() request.url = ( 'https://search-testdomain1-j67dwxlet67gf7ghwfmik2c67i.us-west-2.' 'cloudsearch.amazonaws.com/' '2013-01-01/search?format=sdk&pretty=true&' 'q.options=%7B%22defaultOperator%22%3A%20%22and%22%2C%20%22' 'fields%22%3A%5B%22directors%5E10%22%5D%7D&q=George%20Lucas' ) request.method = 'GET' auth = self.create_signer('cloudsearchdomain', 'us-west-2') actual = auth.canonical_query_string(request) # Here 'q' should come before 'q.options'. expected = ( "format=sdk&pretty=true&q=George%20Lucas&q.options=%7B%22" "defaultOperator%22%3A%20%22and%22%2C%20%22fields%22%3A%5B" "%22directors%5E10%22%5D%7D" ) self.assertEqual(actual, expected) def test_thread_safe_timestamp(self): request = AWSRequest() request.url = ( 'https://search-testdomain1-j67dwxlet67gf7ghwfmik2c67i.us-west-2.' 'cloudsearch.amazonaws.com/' '2013-01-01/search?format=sdk&pretty=true&' 'q.options=%7B%22defaultOperator%22%3A%20%22and%22%2C%20%22' 'fields%22%3A%5B%22directors%5E10%22%5D%7D&q=George%20Lucas' ) request.method = 'GET' auth = self.create_signer('cloudsearchdomain', 'us-west-2') with mock.patch.object( botocore.auth.datetime, 'datetime', mock.Mock(wraps=datetime.datetime), ) as mock_datetime: original_utcnow = datetime.datetime(2014, 1, 1, 0, 0) mock_datetime.utcnow.return_value = original_utcnow # Go through the add_auth process once. This will attach # a timestamp to the request at the beginning of auth. auth.add_auth(request) self.assertEqual(request.context['timestamp'], '20140101T000000Z') # Ensure the date is in the Authorization header self.assertIn('20140101', request.headers['Authorization']) # Now suppose the utc time becomes the next day all of a sudden mock_datetime.utcnow.return_value = datetime.datetime( 2014, 1, 2, 0, 0 ) # Smaller methods like the canonical request and string_to_sign # should have the timestamp attached to the request in their # body and not what the time is now mocked as. This is to ensure # there is no mismatching in timestamps when signing. cr = auth.canonical_request(request) self.assertIn('x-amz-date:20140101T000000Z', cr) self.assertNotIn('x-amz-date:20140102T000000Z', cr) sts = auth.string_to_sign(request, cr) self.assertIn('20140101T000000Z', sts) self.assertNotIn('20140102T000000Z', sts) def test_payload_is_binary_file(self): request = AWSRequest() request.data = six.BytesIO('\u2713'.encode()) request.url = 'https://amazonaws.com' auth = self.create_signer() payload = auth.payload(request) self.assertEqual( payload, '1dabba21cdad44541f6b15796f8d22978fc7ea10c46aeceeeeb66c23b3ac7604', ) def test_payload_is_bytes_type(self): request = AWSRequest() request.data = '\u2713'.encode() request.url = 'https://amazonaws.com' auth = self.create_signer() payload = auth.payload(request) self.assertEqual( payload, '1dabba21cdad44541f6b15796f8d22978fc7ea10c46aeceeeeb66c23b3ac7604', ) def test_payload_not_signed_if_disabled_in_context(self): request = AWSRequest() request.data = '\u2713'.encode() request.url = 'https://amazonaws.com' request.context['payload_signing_enabled'] = False auth = self.create_signer() payload = auth.payload(request) self.assertEqual(payload, 'UNSIGNED-PAYLOAD') def test_content_sha256_set_if_payload_signing_disabled(self): request = AWSRequest() request.data = six.BytesIO('\u2713'.encode()) request.url = 'https://amazonaws.com' request.context['payload_signing_enabled'] = False request.method = 'PUT' auth = self.create_signer() auth.add_auth(request) sha_header = request.headers['X-Amz-Content-SHA256'] self.assertEqual(sha_header, 'UNSIGNED-PAYLOAD') def test_collapse_multiple_spaces(self): auth = self.create_signer() original = HTTPHeaders() original['foo'] = 'double space' headers = auth.canonical_headers(original) self.assertEqual(headers, 'foo:double space') def test_trims_leading_trailing_spaces(self): auth = self.create_signer() original = HTTPHeaders() original['foo'] = ' leading and trailing ' headers = auth.canonical_headers(original) self.assertEqual(headers, 'foo:leading and trailing') def test_strips_http_default_port(self): request = AWSRequest() request.url = 'http://s3.us-west-2.amazonaws.com:80/' request.method = 'GET' auth = self.create_signer('s3', 'us-west-2') actual = auth.headers_to_sign(request)['host'] expected = 's3.us-west-2.amazonaws.com' self.assertEqual(actual, expected) def test_strips_https_default_port(self): request = AWSRequest() request.url = 'https://s3.us-west-2.amazonaws.com:443/' request.method = 'GET' auth = self.create_signer('s3', 'us-west-2') actual = auth.headers_to_sign(request)['host'] expected = 's3.us-west-2.amazonaws.com' self.assertEqual(actual, expected) def test_strips_http_auth(self): request = AWSRequest() request.url = 'https://username:password@s3.us-west-2.amazonaws.com/' request.method = 'GET' auth = self.create_signer('s3', 'us-west-2') actual = auth.headers_to_sign(request)['host'] expected = 's3.us-west-2.amazonaws.com' self.assertEqual(actual, expected) def test_strips_default_port_and_http_auth(self): request = AWSRequest() request.url = 'http://username:password@s3.us-west-2.amazonaws.com:80/' request.method = 'GET' auth = self.create_signer('s3', 'us-west-2') actual = auth.headers_to_sign(request)['host'] expected = 's3.us-west-2.amazonaws.com' self.assertEqual(actual, expected) class TestSigV4Resign(BaseTestWithFixedDate): maxDiff = None AuthClass = botocore.auth.SigV4Auth def setUp(self): super().setUp() self.credentials = botocore.credentials.Credentials( access_key='foo', secret_key='bar', token='baz' ) self.auth = self.AuthClass(self.credentials, 'ec2', 'us-west-2') self.request = AWSRequest() self.request.method = 'PUT' self.request.url = 'https://ec2.amazonaws.com/' def test_resign_request_with_date(self): self.request.headers['Date'] = 'Thu, 17 Nov 2005 18:49:58 GMT' self.auth.add_auth(self.request) original_auth = self.request.headers['Authorization'] self.auth.add_auth(self.request) self.assertEqual( self.request.headers.get_all('Authorization'), [original_auth] ) def test_sigv4_without_date(self): self.auth.add_auth(self.request) original_auth = self.request.headers['Authorization'] self.auth.add_auth(self.request) self.assertEqual( self.request.headers.get_all('Authorization'), [original_auth] ) class BasePresignTest(unittest.TestCase): def get_parsed_query_string(self, request): query_string_dict = parse_qs(urlsplit(request.url).query) # Also, parse_qs sets each value in the dict to be a list, but # because we know that we won't have repeated keys, we simplify # the dict and convert it back to a single value. for key in query_string_dict: query_string_dict[key] = query_string_dict[key][0] return query_string_dict class TestS3SigV2Presign(BasePresignTest): def setUp(self): self.access_key = 'access_key' self.secret_key = 'secret_key' self.credentials = botocore.credentials.Credentials( self.access_key, self.secret_key ) self.expires = 3000 self.auth = botocore.auth.HmacV1QueryAuth( self.credentials, expires=self.expires ) self.current_epoch_time = 1427427247.465591 self.time_patch = mock.patch('time.time') self.time_mock = self.time_patch.start() self.time_mock.return_value = self.current_epoch_time self.request = AWSRequest() self.bucket = 'mybucket' self.key = 'myobject' self.path = 'https://s3.amazonaws.com/{}/{}'.format( self.bucket, self.key ) self.request.url = self.path self.request.method = 'GET' def tearDown(self): self.time_patch.stop() def test_presign_with_query_string(self): self.request.url = ( 'https://foo-bucket.s3.amazonaws.com/image.jpg' '?response-content-disposition=' 'attachment%3B%20filename%3D%22download.jpg%22' ) self.auth.add_auth(self.request) query_string = self.get_parsed_query_string(self.request) # We should have still kept the response-content-disposition # in the query string. self.assertIn('response-content-disposition', query_string) self.assertEqual( query_string['response-content-disposition'], 'attachment; filename="download.jpg"', ) # But we should have also added the parts from the signer. self.assertEqual(query_string['AWSAccessKeyId'], self.access_key) def test_presign_no_headers(self): self.auth.add_auth(self.request) self.assertTrue(self.request.url.startswith(self.path + '?')) query_string = self.get_parsed_query_string(self.request) self.assertEqual(query_string['AWSAccessKeyId'], self.access_key) self.assertEqual( query_string['Expires'], str(int(self.current_epoch_time) + self.expires), ) self.assertEqual( query_string['Signature'], 'ZRSgywstwIruKLTLt/Bcrf9H1K4=' ) def test_presign_with_x_amz_headers(self): self.request.headers['x-amz-security-token'] = 'foo' self.request.headers['x-amz-acl'] = 'read-only' self.auth.add_auth(self.request) query_string = self.get_parsed_query_string(self.request) self.assertEqual(query_string['x-amz-security-token'], 'foo') self.assertEqual(query_string['x-amz-acl'], 'read-only') self.assertEqual( query_string['Signature'], '5oyMAGiUk1E5Ry2BnFr6cIS3Gus=' ) def test_presign_with_content_headers(self): self.request.headers['content-type'] = 'txt' self.request.headers['content-md5'] = 'foo' self.auth.add_auth(self.request) query_string = self.get_parsed_query_string(self.request) self.assertEqual(query_string['content-type'], 'txt') self.assertEqual(query_string['content-md5'], 'foo') self.assertEqual( query_string['Signature'], '/YQRFdQGywXP74WrOx2ET/RUqz8=' ) def test_presign_with_unused_headers(self): self.request.headers['user-agent'] = 'botocore' self.auth.add_auth(self.request) query_string = self.get_parsed_query_string(self.request) self.assertNotIn('user-agent', query_string) self.assertEqual( query_string['Signature'], 'ZRSgywstwIruKLTLt/Bcrf9H1K4=' ) class TestSigV4Presign(BasePresignTest): maxDiff = None AuthClass = botocore.auth.SigV4QueryAuth def setUp(self): self.access_key = 'access_key' self.secret_key = 'secret_key' self.credentials = botocore.credentials.Credentials( self.access_key, self.secret_key ) self.service_name = 'myservice' self.region_name = 'myregion' self.auth = self.AuthClass( self.credentials, self.service_name, self.region_name, expires=60 ) self.datetime_patcher = mock.patch.object( botocore.auth.datetime, 'datetime', mock.Mock(wraps=datetime.datetime), ) mocked_datetime = self.datetime_patcher.start() mocked_datetime.utcnow.return_value = datetime.datetime( 2014, 1, 1, 0, 0 ) def tearDown(self): self.datetime_patcher.stop() def test_presign_no_params(self): request = AWSRequest() request.method = 'GET' request.url = 'https://ec2.us-east-1.amazonaws.com/' self.auth.add_auth(request) query_string = self.get_parsed_query_string(request) self.assertEqual( query_string, { 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', 'X-Amz-Credential': ( 'access_key/20140101/myregion/' 'myservice/aws4_request' ), 'X-Amz-Date': '20140101T000000Z', 'X-Amz-Expires': '60', 'X-Amz-Signature': ( 'c70e0bcdb4cd3ee324f71c78195445b878' '8315af0800bbbdbbb6d05a616fb84c' ), 'X-Amz-SignedHeaders': 'host', }, ) def test_operation_params_before_auth_params(self): # The spec is picky about this. request = AWSRequest() request.method = 'GET' request.url = 'https://ec2.us-east-1.amazonaws.com/?Action=MyOperation' self.auth.add_auth(request) # Verify auth params come after the existing params. self.assertIn('?Action=MyOperation&X-Amz', request.url) def test_operation_params_before_auth_params_in_body(self): request = AWSRequest() request.method = 'GET' request.url = 'https://ec2.us-east-1.amazonaws.com/' request.data = {'Action': 'MyOperation'} self.auth.add_auth(request) # Same situation, the params from request.data come before the auth # params in the query string. self.assertIn('?Action=MyOperation&X-Amz', request.url) def test_operation_params_before_auth_params_in_params(self): request = AWSRequest() request.method = 'GET' request.url = 'https://ec2.us-east-1.amazonaws.com/' request.params = {'Action': 'MyOperation'} self.auth.add_auth(request) # Same situation, the params from request.param come before the # auth params in the query string. self.assertIn('?Action=MyOperation&X-Amz', request.url) def test_request_params_not_duplicated_in_prepare(self): """ params should be moved to query string in add_auth and not rewritten at the end with request.prepare() """ request = AWSRequest( method='GET', url='https://ec2.us-east-1.amazonaws.com/', params={'Action': 'MyOperation'}, ) self.auth.add_auth(request) self.assertIn('?Action=MyOperation&X-Amz', request.url) prep = request.prepare() assert not prep.url.endswith('Action=MyOperation') def test_presign_with_spaces_in_param(self): request = AWSRequest() request.method = 'GET' request.url = 'https://ec2.us-east-1.amazonaws.com/' request.data = {'Action': 'MyOperation', 'Description': 'With Spaces'} self.auth.add_auth(request) # Verify we encode spaces as '%20, and we don't use '+'. self.assertIn('Description=With%20Spaces', request.url) def test_presign_with_empty_param_value(self): request = AWSRequest() request.method = 'POST' # actual URL format for creating a multipart upload request.url = 'https://s3.amazonaws.com/mybucket/mykey?uploads' self.auth.add_auth(request) # verify that uploads param is still in URL self.assertIn('uploads', request.url) def test_s3_sigv4_presign(self): auth = botocore.auth.S3SigV4QueryAuth( self.credentials, self.service_name, self.region_name, expires=60 ) request = AWSRequest() request.method = 'GET' request.url = ( 'https://s3.us-west-2.amazonaws.com/mybucket/keyname/.bar' ) auth.add_auth(request) query_string = self.get_parsed_query_string(request) # We use a different payload: self.assertEqual(auth.payload(request), 'UNSIGNED-PAYLOAD') # which will result in a different X-Amz-Signature: self.assertEqual( query_string, { 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', 'X-Amz-Credential': ( 'access_key/20140101/myregion/' 'myservice/aws4_request' ), 'X-Amz-Date': '20140101T000000Z', 'X-Amz-Expires': '60', 'X-Amz-Signature': ( 'ac1b8b9e47e8685c5c963d75e35e8741d55251' 'cd955239cc1efad4dc7201db66' ), 'X-Amz-SignedHeaders': 'host', }, ) def test_presign_with_security_token(self): self.credentials.token = 'security-token' auth = botocore.auth.S3SigV4QueryAuth( self.credentials, self.service_name, self.region_name, expires=60 ) request = AWSRequest() request.method = 'GET' request.url = 'https://ec2.us-east-1.amazonaws.com/' auth.add_auth(request) query_string = self.get_parsed_query_string(request) self.assertEqual( query_string['X-Amz-Security-Token'], 'security-token' ) def test_presign_where_body_is_json_bytes(self): request = AWSRequest() request.method = 'GET' request.url = 'https://myservice.us-east-1.amazonaws.com/' request.data = b'{"Param": "value"}' self.auth.add_auth(request) query_string = self.get_parsed_query_string(request) expected_query_string = { 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', 'X-Amz-Credential': ( 'access_key/20140101/myregion/myservice/aws4_request' ), 'X-Amz-Expires': '60', 'X-Amz-Date': '20140101T000000Z', 'X-Amz-Signature': ( '8e1d372d168d532313ce6df8f64a7dc51d' 'e6f312a9cfba6e5b345d8a771e839c' ), 'X-Amz-SignedHeaders': 'host', 'Param': 'value', } self.assertEqual(query_string, expected_query_string) def test_presign_where_body_is_json_string(self): request = AWSRequest() request.method = 'GET' request.url = 'https://myservice.us-east-1.amazonaws.com/' request.data = '{"Param": "value"}' self.auth.add_auth(request) query_string = self.get_parsed_query_string(request) expected_query_string = { 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', 'X-Amz-Credential': ( 'access_key/20140101/myregion/myservice/aws4_request' ), 'X-Amz-Expires': '60', 'X-Amz-Date': '20140101T000000Z', 'X-Amz-Signature': ( '8e1d372d168d532313ce6df8f64a7dc51d' 'e6f312a9cfba6e5b345d8a771e839c' ), 'X-Amz-SignedHeaders': 'host', 'Param': 'value', } self.assertEqual(query_string, expected_query_string) def test_presign_content_type_form_encoded_not_signed(self): request = AWSRequest() request.method = 'GET' request.url = 'https://myservice.us-east-1.amazonaws.com/' request.headers[ 'Content-Type' ] = 'application/x-www-form-urlencoded; charset=utf-8' self.auth.add_auth(request) query_string = self.get_parsed_query_string(request) signed_headers = query_string.get('X-Amz-SignedHeaders') self.assertNotIn('content-type', signed_headers) class BaseS3PresignPostTest(unittest.TestCase): def setUp(self): self.access_key = 'access_key' self.secret_key = 'secret_key' self.credentials = botocore.credentials.Credentials( self.access_key, self.secret_key ) self.service_name = 'myservice' self.region_name = 'myregion' self.bucket = 'mybucket' self.key = 'mykey' self.policy = { "expiration": "2007-12-01T12:00:00.000Z", "conditions": [ {"acl": "public-read"}, {"bucket": self.bucket}, ["starts-with", "$key", self.key], ], } self.fields = { 'key': self.key, 'acl': 'public-read', } self.request = AWSRequest() self.request.url = 'https://s3.amazonaws.com/%s' % self.bucket self.request.method = 'POST' self.request.context['s3-presign-post-fields'] = self.fields self.request.context['s3-presign-post-policy'] = self.policy class TestS3SigV2Post(BaseS3PresignPostTest): def setUp(self): super().setUp() self.auth = botocore.auth.HmacV1PostAuth(self.credentials) self.current_epoch_time = 1427427247.465591 self.time_patch = mock.patch('time.time') self.time_mock = self.time_patch.start() self.time_mock.return_value = self.current_epoch_time def tearDown(self): self.time_patch.stop() def test_presign_post(self): self.auth.add_auth(self.request) result_fields = self.request.context['s3-presign-post-fields'] self.assertEqual( result_fields['AWSAccessKeyId'], self.credentials.access_key ) result_policy = json.loads( base64.b64decode(result_fields['policy']).decode('utf-8') ) self.assertEqual( result_policy['expiration'], '2007-12-01T12:00:00.000Z' ) self.assertEqual( result_policy['conditions'], [ {"acl": "public-read"}, {"bucket": "mybucket"}, ["starts-with", "$key", "mykey"], ], ) self.assertIn('signature', result_fields) def test_presign_post_with_security_token(self): self.credentials.token = 'my-token' self.auth = botocore.auth.HmacV1PostAuth(self.credentials) self.auth.add_auth(self.request) result_fields = self.request.context['s3-presign-post-fields'] self.assertEqual(result_fields['x-amz-security-token'], 'my-token') def test_empty_fields_and_policy(self): self.request = AWSRequest() self.request.url = 'https://s3.amazonaws.com/%s' % self.bucket self.request.method = 'POST' self.auth.add_auth(self.request) result_fields = self.request.context['s3-presign-post-fields'] self.assertEqual( result_fields['AWSAccessKeyId'], self.credentials.access_key ) result_policy = json.loads( base64.b64decode(result_fields['policy']).decode('utf-8') ) self.assertEqual(result_policy['conditions'], []) self.assertIn('signature', result_fields) class TestS3SigV4Post(BaseS3PresignPostTest): def setUp(self): super().setUp() self.auth = botocore.auth.S3SigV4PostAuth( self.credentials, self.service_name, self.region_name ) self.datetime_patcher = mock.patch.object( botocore.auth.datetime, 'datetime', mock.Mock(wraps=datetime.datetime), ) mocked_datetime = self.datetime_patcher.start() mocked_datetime.utcnow.return_value = datetime.datetime( 2014, 1, 1, 0, 0 ) def tearDown(self): self.datetime_patcher.stop() def test_presign_post(self): self.auth.add_auth(self.request) result_fields = self.request.context['s3-presign-post-fields'] self.assertEqual(result_fields['x-amz-algorithm'], 'AWS4-HMAC-SHA256') self.assertEqual( result_fields['x-amz-credential'], 'access_key/20140101/myregion/myservice/aws4_request', ) self.assertEqual(result_fields['x-amz-date'], '20140101T000000Z') result_policy = json.loads( base64.b64decode(result_fields['policy']).decode('utf-8') ) self.assertEqual( result_policy['expiration'], '2007-12-01T12:00:00.000Z' ) self.assertEqual( result_policy['conditions'], [ {"acl": "public-read"}, {"bucket": "mybucket"}, ["starts-with", "$key", "mykey"], {"x-amz-algorithm": "AWS4-HMAC-SHA256"}, { "x-amz-credential": "access_key/20140101/myregion/myservice/aws4_request" }, {"x-amz-date": "20140101T000000Z"}, ], ) self.assertIn('x-amz-signature', result_fields) def test_presign_post_with_security_token(self): self.credentials.token = 'my-token' self.auth = botocore.auth.S3SigV4PostAuth( self.credentials, self.service_name, self.region_name ) self.auth.add_auth(self.request) result_fields = self.request.context['s3-presign-post-fields'] self.assertEqual(result_fields['x-amz-security-token'], 'my-token') def test_empty_fields_and_policy(self): self.request = AWSRequest() self.request.url = 'https://s3.amazonaws.com/%s' % self.bucket self.request.method = 'POST' self.auth.add_auth(self.request) result_fields = self.request.context['s3-presign-post-fields'] self.assertEqual(result_fields['x-amz-algorithm'], 'AWS4-HMAC-SHA256') self.assertEqual( result_fields['x-amz-credential'], 'access_key/20140101/myregion/myservice/aws4_request', ) self.assertEqual(result_fields['x-amz-date'], '20140101T000000Z') result_policy = json.loads( base64.b64decode(result_fields['policy']).decode('utf-8') ) self.assertEqual( result_policy['conditions'], [ {"x-amz-algorithm": "AWS4-HMAC-SHA256"}, { "x-amz-credential": "access_key/20140101/myregion/myservice/aws4_request" }, {"x-amz-date": "20140101T000000Z"}, ], ) self.assertIn('x-amz-signature', result_fields)