python-botocore/tests/unit/test_awsrequest.py

718 lines
29 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from tests import unittest
import os
import tempfile
import shutil
import io
import socket
import sys
from mock import Mock, patch
2018-10-04 08:50:52 +02:00
from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool
from botocore.exceptions import UnseekableStreamError
2018-10-04 08:50:52 +02:00
from botocore.awsrequest import AWSRequest, AWSPreparedRequest, AWSResponse
from botocore.awsrequest import AWSHTTPConnection, AWSHTTPSConnection, HeadersDict
from botocore.awsrequest import prepare_request_dict, create_request_object
from botocore.compat import file_type, six
class IgnoreCloseBytesIO(io.BytesIO):
    """``io.BytesIO`` variant whose ``close()`` does nothing."""

    def close(self):
        # Deliberately a no-op: the buffer is never actually closed, so its
        # contents stay readable after close() has been called.
        return None
class FakeSocket(object):
    """Socket stand-in that records sent bytes and serves canned read data.

    :param read_data: bytes handed to ``fileclass`` when ``makefile()`` is
        first called.
    :param fileclass: callable used to build the file object returned by
        ``makefile()``.
    """

    def __init__(self, read_data, fileclass=IgnoreCloseBytesIO):
        self.sent_data = b''
        self.read_data = read_data
        self.fileclass = fileclass
        # Created lazily on the first makefile() call and then reused.
        self._fp_object = None

    def sendall(self, data):
        # Accumulate everything "sent" so tests can inspect it afterwards.
        self.sent_data += data

    def makefile(self, mode, bufsize=None):
        # ``mode`` and ``bufsize`` are accepted but ignored; every call
        # returns the same file object.
        fp = self._fp_object
        if fp is None:
            fp = self._fp_object = self.fileclass(self.read_data)
        return fp

    def close(self):
        # Nothing to release.
        return None
class BytesIOWithLen(six.BytesIO):
    """Bytes buffer that additionally supports ``len()``."""

    def __len__(self):
        # The length is the size of the whole buffer, independent of the
        # current read position.
        contents = self.getvalue()
        return len(contents)
class Unseekable(file_type):
    """File-like wrapper whose ``seek()`` exists but always fails.

    Reads are delegated to the wrapped stream, while every attempt to seek
    raises ``ValueError``.

    :param stream: the underlying object whose ``read()`` is used.
    """

    def __init__(self, stream):
        # The base class initializer is not invoked; only the wrapped stream
        # is stored.
        self._stream = stream

    def read(self):
        """Return whatever the wrapped stream's ``read()`` returns."""
        return self._stream.read()

    def seek(self, offset, whence=0):
        """Always raise ``ValueError``.

        :raises ValueError: unconditionally.
        """
        # This is a case where seek() exists as part of the object's interface,
        # but it doesn't actually work (for example socket.makefile(), which
        # will raise an io.* error on python3).
        #
        # ``whence`` defaults to 0, mirroring a real file object's seek(), so
        # that a plain ``seek(0)`` reaches this ValueError instead of failing
        # earlier with a TypeError for a missing argument.
        raise ValueError("Underlying stream does not support seeking.")
class Seekable(object):
    """This class represents a bare-bones, seekable file-like object.

    Note it does not include some of the other attributes of other
    file-like objects such as StringIO's getvalue() and file object's fileno
    property. If the file-like object does not have either of these attributes
    requests will not calculate the content length even though it is still
    possible to calculate it.

    :param stream: the underlying stream every call is delegated to.
    """

    def __init__(self, stream):
        self._stream = stream

    def __iter__(self):
        return iter(self._stream)

    def read(self):
        """Return whatever the wrapped stream's ``read()`` returns."""
        return self._stream.read()

    def seek(self, offset, whence=0):
        """Move the wrapped stream and return what its ``seek()`` returns."""
        # Pass the result through rather than dropping it, so callers that
        # rely on seek() reporting the new position keep working.
        return self._stream.seek(offset, whence)

    def tell(self):
        """Return the wrapped stream's current position."""
        return self._stream.tell()
class TestAWSRequest(unittest.TestCase):
    """Tests for preparing ``AWSRequest`` objects and resetting body streams."""

    def setUp(self):
        # Each test gets its own scratch directory plus a plain GET request
        # and its prepared counterpart.
        self.tempdir = tempfile.mkdtemp()
        self.filename = os.path.join(self.tempdir, 'foo')
        self.request = AWSRequest(method='GET', url='http://example.com')
        self.prepared_request = self.request.prepare()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

    def test_prepared_request_repr(self):
        expected_repr = (
            '<AWSPreparedRequest stream_output=False, method=GET, '
            'url=http://example.com, headers={}>'
        )
        request_repr = repr(self.prepared_request)
        self.assertEqual(request_repr, expected_repr)

    def test_can_prepare_url_params(self):
        request = AWSRequest(url='http://example.com/', params={'foo': 'bar'})
        prepared_request = request.prepare()
        self.assertEqual(prepared_request.url, 'http://example.com/?foo=bar')

    def test_can_prepare_dict_body(self):
        body = {'dead': 'beef'}
        request = AWSRequest(url='http://example.com/', data=body)
        prepared_request = request.prepare()
        self.assertEqual(prepared_request.body, 'dead=beef')

    def test_can_prepare_dict_body_unicode_values(self):
        body = {'Text': u'\u30c6\u30b9\u30c8 string'}
        expected_body = 'Text=%E3%83%86%E3%82%B9%E3%83%88+string'
        request = AWSRequest(url='http://example.com/', data=body)
        prepared_request = request.prepare()
        self.assertEqual(prepared_request.body, expected_body)

    def test_can_prepare_dict_body_unicode_keys(self):
        body = {u'\u30c6\u30b9\u30c8': 'string'}
        expected_body = '%E3%83%86%E3%82%B9%E3%83%88=string'
        request = AWSRequest(url='http://example.com/', data=body)
        prepared_request = request.prepare()
        self.assertEqual(prepared_request.body, expected_body)

    def test_can_prepare_empty_body(self):
        request = AWSRequest(url='http://example.com/', data=b'')
        prepared_request = request.prepare()
        self.assertIsNone(prepared_request.body)
        content_length = prepared_request.headers.get('content-length')
        self.assertEqual(content_length, '0')

    def test_request_body_is_prepared(self):
        request = AWSRequest(url='http://example.com/', data='body')
        self.assertEqual(request.body, b'body')

    def test_prepare_body_content_adds_content_length(self):
        content = b'foobarbaz'
        expected_len = str(len(content))
        with open(self.filename, 'wb') as f:
            f.write(content)
        with open(self.filename, 'rb') as f:
            data = Seekable(f)
            self.request.data = data
            self.request.method = 'POST'
            prepared_request = self.request.prepare()
            calculated_len = prepared_request.headers['Content-Length']
            self.assertEqual(calculated_len, expected_len)

    def test_prepare_body_doesnt_override_content_length(self):
        self.request.method = 'PUT'
        self.request.headers['Content-Length'] = '20'
        self.request.data = b'asdf'
        prepared_request = self.request.prepare()
        self.assertEqual(prepared_request.headers['Content-Length'], '20')

    def test_prepare_body_doesnt_set_content_length_head(self):
        self.request.method = 'HEAD'
        self.request.data = b'thisshouldntbehere'
        prepared_request = self.request.prepare()
        self.assertIsNone(prepared_request.headers.get('Content-Length'))

    def test_prepare_body_doesnt_set_content_length_get(self):
        self.request.method = 'GET'
        self.request.data = b'thisshouldntbehere'
        prepared_request = self.request.prepare()
        self.assertIsNone(prepared_request.headers.get('Content-Length'))

    def test_prepare_body_doesnt_set_content_length_options(self):
        self.request.method = 'OPTIONS'
        self.request.data = b'thisshouldntbehere'
        prepared_request = self.request.prepare()
        self.assertIsNone(prepared_request.headers.get('Content-Length'))

    def test_can_reset_stream_handles_binary(self):
        contents = b'notastream'
        self.prepared_request.body = contents
        self.prepared_request.reset_stream()
        # assert the request body doesn't change after reset_stream is called
        self.assertEqual(self.prepared_request.body, contents)

    def test_can_reset_stream_handles_bytearray(self):
        contents = bytearray(b'notastream')
        self.prepared_request.body = contents
        self.prepared_request.reset_stream()
        # assert the request body doesn't change after reset_stream is called
        self.assertEqual(self.prepared_request.body, contents)

    def test_can_reset_stream(self):
        contents = b'foobarbaz'
        with open(self.filename, 'wb') as f:
            f.write(contents)
        with open(self.filename, 'rb') as body:
            self.prepared_request.body = body
            # pretend the request body was partially sent
            body.read()
            self.assertNotEqual(body.tell(), 0)
            # have the prepared request reset its stream
            self.prepared_request.reset_stream()
            # the stream should be reset
            self.assertEqual(body.tell(), 0)

    def test_cannot_reset_stream_raises_error(self):
        contents = b'foobarbaz'
        with open(self.filename, 'wb') as f:
            f.write(contents)
        with open(self.filename, 'rb') as body:
            self.prepared_request.body = Unseekable(body)
            # pretend the request body was partially sent
            body.read()
            self.assertNotEqual(body.tell(), 0)
            # reset stream should fail
            with self.assertRaises(UnseekableStreamError):
                self.prepared_request.reset_stream()

    def test_duck_type_for_file_check(self):
        # As part of determining whether or not we can rewind a stream
        # we first need to determine if the thing is a file like object.
        # We should not be using an isinstance check.  Instead, we should
        # be using duck type checks.
        class LooksLikeFile(object):
            def __init__(self):
                self.seek_called = False

            def read(self, amount=None):
                pass

            def seek(self, where):
                self.seek_called = True

        looks_like_file = LooksLikeFile()
        self.prepared_request.body = looks_like_file
        self.prepared_request.reset_stream()
        # The stream should now be reset.
        self.assertTrue(looks_like_file.seek_called)
class TestAWSResponse(unittest.TestCase):
    """Tests for the ``content`` and ``text`` properties of ``AWSResponse``."""

    def setUp(self):
        self.response = AWSResponse('http://url.com', 200, HeadersDict(), None)
        # The raw body is a mock so each test can script what it streams.
        self.response.raw = Mock()

    def set_raw_stream(self, blobs):
        """Make ``response.raw.stream()`` return a generator over ``blobs``.

        The generator is created once, so it can only be consumed a single
        time.
        """
        def stream(*args, **kwargs):
            for blob in blobs:
                yield blob
        self.response.raw.stream.return_value = stream()

    def test_content_property(self):
        self.set_raw_stream([b'some', b'data'])
        self.assertEqual(self.response.content, b'somedata')
        self.assertEqual(self.response.content, b'somedata')
        # assert that stream was not called more than once
        self.assertEqual(self.response.raw.stream.call_count, 1)

    def test_text_property(self):
        self.set_raw_stream([b'\xe3\x82\xb8\xe3\x83\xa7\xe3\x82\xb0'])
        self.response.headers['content-type'] = 'text/plain; charset=utf-8'
        self.assertEqual(self.response.text, u'\u30b8\u30e7\u30b0')

    def test_text_property_defaults_utf8(self):
        # No content-type header is set here, yet the UTF-8 encoded bytes
        # are still expected to decode correctly.
        self.set_raw_stream([b'\xe3\x82\xb8\xe3\x83\xa7\xe3\x82\xb0'])
        self.assertEqual(self.response.text, u'\u30b8\u30e7\u30b0')
class TestAWSHTTPConnection(unittest.TestCase):
    """Tests for ``AWSHTTPConnection``: Expect 100-continue and tunneling."""

    def create_tunneled_connection(self, url, port, response):
        """Build an ``AWSHTTPConnection`` set up as if tunneling via a proxy.

        :param url: host used for both the connection and the tunnel host.
        :param port: port used for both the connection and the tunnel port.
        :param response: raw bytes of the canned response; its status line and
            its ``\\r\\n``-separated lines drive the mocked response object.
        :returns: the connection, whose ``response_class`` yields
            ``self.mock_response``.
        """
        s = FakeSocket(response)
        conn = AWSHTTPConnection(url, port)
        conn.sock = s
        conn._tunnel_host = url
        conn._tunnel_port = port
        conn._tunnel_headers = {'key': 'value'}

        # Create a mock response.
        self.mock_response = Mock()
        self.mock_response.fp = Mock()

        # Imitate readline function by creating a list to be sent as
        # a side effect of the mocked readline to be able to track how the
        # response is processed in ``_tunnel()``.
        delimiter = b'\r\n'
        side_effect = []
        response_components = response.split(delimiter)
        last_index = len(response_components) - 1
        for i, new_component in enumerate(response_components):
            # Only add the delimiter on if it is not the last component
            # which should be an empty string.
            if i != last_index:
                new_component += delimiter
            side_effect.append(new_component)

        self.mock_response.fp.readline.side_effect = side_effect

        # The first three space-separated fields are version, status and
        # (the first word of) the reason phrase.
        response_components = response.split(b' ')
        self.mock_response._read_status.return_value = (
            response_components[0], int(response_components[1]),
            response_components[2]
        )
        conn.response_class = Mock()
        conn.response_class.return_value = self.mock_response
        return conn

    def test_expect_100_continue_returned(self):
        with patch('urllib3.util.wait_for_read') as wait_mock:
            # Shows the server first sending a 100 continue response
            # then a 200 ok response.
            s = FakeSocket(b'HTTP/1.1 100 Continue\r\n\r\nHTTP/1.1 200 OK\r\n')
            conn = AWSHTTPConnection('s3.amazonaws.com', 443)
            conn.sock = s
            wait_mock.return_value = True
            conn.request('GET', '/bucket/foo', b'body',
                         {'Expect': b'100-continue'})
            response = conn.getresponse()
            # Assert that we waited for the 100-continue response
            self.assertEqual(wait_mock.call_count, 1)
            # Now we should verify that our final response is the 200 OK
            self.assertEqual(response.status, 200)

    def test_handles_expect_100_with_different_reason_phrase(self):
        with patch('urllib3.util.wait_for_read') as wait_mock:
            # Shows the server first sending a 100 continue response
            # (with a non-standard reason phrase) then a 200 ok response.
            s = FakeSocket(b'HTTP/1.1 100 (Continue)\r\n\r\nHTTP/1.1 200 OK\r\n')
            conn = AWSHTTPConnection('s3.amazonaws.com', 443)
            conn.sock = s
            wait_mock.return_value = True
            conn.request('GET', '/bucket/foo', six.BytesIO(b'body'),
                         {'Expect': b'100-continue', 'Content-Length': b'4'})
            response = conn.getresponse()
            # Now we should verify that our final response is the 200 OK.
            self.assertEqual(response.status, 200)
            # Assert that we waited for the 100-continue response
            self.assertEqual(wait_mock.call_count, 1)
            # Verify that we sent the request body because we got a 100
            # continue.
            self.assertIn(b'body', s.sent_data)

    def test_expect_100_sends_connection_header(self):
        # When using squid as an HTTP proxy, it will also send
        # a Connection: keep-alive header back with the 100 continue
        # response.  We need to ensure we handle this case.
        with patch('urllib3.util.wait_for_read') as wait_mock:
            # Shows the server first sending a 100 continue response
            # then a 500 response.  We're picking 500 to confirm we
            # actually parse the response instead of getting the
            # default status of 200 which happens when we can't parse
            # the response.
            s = FakeSocket(b'HTTP/1.1 100 Continue\r\n'
                           b'Connection: keep-alive\r\n'
                           b'\r\n'
                           b'HTTP/1.1 500 Internal Service Error\r\n')
            conn = AWSHTTPConnection('s3.amazonaws.com', 443)
            conn.sock = s
            wait_mock.return_value = True
            conn.request('GET', '/bucket/foo', b'body',
                         {'Expect': b'100-continue'})
            # Assert that we waited for the 100-continue response
            self.assertEqual(wait_mock.call_count, 1)
            response = conn.getresponse()
            self.assertEqual(response.status, 500)

    def test_expect_100_continue_sends_307(self):
        # This is the case where we send a 100 continue and the server
        # immediately sends a 307
        with patch('urllib3.util.wait_for_read') as wait_mock:
            # Shows the server answering with a 307 redirect right away;
            # the canned data contains no 100 continue response.
            s = FakeSocket(
                b'HTTP/1.1 307 Temporary Redirect\r\n'
                b'Location: http://example.org\r\n')
            conn = AWSHTTPConnection('s3.amazonaws.com', 443)
            conn.sock = s
            wait_mock.return_value = True
            conn.request('GET', '/bucket/foo', b'body',
                         {'Expect': b'100-continue'})
            # Assert that we waited for the 100-continue response
            self.assertEqual(wait_mock.call_count, 1)
            response = conn.getresponse()
            # Now we should verify that our final response is the 307.
            self.assertEqual(response.status, 307)

    def test_expect_100_continue_no_response_from_server(self):
        with patch('urllib3.util.wait_for_read') as wait_mock:
            # The canned data only holds the final 307 response; it
            # contains no 100 continue response.
            s = FakeSocket(
                b'HTTP/1.1 307 Temporary Redirect\r\n'
                b'Location: http://example.org\r\n')
            conn = AWSHTTPConnection('s3.amazonaws.com', 443)
            conn.sock = s
            # By setting wait_mock to return False, this indicates
            # that the server did not send any response.  In this situation
            # we should just send the request anyways.
            wait_mock.return_value = False
            conn.request('GET', '/bucket/foo', b'body',
                         {'Expect': b'100-continue'})
            # Assert that we waited for the 100-continue response
            self.assertEqual(wait_mock.call_count, 1)
            response = conn.getresponse()
            self.assertEqual(response.status, 307)

    def test_message_body_is_file_like_object(self):
        # No Expect header is sent here; the canned response is a plain
        # 200 OK and the body is a file-like object that supports len().
        body = BytesIOWithLen(b'body contents')
        s = FakeSocket(b'HTTP/1.1 200 OK\r\n')
        conn = AWSHTTPConnection('s3.amazonaws.com', 443)
        conn.sock = s
        conn.request('GET', '/bucket/foo', body)
        response = conn.getresponse()
        self.assertEqual(response.status, 200)

    def test_no_expect_header_set(self):
        # No Expect header is sent here; the canned response is a plain
        # 200 OK.
        s = FakeSocket(b'HTTP/1.1 200 OK\r\n')
        conn = AWSHTTPConnection('s3.amazonaws.com', 443)
        conn.sock = s
        conn.request('GET', '/bucket/foo', b'body')
        response = conn.getresponse()
        self.assertEqual(response.status, 200)

    def test_tunnel_readline_none_bugfix(self):
        # Tests whether ``_tunnel`` function is able to work around the
        # py26 bug of avoiding infinite while loop if nothing is returned.
        conn = self.create_tunneled_connection(
            url='s3.amazonaws.com',
            port=443,
            response=b'HTTP/1.1 200 OK\r\n',
        )
        conn._tunnel()
        # Ensure proper amount of readline calls were made.
        self.assertEqual(self.mock_response.fp.readline.call_count, 2)

    def test_tunnel_readline_normal(self):
        # Tests that ``_tunnel`` function behaves normally when it comes
        # across the usual http ending.
        conn = self.create_tunneled_connection(
            url='s3.amazonaws.com',
            port=443,
            response=b'HTTP/1.1 200 OK\r\n\r\n',
        )
        conn._tunnel()
        # Ensure proper amount of readline calls were made.
        self.assertEqual(self.mock_response.fp.readline.call_count, 2)

    def test_tunnel_raises_socket_error(self):
        # Tests that ``_tunnel`` function throws appropriate error when
        # not 200 status.
        conn = self.create_tunneled_connection(
            url='s3.amazonaws.com',
            port=443,
            response=b'HTTP/1.1 404 Not Found\r\n\r\n',
        )
        with self.assertRaises(socket.error):
            conn._tunnel()

    @unittest.skipIf(sys.version_info[:2] == (2, 6),
                     ("``_tunnel()`` function defaults to standard "
                      "http library function when not py26."))
    def test_tunnel_uses_std_lib(self):
        s = FakeSocket(b'HTTP/1.1 200 OK\r\n')
        conn = AWSHTTPConnection('s3.amazonaws.com', 443)
        conn.sock = s
        # Test that the standard library method was used by patching out
        # the ``_tunnel`` method and seeing if the std lib method was called.
        with patch('urllib3.connection.HTTPConnection._tunnel') as mock_tunnel:
            conn._tunnel()
            self.assertTrue(mock_tunnel.called)

    def test_encodes_unicode_method_line(self):
        s = FakeSocket(b'HTTP/1.1 200 OK\r\n')
        conn = AWSHTTPConnection('s3.amazonaws.com', 443)
        conn.sock = s
        # Note the combination of unicode 'GET' and
        # bytes 'Utf8-Header' value.
        conn.request(u'GET', '/bucket/foo', b'body',
                     headers={"Utf8-Header": b"\xe5\xb0\x8f"})
        response = conn.getresponse()
        self.assertEqual(response.status, 200)

    def test_state_reset_on_connection_close(self):
        # This simulates what urllib3 does with connections
        # in its connection pool logic.
        with patch('urllib3.util.wait_for_read') as wait_mock:
            # First fast fail with a 500 response when we first
            # send the expect header.
            s = FakeSocket(b'HTTP/1.1 500 Internal Server Error\r\n')
            conn = AWSHTTPConnection('s3.amazonaws.com', 443)
            conn.sock = s
            wait_mock.return_value = True

            conn.request('GET', '/bucket/foo', b'body',
                         {'Expect': b'100-continue'})
            self.assertEqual(wait_mock.call_count, 1)
            response = conn.getresponse()
            self.assertEqual(response.status, 500)

            # Now what happens in urllib3 is that when the next
            # request comes along and this connection gets checked
            # out.  We see that the connection needs to be
            # reset.  So first the connection is closed.
            conn.close()

            # And then a new connection is established.
            new_conn = FakeSocket(
                b'HTTP/1.1 100 (Continue)\r\n\r\nHTTP/1.1 200 OK\r\n')
            conn.sock = new_conn

            # And we make a request, we should see the 200 response
            # that was sent back.
            wait_mock.return_value = True

            conn.request('GET', '/bucket/foo', b'body',
                         {'Expect': b'100-continue'})
            # Assert that we waited for the 100-continue response
            self.assertEqual(wait_mock.call_count, 2)
            response = conn.getresponse()
            # This should be 200.  If it's a 500 then
            # the prior response was leaking into our
            # current response.
            self.assertEqual(response.status, 200)
class TestAWSHTTPConnectionPool(unittest.TestCase):
    """Checks that urllib3's pool classes keep their own connection classes."""

    def test_global_urllib3_pool_is_unchanged(self):
        # Neither pool class may have had its ConnectionCls swapped for the
        # AWS-specific connection classes.
        self.assertIsNot(HTTPConnectionPool.ConnectionCls, AWSHTTPConnection)
        self.assertIsNot(
            HTTPSConnectionPool.ConnectionCls, AWSHTTPSConnection)
class TestPrepareRequestDict(unittest.TestCase):
    """Tests for how ``prepare_request_dict`` completes a request dict."""

    def setUp(self):
        self.user_agent = 'botocore/1.0'
        self.endpoint_url = 'https://s3.amazonaws.com'
        # Baseline request; individual tests overlay their own values.
        self.base_request_dict = {
            'body': '',
            'headers': {},
            'method': u'GET',
            'query_string': '',
            'url_path': '/',
            'context': {}
        }

    def prepare_base_request_dict(self, request_dict, endpoint_url=None,
                                  user_agent=None, context=None):
        """Overlay ``request_dict`` on the base dict and prepare it in place.

        ``endpoint_url`` and ``user_agent`` fall back to the values stored on
        the test case when they are ``None``; a falsy ``context`` becomes an
        empty dict.
        """
        self.base_request_dict.update(request_dict)
        chosen_agent = self.user_agent if user_agent is None else user_agent
        chosen_endpoint = (
            self.endpoint_url if endpoint_url is None else endpoint_url)
        prepare_request_dict(
            self.base_request_dict,
            endpoint_url=chosen_endpoint,
            user_agent=chosen_agent,
            context=context or {},
        )

    def test_prepare_request_dict_for_get(self):
        self.prepare_base_request_dict(
            {'method': u'GET', 'url_path': '/'},
            endpoint_url='https://s3.amazonaws.com')
        prepared = self.base_request_dict
        self.assertEqual(prepared['method'], 'GET')
        self.assertEqual(prepared['url'], 'https://s3.amazonaws.com/')
        self.assertEqual(prepared['headers']['User-Agent'], self.user_agent)

    def test_prepare_request_dict_for_get_no_user_agent(self):
        # With no user agent configured, no User-Agent header is expected.
        self.user_agent = None
        self.prepare_base_request_dict(
            {'method': u'GET', 'url_path': '/'},
            endpoint_url='https://s3.amazonaws.com')
        prepared_headers = self.base_request_dict['headers']
        self.assertNotIn('User-Agent', prepared_headers)

    def test_prepare_request_dict_with_context(self):
        supplied_context = {'foo': 'bar'}
        self.prepare_base_request_dict({}, context=supplied_context)
        self.assertEqual(self.base_request_dict['context'], supplied_context)

    def test_query_string_serialized_to_url(self):
        self.prepare_base_request_dict({
            'method': u'GET',
            'query_string': {u'prefix': u'foo'},
            'url_path': u'/mybucket'
        })
        expected_url = 'https://s3.amazonaws.com/mybucket?prefix=foo'
        self.assertEqual(self.base_request_dict['url'], expected_url)

    def test_url_path_combined_with_endpoint_url(self):
        # This checks the case where a user specifies an
        # endpoint_url that has a path component, and the
        # serializer gives us a request_dict that has a url
        # component as well (say from a rest-* service).
        self.prepare_base_request_dict(
            {
                'query_string': {u'prefix': u'foo'},
                'url_path': u'/mybucket'
            },
            endpoint_url='https://custom.endpoint/foo/bar')
        expected_url = 'https://custom.endpoint/foo/bar/mybucket?prefix=foo'
        self.assertEqual(self.base_request_dict['url'], expected_url)

    def test_url_path_with_trailing_slash(self):
        self.prepare_base_request_dict(
            {'url_path': u'/mybucket'},
            endpoint_url='https://custom.endpoint/foo/bar/')
        expected_url = 'https://custom.endpoint/foo/bar/mybucket'
        self.assertEqual(self.base_request_dict['url'], expected_url)

    def test_url_path_is_slash(self):
        self.prepare_base_request_dict(
            {'url_path': u'/'},
            endpoint_url='https://custom.endpoint/foo/bar/')
        expected_url = 'https://custom.endpoint/foo/bar/'
        self.assertEqual(self.base_request_dict['url'], expected_url)

    def test_url_path_is_slash_with_endpoint_url_no_slash(self):
        self.prepare_base_request_dict(
            {'url_path': u'/'},
            endpoint_url='https://custom.endpoint/foo/bar')
        expected_url = 'https://custom.endpoint/foo/bar'
        self.assertEqual(self.base_request_dict['url'], expected_url)

    def test_custom_endpoint_with_query_string(self):
        self.prepare_base_request_dict(
            {'url_path': u'/baz', 'query_string': {'x': 'y'}},
            endpoint_url='https://custom.endpoint/foo/bar?foo=bar')
        expected_url = 'https://custom.endpoint/foo/bar/baz?foo=bar&x=y'
        self.assertEqual(self.base_request_dict['url'], expected_url)
class TestCreateRequestObject(unittest.TestCase):
    """Tests that ``create_request_object`` carries over request dict fields."""

    def setUp(self):
        # A fully populated request dict used as input.
        self.request_dict = {
            'method': u'GET',
            'query_string': {u'prefix': u'foo'},
            'url_path': u'/mybucket',
            'headers': {u'User-Agent': u'my-agent'},
            'body': u'my body',
            'url': u'https://s3.amazonaws.com/mybucket?prefix=foo',
            'context': {'signing': {'region': 'us-west-2'}}
        }

    def test_create_request_object(self):
        source = self.request_dict
        request = create_request_object(source)
        # Method, url, body and context must match the input dict.
        self.assertEqual(request.method, source['method'])
        self.assertEqual(request.url, source['url'])
        self.assertEqual(request.data, source['body'])
        self.assertEqual(request.context, source['context'])
        self.assertIn('User-Agent', request.headers)
2018-10-04 08:50:52 +02:00
class TestHeadersDict(unittest.TestCase):
    """Tests for the case-insensitive key handling of ``HeadersDict``."""

    def setUp(self):
        self.headers = HeadersDict()

    def test_get_insensitive(self):
        headers = self.headers
        headers['foo'] = 'bar'
        self.assertEqual(headers['FOO'], 'bar')

    def test_set_insensitive(self):
        headers = self.headers
        headers['foo'] = 'bar'
        # Writing under a differently-cased key must replace the value.
        headers['FOO'] = 'baz'
        self.assertEqual(headers['foo'], 'baz')

    def test_del_insensitive(self):
        headers = self.headers
        headers['foo'] = 'bar'
        self.assertEqual(headers['FOO'], 'bar')
        del headers['FoO']
        with self.assertRaises(KeyError):
            headers['foo']

    def test_iteration(self):
        headers = self.headers
        headers['FOO'] = 'bar'
        headers['dead'] = 'beef'
        # Keys are expected back with the casing they were stored under.
        header_names = list(headers)
        self.assertIn('FOO', header_names)
        self.assertIn('dead', header_names)
        header_pairs = list(headers.items())
        self.assertIn(('FOO', 'bar'), header_pairs)
        self.assertIn(('dead', 'beef'), header_pairs)
# Run the tests in this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()