python-botocore/tests/unit/auth/test_sigv4.py

212 lines
7.5 KiB
Python
Raw Normal View History

# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Signature Version 4 test suite.
AWS provides a test suite for signature version 4:
2021-08-18 17:45:16 +02:00
https://github.com/awslabs/aws-c-auth/tree/v0.3.15/tests/aws-sig-v4-test-suite
This module contains logic to run these tests. The test files were
2021-10-04 18:33:37 +02:00
placed in ./aws4_testsuite, and we're using those to dynamically
generate testcases based on these files.
"""
import os
import logging
import io
import datetime
2021-08-18 17:45:16 +02:00
import re
2021-10-04 18:33:37 +02:00
import pytest
2021-09-22 22:53:42 +02:00
from tests import mock
import botocore.auth
from botocore.awsrequest import AWSRequest
2021-09-22 22:53:42 +02:00
from botocore.compat import six, urlsplit, parse_qsl, HAS_CRT
from botocore.credentials import Credentials
SECRET_KEY = "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY"
ACCESS_KEY = 'AKIDEXAMPLE'
2021-08-18 17:45:16 +02:00
DATE = datetime.datetime(2015, 8, 30, 12, 36, 0)
SERVICE = 'service'
REGION = 'us-east-1'
TESTSUITE_DIR = os.path.join(
os.path.dirname(os.path.abspath(__file__)), 'aws4_testsuite')
# The following tests are not run. Each test has a comment as
# to why the test is being ignored.
TESTS_TO_IGNORE = [
2021-08-18 17:45:16 +02:00
# Bad request-line syntax, python's HTTP parser chokes on this.
'normalize-path/get-space',
# Multiple query params of the same key not supported by the SDKs.
'get-vanilla-query-order-key-case',
2021-08-18 17:45:16 +02:00
'get-vanilla-query-order-key',
'get-vanilla-query-order-value',
]
if not six.PY3:
TESTS_TO_IGNORE += [
# NO support
'get-header-key-duplicate',
'get-header-value-order',
]
log = logging.getLogger(__name__)
2017-06-27 11:52:19 +02:00
class RawHTTPRequest(six.moves.BaseHTTPServer.BaseHTTPRequestHandler):
def __init__(self, raw_request):
if isinstance(raw_request, six.text_type):
raw_request = raw_request.encode('utf-8')
2017-06-27 11:52:19 +02:00
self.rfile = six.BytesIO(raw_request)
self.raw_requestline = self.rfile.readline()
self.error_code = None
self.error_message = None
self.parse_request()
def send_error(self, code, message):
self.error_code = code
self.error_message = message
2021-10-04 18:33:37 +02:00
def generate_test_cases():
2021-08-18 17:45:16 +02:00
for (dirpath, dirnames, filenames) in os.walk(TESTSUITE_DIR):
if not any(f.endswith('.req') for f in filenames):
continue
test_case = os.path.relpath(dirpath, TESTSUITE_DIR)
if test_case in TESTS_TO_IGNORE:
log.debug("Skipping test: %s", test_case)
continue
2021-08-18 17:45:16 +02:00
2021-10-04 18:33:37 +02:00
yield test_case
@pytest.mark.parametrize("test_case", generate_test_cases())
def test_signature_version_4(test_case):
datetime_patcher = mock.patch.object(
botocore.auth.datetime, 'datetime',
mock.Mock(wraps=datetime.datetime)
)
mocked_datetime = datetime_patcher.start()
mocked_datetime.utcnow.return_value = DATE
_test_signature_version_4(test_case)
datetime_patcher.stop()
def create_request_from_raw_request(raw_request):
request = AWSRequest()
raw = RawHTTPRequest(raw_request)
if raw.error_code is not None:
raise Exception(raw.error_message)
request.method = raw.command
2021-08-18 17:45:16 +02:00
datetime_now = DATE
request.context['timestamp'] = datetime_now.strftime('%Y%m%dT%H%M%SZ')
for key, val in raw.headers.items():
request.headers[key] = val
request.data = raw.rfile.read()
host = raw.headers.get('host', '')
# For whatever reason, the BaseHTTPRequestHandler encodes
# the first line of the response as 'iso-8859-1',
# so we need decode this into utf-8.
if isinstance(raw.path, six.text_type):
raw.path = raw.path.encode('iso-8859-1').decode('utf-8')
url = 'https://%s%s' % (host, raw.path)
if '?' in url:
split_url = urlsplit(url)
params = dict(parse_qsl(split_url.query))
request.url = split_url.path
request.params = params
else:
request.url = url
return request
def _test_signature_version_4(test_case):
2021-10-04 18:33:37 +02:00
test_case = SignatureTestCase(test_case)
request = create_request_from_raw_request(test_case.raw_request)
2021-08-18 17:45:16 +02:00
auth = botocore.auth.SigV4Auth(test_case.credentials, SERVICE, REGION)
actual_canonical_request = auth.canonical_request(request)
actual_string_to_sign = auth.string_to_sign(request,
actual_canonical_request)
2021-08-18 17:45:16 +02:00
auth.add_auth(request)
actual_auth_header = request.headers['Authorization']
# Some stuff only works right when you go through auth.add_auth()
# So don't assert the interim steps unless the end result was wrong.
if actual_auth_header != test_case.authorization_header:
assert_equal(actual_canonical_request, test_case.canonical_request,
test_case.raw_request, 'canonical_request')
assert_equal(actual_string_to_sign, test_case.string_to_sign,
test_case.raw_request, 'string_to_sign')
assert_equal(actual_auth_header, test_case.authorization_header,
test_case.raw_request, 'authheader')
def _test_crt_signature_version_4(test_case):
2021-10-04 18:33:37 +02:00
test_case = SignatureTestCase(test_case)
2021-08-18 17:45:16 +02:00
request = create_request_from_raw_request(test_case.raw_request)
2021-08-18 17:45:16 +02:00
# Use CRT logging to diagnose interim steps (canonical request, etc)
# import awscrt.io
# awscrt.io.init_logging(awscrt.io.LogLevel.Trace, 'stdout')
auth = botocore.crt.auth.CrtSigV4Auth(test_case.credentials,
SERVICE, REGION)
auth.add_auth(request)
actual_auth_header = request.headers['Authorization']
assert_equal(actual_auth_header, test_case.authorization_header,
test_case.raw_request, 'authheader')
def assert_equal(actual, expected, raw_request, part):
if actual != expected:
message = "The %s did not match" % part
message += "\nACTUAL:%r !=\nEXPECT:%r" % (actual, expected)
message += '\nThe raw request was:\n%s' % raw_request
raise AssertionError(message)
2021-10-04 18:33:37 +02:00
class SignatureTestCase(object):
def __init__(self, test_case):
2021-08-18 17:45:16 +02:00
filepath = os.path.join(TESTSUITE_DIR, test_case,
os.path.basename(test_case))
# We're using io.open() because we need to open these files with
# a specific encoding, and in 2.x io.open is the best way to do this.
2021-08-18 17:45:16 +02:00
self.raw_request = io.open(filepath + '.req',
encoding='utf-8').read()
self.canonical_request = io.open(
2021-08-18 17:45:16 +02:00
filepath + '.creq',
encoding='utf-8').read().replace('\r', '')
self.string_to_sign = io.open(
2021-08-18 17:45:16 +02:00
filepath + '.sts',
encoding='utf-8').read().replace('\r', '')
self.authorization_header = io.open(
2021-08-18 17:45:16 +02:00
filepath + '.authz',
encoding='utf-8').read().replace('\r', '')
2021-08-18 17:45:16 +02:00
self.signed_request = io.open(filepath + '.sreq',
encoding='utf-8').read()
2021-08-18 17:45:16 +02:00
token_pattern = r'^x-amz-security-token:(.*)$'
token_match = re.search(token_pattern, self.canonical_request,
re.MULTILINE)
token = token_match.group(1) if token_match else None
self.credentials = Credentials(ACCESS_KEY, SECRET_KEY, token)