New upstream version 1.18.53+dfsg

This commit is contained in:
Noah Meyerhans 2021-10-04 10:51:32 -07:00
parent 44abe9ff76
commit 3056914cc0
50 changed files with 1042 additions and 928 deletions

17
.changes/1.18.47.json Normal file
View File

@ -0,0 +1,17 @@
[
{
"category": "``mediaconvert``",
"description": "[``botocore``] This release adds style and positioning support for caption or subtitle burn-in from rich text sources such as TTML. This release also introduces configurable image-based trick play track generation.",
"type": "api-change"
},
{
"category": "``appsync``",
"description": "[``botocore``] Documented the new OpenSearchServiceDataSourceConfig data type. Added deprecation notes to the ElasticsearchDataSourceConfig data type.",
"type": "api-change"
},
{
"category": "``ssm``",
"description": "[``botocore``] Added cutoff behavior support for preventing new task invocations from starting when the maintenance window cutoff time is reached.",
"type": "api-change"
}
]

12
.changes/1.18.48.json Normal file
View File

@ -0,0 +1,12 @@
[
{
"category": "``license-manager``",
"description": "[``botocore``] AWS License Manager now allows customers to get the LicenseArn in the Checkout API Response.",
"type": "api-change"
},
{
"category": "``ec2``",
"description": "[``botocore``] DescribeInstances now returns Platform Details, Usage Operation, and Usage Operation Update Time.",
"type": "api-change"
}
]

32
.changes/1.18.49.json Normal file
View File

@ -0,0 +1,32 @@
[
{
"category": "``appintegrations``",
"description": "[``botocore``] The Amazon AppIntegrations service enables you to configure and reuse connections to external applications.",
"type": "api-change"
},
{
"category": "``wisdom``",
"description": "[``botocore``] Released Amazon Connect Wisdom, a feature of Amazon Connect, which provides real-time recommendations and search functionality in general availability (GA). For more information, see https://docs.aws.amazon.com/wisdom/latest/APIReference/Welcome.html.",
"type": "api-change"
},
{
"category": "``pinpoint``",
"description": "[``botocore``] Added support for journey with contact center activity",
"type": "api-change"
},
{
"category": "``voice-id``",
"description": "[``botocore``] Released the Amazon Voice ID SDK, for usage with the Amazon Connect Voice ID feature released for Amazon Connect.",
"type": "api-change"
},
{
"category": "``connect``",
"description": "[``botocore``] This release updates a set of APIs: CreateIntegrationAssociation, ListIntegrationAssociations, CreateUseCase, and StartOutboundVoiceContact. You can use it to create integrations with Amazon Pinpoint for the Amazon Connect Campaigns use case, Amazon Connect Voice ID, and Amazon Connect Wisdom.",
"type": "api-change"
},
{
"category": "``elbv2``",
"description": "[``botocore``] Update elbv2 client to latest version",
"type": "api-change"
}
]

12
.changes/1.18.50.json Normal file
View File

@ -0,0 +1,12 @@
[
{
"category": "``transfer``",
"description": "[``botocore``] Added changes for managed workflows feature APIs.",
"type": "api-change"
},
{
"category": "``imagebuilder``",
"description": "[``botocore``] Fix description for AmiDistributionConfiguration Name property, which actually refers to the output AMI name. Also updated for consistent terminology to use \"base\" image, and another update to fix description text.",
"type": "api-change"
}
]

17
.changes/1.18.51.json Normal file
View File

@ -0,0 +1,17 @@
[
{
"category": "``lambda``",
"description": "[``botocore``] Adds support for Lambda functions powered by AWS Graviton2 processors. Customers can now select the CPU architecture for their functions.",
"type": "api-change"
},
{
"category": "``sesv2``",
"description": "[``botocore``] This release includes the ability to use 2048 bits RSA key pairs for DKIM in SES, either with Easy DKIM or Bring Your Own DKIM.",
"type": "api-change"
},
{
"category": "``amp``",
"description": "[``botocore``] This release adds alert manager and rule group namespace APIs",
"type": "api-change"
}
]

37
.changes/1.18.52.json Normal file
View File

@ -0,0 +1,37 @@
[
{
"category": "``network-firewall``",
"description": "[``botocore``] This release adds support for strict ordering for stateful rule groups. Using strict ordering, stateful rules are evaluated in the exact order in which you provide them.",
"type": "api-change"
},
{
"category": "``dataexchange``",
"description": "[``botocore``] This release enables subscribers to set up automatic exports of newly published revisions using the new EventAction API.",
"type": "api-change"
},
{
"category": "``workmail``",
"description": "[``botocore``] This release adds support for mobile device access overrides management in Amazon WorkMail.",
"type": "api-change"
},
{
"category": "``account``",
"description": "[``botocore``] This release of the Account Management API enables customers to manage the alternate contacts for their AWS accounts. For more information, see https://docs.aws.amazon.com/accounts/latest/reference/accounts-welcome.html",
"type": "api-change"
},
{
"category": "``workspaces``",
"description": "[``botocore``] Added CreateUpdatedWorkspaceImage API to update WorkSpace images with latest software and drivers. Updated DescribeWorkspaceImages API to display if there are updates available for WorkSpace images.",
"type": "api-change"
},
{
"category": "``cloudcontrol``",
"description": "[``botocore``] Initial release of the SDK for AWS Cloud Control API",
"type": "api-change"
},
{
"category": "``macie2``",
"description": "[``botocore``] Amazon S3 bucket metadata now indicates whether an error or a bucket's permissions settings prevented Amazon Macie from retrieving data about the bucket or the bucket's objects.",
"type": "api-change"
}
]

17
.changes/1.18.53.json Normal file
View File

@ -0,0 +1,17 @@
[
{
"category": "``synthetics``",
"description": "[``botocore``] CloudWatch Synthetics now enables customers to choose a customer managed AWS KMS key or an Amazon S3-managed key instead of an AWS managed key (default) for the encryption of artifacts that the canary stores in Amazon S3. CloudWatch Synthetics also supports artifact S3 location updation now.",
"type": "api-change"
},
{
"category": "``ssm``",
"description": "[``botocore``] When \"AutoApprovable\" is true for a Change Template, then specifying --auto-approve (boolean) in Start-Change-Request-Execution will create a change request that bypasses approver review. (except for change calendar restrictions)",
"type": "api-change"
},
{
"category": "``apprunner``",
"description": "[``botocore``] This release contains several minor bug fixes.",
"type": "api-change"
}
]

View File

@ -13,7 +13,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [3.6, 3.7, 3.8]
python-version: [3.6, 3.7, 3.8, 3.9, 3.10-dev]
os: [ubuntu-latest, macOS-latest, windows-latest ]
steps:

View File

@ -10,19 +10,25 @@ jobs:
runs-on: ubuntu-latest
name: Stale issue job
steps:
- uses: aws-actions/stale-issue-cleanup@v3
- uses: aws-actions/stale-issue-cleanup@v4
with:
# Setting messages to an empty string will cause the automation to skip
# that category
ancient-issue-message: Greetings! It looks like this issue hasnt been active in longer than one year. We encourage you to check if this is still an issue in the latest release. Because it has been longer than one year since the last update on this, and in the absence of more information, we will be closing this issue soon. If you find that this is still a problem, please feel free to provide a comment to prevent automatic closure, or if the issue is already closed, please feel free to reopen it.
stale-issue-message: Greetings! It looks like this issue hasnt been active in longer than a week. We encourage you to check if this is still an issue in the latest release. Because it has been longer than a week since the last update on this, and in the absence of more information, we will be closing this issue soon. If you find that this is still a problem, please feel free to provide a comment or add an upvote to prevent automatic closure, or if the issue is already closed, please feel free to open a new one.
stale-pr-message: Greetings! It looks like this PR hasnt been active in longer than a week, add a comment or an upvote to prevent automatic closure, or if the issue is already closed, please feel free to open a new one.
issue-types: issues
ancient-issue-message: Greetings! It looks like this issue hasnt been active in longer
than one year. We encourage you to check if this is still an issue in the latest release.
In the absence of more information, we will be closing this issue soon.
If you find that this is still a problem, please feel free to provide a comment or upvote
with a reaction on the initial post to prevent automatic closure. If the issue is already closed,
please feel free to open a new one.
stale-issue-message: Greetings! It looks like this issue hasnt been
active in longer than five days. We encourage you to check if this is still an issue in the latest release.
In the absence of more information, we will be closing this issue soon.
If you find that this is still a problem, please feel free to provide a comment or upvote
with a reaction on the initial post to prevent automatic closure. If the issue is already closed,
please feel free to open a new one.
# These labels are required
stale-issue-label: closing-soon
exempt-issue-label: auto-label-exempt
stale-pr-label: closing-soon
exempt-pr-label: pr/needs-review,auto-label-exempt
exempt-issue-labels: automation-exempt,needs-review,bug
response-requested-label: response-requested
# Don't set closed-for-staleness label to skip closing very old issues
@ -30,8 +36,8 @@ jobs:
closed-for-staleness-label: closed-for-staleness
# Issue timing
days-before-stale: 7
days-before-close: 4
days-before-stale: 5
days-before-close: 2
days-before-ancient: 365
# If you don't want to mark a issue as being ancient based on a
# threshold of "upvotes", you can set this here. An "upvote" is

1
.gitignore vendored
View File

@ -13,7 +13,6 @@ tests/.coverage
.tox
.coverage
coverage.xml
nosetests.xml
# Common virtualenv names
venv

View File

@ -2,6 +2,67 @@
CHANGELOG
=========
1.18.53
=======
* api-change:``synthetics``: [``botocore``] CloudWatch Synthetics now enables customers to choose a customer managed AWS KMS key or an Amazon S3-managed key instead of an AWS managed key (default) for the encryption of artifacts that the canary stores in Amazon S3. CloudWatch Synthetics also supports artifact S3 location updation now.
* api-change:``ssm``: [``botocore``] When "AutoApprovable" is true for a Change Template, then specifying --auto-approve (boolean) in Start-Change-Request-Execution will create a change request that bypasses approver review. (except for change calendar restrictions)
* api-change:``apprunner``: [``botocore``] This release contains several minor bug fixes.
1.18.52
=======
* api-change:``network-firewall``: [``botocore``] This release adds support for strict ordering for stateful rule groups. Using strict ordering, stateful rules are evaluated in the exact order in which you provide them.
* api-change:``dataexchange``: [``botocore``] This release enables subscribers to set up automatic exports of newly published revisions using the new EventAction API.
* api-change:``workmail``: [``botocore``] This release adds support for mobile device access overrides management in Amazon WorkMail.
* api-change:``account``: [``botocore``] This release of the Account Management API enables customers to manage the alternate contacts for their AWS accounts. For more information, see https://docs.aws.amazon.com/accounts/latest/reference/accounts-welcome.html
* api-change:``workspaces``: [``botocore``] Added CreateUpdatedWorkspaceImage API to update WorkSpace images with latest software and drivers. Updated DescribeWorkspaceImages API to display if there are updates available for WorkSpace images.
* api-change:``cloudcontrol``: [``botocore``] Initial release of the SDK for AWS Cloud Control API
* api-change:``macie2``: [``botocore``] Amazon S3 bucket metadata now indicates whether an error or a bucket's permissions settings prevented Amazon Macie from retrieving data about the bucket or the bucket's objects.
1.18.51
=======
* api-change:``lambda``: [``botocore``] Adds support for Lambda functions powered by AWS Graviton2 processors. Customers can now select the CPU architecture for their functions.
* api-change:``sesv2``: [``botocore``] This release includes the ability to use 2048 bits RSA key pairs for DKIM in SES, either with Easy DKIM or Bring Your Own DKIM.
* api-change:``amp``: [``botocore``] This release adds alert manager and rule group namespace APIs
1.18.50
=======
* api-change:``transfer``: [``botocore``] Added changes for managed workflows feature APIs.
* api-change:``imagebuilder``: [``botocore``] Fix description for AmiDistributionConfiguration Name property, which actually refers to the output AMI name. Also updated for consistent terminology to use "base" image, and another update to fix description text.
1.18.49
=======
* api-change:``appintegrations``: [``botocore``] The Amazon AppIntegrations service enables you to configure and reuse connections to external applications.
* api-change:``wisdom``: [``botocore``] Released Amazon Connect Wisdom, a feature of Amazon Connect, which provides real-time recommendations and search functionality in general availability (GA). For more information, see https://docs.aws.amazon.com/wisdom/latest/APIReference/Welcome.html.
* api-change:``pinpoint``: [``botocore``] Added support for journey with contact center activity
* api-change:``voice-id``: [``botocore``] Released the Amazon Voice ID SDK, for usage with the Amazon Connect Voice ID feature released for Amazon Connect.
* api-change:``connect``: [``botocore``] This release updates a set of APIs: CreateIntegrationAssociation, ListIntegrationAssociations, CreateUseCase, and StartOutboundVoiceContact. You can use it to create integrations with Amazon Pinpoint for the Amazon Connect Campaigns use case, Amazon Connect Voice ID, and Amazon Connect Wisdom.
* api-change:``elbv2``: [``botocore``] Update elbv2 client to latest version
1.18.48
=======
* api-change:``license-manager``: [``botocore``] AWS License Manager now allows customers to get the LicenseArn in the Checkout API Response.
* api-change:``ec2``: [``botocore``] DescribeInstances now returns Platform Details, Usage Operation, and Usage Operation Update Time.
1.18.47
=======
* api-change:``mediaconvert``: [``botocore``] This release adds style and positioning support for caption or subtitle burn-in from rich text sources such as TTML. This release also introduces configurable image-based trick play track generation.
* api-change:``appsync``: [``botocore``] Documented the new OpenSearchServiceDataSourceConfig data type. Added deprecation notes to the ElasticsearchDataSourceConfig data type.
* api-change:``ssm``: [``botocore``] Added cutoff behavior support for preventing new task invocations from starting when the maintenance window cutoff time is reached.
1.18.46
=======

View File

@ -84,9 +84,9 @@ Running Tests
~~~~~~~~~~~~~
You can run tests in all supported Python versions using ``tox``. By default,
it will run all of the unit and functional tests, but you can also specify your own
``nosetests`` options. Note that this requires that you have all supported
``pytest`` options. Note that this requires that you have all supported
versions of Python installed, otherwise you must pass ``-e`` or run the
``nosetests`` command directly:
``pytest`` command directly:
.. code-block:: sh
@ -98,7 +98,7 @@ You can also run individual tests with your default Python version:
.. code-block:: sh
$ nosetests tests/unit
$ pytest tests/unit
Getting Help

View File

@ -18,7 +18,7 @@ from boto3.compat import _warn_deprecated_python
__author__ = 'Amazon Web Services'
__version__ = '1.18.46'
__version__ = '1.18.53'
# The default Boto3 session; autoloaded when needed.

View File

@ -34,8 +34,7 @@ def process_args(args):
test_args = ""
if args.with_cov:
test_args += (
f"--with-xunit --cover-erase --with-coverage "
f"--cover-package {PACKAGE} --cover-xml -v "
f"--cov={PACKAGE} --cov-report xml -v "
)
dirs = " ".join(args.test_dirs)
@ -53,8 +52,8 @@ if __name__ == "__main__":
parser.add_argument(
"-r",
"--test-runner",
default="nosetests",
help="Test runner to execute tests. Defaults to nose.",
default="pytest",
help="Test runner to execute tests. Defaults to pytest.",
)
parser.add_argument(
"-c",

View File

@ -3,7 +3,7 @@ universal = 0
[metadata]
requires_dist =
botocore>=1.21.46,<1.22.0
botocore>=1.21.53,<1.22.0
jmespath>=0.7.1,<1.0.0
s3transfer>=0.5.0,<0.6.0

View File

@ -6,15 +6,14 @@ distutils/setuptools install script.
import os
import re
from setuptools import setup, find_packages
from setuptools import find_packages, setup
ROOT = os.path.dirname(__file__)
VERSION_RE = re.compile(r'''__version__ = ['"]([0-9.]+)['"]''')
requires = [
'botocore>=1.21.46,<1.22.0',
'botocore>=1.21.53,<1.22.0',
'jmespath>=0.7.1,<1.0.0',
's3transfer>=0.5.0,<0.6.0'
]
@ -54,6 +53,8 @@ setup(
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
],
project_urls={
'Documentation': 'https://boto3.amazonaws.com/v1/documentation/api/latest/index.html',

View File

@ -16,14 +16,14 @@ from tests import unittest
class BaseDocsFunctionalTests(unittest.TestCase):
def assert_contains_lines_in_order(self, lines, contents):
for line in lines:
self.assertIn(line, contents)
assert line in contents
beginning = contents.find(line)
contents = contents[(beginning + len(line)):]
def get_class_document_block(self, class_name, contents):
start_class_document = '.. py:class:: %s' % class_name
start_index = contents.find(start_class_document)
self.assertNotEqual(start_index, -1, 'Class is not found in contents')
assert start_index != -1, 'Class is not found in contents'
contents = contents[start_index:]
end_index = contents.find(
' .. py:class::', len(start_class_document))
@ -32,7 +32,7 @@ class BaseDocsFunctionalTests(unittest.TestCase):
def get_method_document_block(self, method_name, contents):
start_method_document = ' .. py:method:: %s(' % method_name
start_index = contents.find(start_method_document)
self.assertNotEqual(start_index, -1, 'Method is not found in contents')
assert start_index != -1, 'Method is not found in contents'
contents = contents[start_index:]
end_index = contents.find(
' .. py:method::', len(start_method_document))
@ -41,8 +41,7 @@ class BaseDocsFunctionalTests(unittest.TestCase):
def get_request_syntax_document_block(self, contents):
start_marker = '**Request Syntax**'
start_index = contents.find(start_marker)
self.assertNotEqual(
start_index, -1, 'There is no request syntax section')
assert start_index != -1, 'There is no request syntax section'
contents = contents[start_index:]
end_index = contents.find(
':type', len(start_marker))
@ -51,8 +50,7 @@ class BaseDocsFunctionalTests(unittest.TestCase):
def get_response_syntax_document_block(self, contents):
start_marker = '**Response Syntax**'
start_index = contents.find(start_marker)
self.assertNotEqual(
start_index, -1, 'There is no response syntax section')
assert start_index != -1, 'There is no response syntax section'
contents = contents[start_index:]
end_index = contents.find(
'**Response Structure**', len(start_marker))
@ -61,7 +59,7 @@ class BaseDocsFunctionalTests(unittest.TestCase):
def get_request_parameter_document_block(self, param_name, contents):
start_param_document = ':type %s:' % param_name
start_index = contents.find(start_param_document)
self.assertNotEqual(start_index, -1, 'Param is not found in contents')
assert start_index != -1, 'Param is not found in contents'
contents = contents[start_index:]
end_index = contents.find(':type', len(start_param_document))
return contents[:end_index]
@ -69,11 +67,11 @@ class BaseDocsFunctionalTests(unittest.TestCase):
def get_response_parameter_document_block(self, param_name, contents):
start_param_document = '**Response Structure**'
start_index = contents.find(start_param_document)
self.assertNotEqual(start_index, -1, 'There is no response structure')
assert start_index != -1, 'There is no response structure'
start_param_document = '- **%s**' % param_name
start_index = contents.find(start_param_document)
self.assertNotEqual(start_index, -1, 'Param is not found in contents')
assert start_index != -1, 'Param is not found in contents'
contents = contents[start_index:]
end_index = contents.find('- **', len(start_param_document))
return contents[:end_index]

View File

@ -10,7 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from nose.tools import assert_true
import pytest
import botocore.session
from botocore import xform_name
from botocore.exceptions import DataNotFoundError
@ -19,50 +19,76 @@ import boto3
from boto3.docs.service import ServiceDocumenter
def test_docs_generated():
"""Verify we can generate the appropriate docs for all services"""
@pytest.fixture
def botocore_session():
return botocore.session.get_session()
@pytest.fixture
def boto3_session():
return boto3.Session(region_name='us-east-1')
def all_services():
botocore_session = botocore.session.get_session()
session = boto3.Session(region_name='us-east-1')
for service_name in session.get_available_services():
generated_docs = ServiceDocumenter(
service_name, session=session).document_service()
generated_docs = generated_docs.decode('utf-8')
client = boto3.client(service_name, 'us-east-1')
yield service_name
# Check that all of the services have the appropriate title
yield (_assert_has_title, generated_docs, client)
# Check that all services have the client documented.
yield (_assert_has_client_documentation, generated_docs, service_name,
client)
@pytest.fixture
def available_resources():
session = boto3.Session(region_name='us-east-1')
return session.get_available_resources()
# If the client can paginate, make sure the paginators are documented.
try:
paginator_model = botocore_session.get_paginator_model(
@pytest.mark.parametrize('service_name', all_services())
def test_documentation(
boto3_session, botocore_session, available_resources, service_name
):
generated_docs = ServiceDocumenter(
service_name, session=boto3_session).document_service()
generated_docs = generated_docs.decode('utf-8')
client = boto3.client(service_name, 'us-east-1')
# Check that all of the services have the appropriate title
_assert_has_title(generated_docs, client)
# Check that all services have the client documented.
_assert_has_client_documentation(generated_docs, service_name, client)
#If the service has resources, make sure the service resource
#is at least documented.
if service_name in available_resources:
resource = boto3.resource(service_name, 'us-east-1')
_assert_has_resource_documentation(
generated_docs, service_name, resource
)
# If the client can paginate, make sure the paginators are documented.
try:
paginator_model = botocore_session.get_paginator_model(
service_name)
yield (_assert_has_paginator_documentation, generated_docs,
service_name, client,
sorted(paginator_model._paginator_config))
except DataNotFoundError:
pass
_assert_has_paginator_documentation(
generated_docs, service_name, client,
sorted(paginator_model._paginator_config)
)
except DataNotFoundError:
pass
# If the client has waiters, make sure the waiters are documented
if client.waiter_names:
waiter_model = botocore_session.get_waiter_model(service_name)
yield (_assert_has_waiter_documentation, generated_docs,
service_name, client, waiter_model)
# If the service has resources, make sure the service resource
# is at least documented.
if service_name in session.get_available_resources():
resource = boto3.resource(service_name, 'us-east-1')
yield (_assert_has_resource_documentation, generated_docs,
service_name, resource)
# If the client has waiters, make sure the waiters are documented.
if client.waiter_names:
waiter_model = botocore_session.get_waiter_model(service_name)
_assert_has_waiter_documentation(
generated_docs, service_name, client, waiter_model
)
def _assert_contains_lines_in_order(lines, contents):
for line in lines:
assert_true(line in contents)
assert line in contents
beginning = contents.find(line)
contents = contents[(beginning + len(line)):]

View File

@ -41,7 +41,7 @@ class TestStubberSupportsFilterExpressions(unittest.TestCase):
response = table.query(KeyConditionExpression=key_expr,
FilterExpression=filter_expr)
self.assertEqual(list(), response['Items'])
assert response['Items'] == []
stubber.assert_no_pending_responses()
def test_table_scan_can_be_stubbed_with_expressions(self):
@ -59,5 +59,5 @@ class TestStubberSupportsFilterExpressions(unittest.TestCase):
with stubber:
response = table.scan(FilterExpression=filter_expr)
self.assertEqual(list(), response['Items'])
assert response['Items'] == []
stubber.assert_no_pending_responses()

View File

@ -25,7 +25,7 @@ class TestTableResourceCustomizations(unittest.TestCase):
def test_resource_has_batch_writer_added(self):
table = self.resource.Table('mytable')
self.assertTrue(hasattr(table, 'batch_writer'))
assert hasattr(table, 'batch_writer')
def test_operation_without_output(self):
table = self.resource.Table('mytable')

View File

@ -25,9 +25,8 @@ class TestCollection(unittest.TestCase):
self.ec2_resource = self.session.resource('ec2')
def test_can_use_collection_methods(self):
self.assertIsInstance(
self.ec2_resource.instances.all(), ResourceCollection)
assert isinstance(self.ec2_resource.instances.all(), ResourceCollection)
def test_can_chain_methods(self):
self.assertIsInstance(
assert isinstance(
self.ec2_resource.instances.all().page_size(5), ResourceCollection)

View File

@ -43,13 +43,12 @@ class TestDynamoDB(unittest.TestCase):
table.scan(FilterExpression=Attr('mykey').eq('myvalue'))
request = self.make_request_mock.call_args_list[0][0][1]
request_params = json.loads(request['body'].decode('utf-8'))
self.assertEqual(
request_params,
{'TableName': 'MyTable',
'FilterExpression': '#n0 = :v0',
'ExpressionAttributeNames': {'#n0': 'mykey'},
'ExpressionAttributeValues': {':v0': {'S': 'myvalue'}}}
)
assert request_params == {
'TableName': 'MyTable',
'FilterExpression': '#n0 = :v0',
'ExpressionAttributeNames': {'#n0': 'mykey'},
'ExpressionAttributeValues': {':v0': {'S': 'myvalue'}}
}
def test_client(self):
dynamodb = self.session.client('dynamodb')
@ -62,10 +61,9 @@ class TestDynamoDB(unittest.TestCase):
)
request = self.make_request_mock.call_args_list[0][0][1]
request_params = json.loads(request['body'].decode('utf-8'))
self.assertEqual(
request_params,
{'TableName': 'MyTable',
'FilterExpression': '#n0 = :v0',
'ExpressionAttributeNames': {'#n0': 'mykey'},
'ExpressionAttributeValues': {':v0': {'S': 'myvalue'}}}
)
assert request_params == {
'TableName': 'MyTable',
'FilterExpression': '#n0 = :v0',
'ExpressionAttributeNames': {'#n0': 'mykey'},
'ExpressionAttributeValues': {':v0': {'S': 'myvalue'}}
}

View File

@ -23,8 +23,7 @@ class TestInstanceDeleteTags(unittest.TestCase):
self.instance_resource = self.service_resource.Instance('i-abc123')
def test_delete_tags_injected(self):
self.assertTrue(hasattr(self.instance_resource, 'delete_tags'),
'delete_tags was not injected onto Instance resource.')
assert hasattr(self.instance_resource, 'delete_tags')
def test_delete_tags(self):
stubber = Stubber(self.instance_resource.meta.client)
@ -32,7 +31,7 @@ class TestInstanceDeleteTags(unittest.TestCase):
stubber.activate()
response = self.instance_resource.delete_tags(Tags=[{'Key': 'foo'}])
stubber.assert_no_pending_responses()
self.assertEqual(response, {})
assert response == {}
stubber.deactivate()
def test_mutating_filters(self):

View File

@ -10,6 +10,8 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest
import boto3
from boto3.exceptions import ResourceNotExistsError
@ -35,21 +37,19 @@ class TestResourceCustomization(unittest.TestCase):
self.botocore_session.register('creating-resource-class.s3',
self.add_new_method(name='my_method'))
resource = session.resource('s3')
self.assertTrue(hasattr(resource, 'my_method'))
self.assertEqual(resource.my_method('anything'), 'anything')
assert hasattr(resource, 'my_method')
assert resource.my_method('anything') == 'anything'
class TestSessionErrorMessages(unittest.TestCase):
def test_has_good_error_message_when_no_resource(self):
bad_resource_name = 'doesnotexist'
err_regex = (
'%s.*resource does not exist.' % bad_resource_name
)
with self.assertRaisesRegex(ResourceNotExistsError, err_regex):
err_regex = f'{bad_resource_name}.*resource does not exist.'
with pytest.raises(ResourceNotExistsError, match=err_regex):
boto3.resource(bad_resource_name)
class TestGetAvailableSubresources(unittest.TestCase):
def test_s3_available_subresources_exists(self):
s3 = boto3.resource('s3')
self.assertTrue(hasattr(s3, 'get_available_subresources'))
assert hasattr(s3, 'get_available_subresources')

View File

@ -10,6 +10,8 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest
from tests import unittest
import botocore
@ -26,36 +28,26 @@ class TestS3MethodInjection(unittest.TestCase):
def test_transfer_methods_injected_to_client(self):
session = boto3.session.Session(region_name='us-west-2')
client = session.client('s3')
self.assertTrue(hasattr(client, 'upload_file'),
'upload_file was not injected onto S3 client')
self.assertTrue(hasattr(client, 'download_file'),
'download_file was not injected onto S3 client')
self.assertTrue(hasattr(client, 'copy'),
'copy was not injected onto S3 client')
assert hasattr(client, 'upload_file')
assert hasattr(client, 'download_file')
assert hasattr(client, 'copy')
def test_bucket_resource_has_load_method(self):
session = boto3.session.Session(region_name='us-west-2')
bucket = session.resource('s3').Bucket('fakebucket')
self.assertTrue(hasattr(bucket, 'load'),
'load() was not injected onto S3 Bucket resource.')
assert hasattr(bucket, 'load')
def test_transfer_methods_injected_to_bucket(self):
bucket = boto3.resource('s3').Bucket('my_bucket')
self.assertTrue(hasattr(bucket, 'upload_file'),
'upload_file was not injected onto S3 bucket')
self.assertTrue(hasattr(bucket, 'download_file'),
'download_file was not injected onto S3 bucket')
self.assertTrue(hasattr(bucket, 'copy'),
'copy was not injected onto S3 bucket')
assert hasattr(bucket, 'upload_file')
assert hasattr(bucket, 'download_file')
assert hasattr(bucket, 'copy')
def test_transfer_methods_injected_to_object(self):
obj = boto3.resource('s3').Object('my_bucket', 'my_key')
self.assertTrue(hasattr(obj, 'upload_file'),
'upload_file was not injected onto S3 object')
self.assertTrue(hasattr(obj, 'download_file'),
'download_file was not injected onto S3 object')
self.assertTrue(hasattr(obj, 'copy'),
'copy was not injected onto S3 object')
assert hasattr(obj, 'upload_file')
assert hasattr(obj, 'download_file')
assert hasattr(obj, 'copy')
class BaseTransferTest(unittest.TestCase):
@ -208,7 +200,7 @@ class TestCopy(BaseTransferTest):
response = self.s3.meta.client.copy(
self.copy_source, self.bucket, self.key)
# The response will be none on a successful transfer.
self.assertIsNone(response)
assert response is None
def test_bucket_copy(self):
self.stub_single_part_copy()
@ -216,14 +208,14 @@ class TestCopy(BaseTransferTest):
with self.stubber:
response = bucket.copy(self.copy_source, self.key)
# The response will be none on a successful transfer.
self.assertIsNone(response)
assert response is None
def test_object_copy(self):
self.stub_single_part_copy()
obj = self.s3.Object(self.bucket, self.key)
with self.stubber:
response = obj.copy(self.copy_source)
self.assertIsNone(response)
assert response is None
def test_copy_progress(self):
chunksize = 8 * (1024 ** 2)
@ -243,8 +235,8 @@ class TestCopy(BaseTransferTest):
# Assert that the progress callback was called the correct number of
# times with the correct amounts.
self.assertEqual(self.progress_times_called, 3)
self.assertEqual(self.progress, chunksize * 3)
assert self.progress_times_called == 3
assert self.progress == chunksize * 3
class TestUploadFileobj(BaseTransferTest):
@ -310,7 +302,7 @@ class TestUploadFileobj(BaseTransferTest):
def test_raises_value_error_on_invalid_fileobj(self):
with self.stubber:
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
self.s3.meta.client.upload_fileobj(
Fileobj='foo', Bucket=self.bucket, Key=self.key)
@ -427,12 +419,12 @@ class TestDownloadFileobj(BaseTransferTest):
self.s3.meta.client.download_fileobj(
Bucket=self.bucket, Key=self.key, Fileobj=self.fileobj)
self.assertEqual(self.fileobj.getvalue(), self.contents)
assert self.fileobj.getvalue() == self.contents
self.stubber.assert_no_pending_responses()
def test_raises_value_error_on_invalid_fileobj(self):
with self.stubber:
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
self.s3.meta.client.download_fileobj(
Bucket=self.bucket, Key=self.key, Fileobj='foo')
@ -442,7 +434,7 @@ class TestDownloadFileobj(BaseTransferTest):
with self.stubber:
bucket.download_fileobj(Key=self.key, Fileobj=self.fileobj)
self.assertEqual(self.fileobj.getvalue(), self.contents)
assert self.fileobj.getvalue() == self.contents
self.stubber.assert_no_pending_responses()
def test_object_download(self):
@ -451,7 +443,7 @@ class TestDownloadFileobj(BaseTransferTest):
with self.stubber:
obj.download_fileobj(Fileobj=self.fileobj)
self.assertEqual(self.fileobj.getvalue(), self.contents)
assert self.fileobj.getvalue() == self.contents
self.stubber.assert_no_pending_responses()
def test_multipart_download(self):
@ -467,7 +459,7 @@ class TestDownloadFileobj(BaseTransferTest):
Bucket=self.bucket, Key=self.key, Fileobj=self.fileobj,
Config=transfer_config)
self.assertEqual(self.fileobj.getvalue(), self.contents)
assert self.fileobj.getvalue() == self.contents
self.stubber.assert_no_pending_responses()
def test_download_progress(self):
@ -489,8 +481,8 @@ class TestDownloadFileobj(BaseTransferTest):
# Assert that the progress callback was called the correct number of
# times with the correct amounts.
self.assertEqual(self.progress_times_called, 11)
self.assertEqual(self.progress, 55)
assert self.progress_times_called == 11
assert self.progress == 55
self.stubber.assert_no_pending_responses()
@ -520,19 +512,19 @@ class TestS3ObjectSummary(unittest.TestCase):
self.stubber.deactivate()
def test_has_load(self):
self.assertTrue(hasattr(self.obj_summary, 'load'),
'load() was not injected onto ObjectSummary resource.')
# Validate load was injected onto ObjectSummary.
assert hasattr(self.obj_summary, 'load')
def test_autoloads_correctly(self):
# In HeadObject the parameter returned is ContentLength, this
# should get mapped to Size of ListObject since the resource uses
# the shape returned to by ListObjects.
self.assertEqual(self.obj_summary.size, self.obj_summary_size)
assert self.obj_summary.size == self.obj_summary_size
def test_cannot_access_other_non_related_parameters(self):
# Even though an HeadObject was used to load this, it should
# only expose the attributes from its shape defined in ListObjects.
self.assertFalse(hasattr(self.obj_summary, 'content_length'))
assert not hasattr(self.obj_summary, 'content_length')
class TestServiceResource(unittest.TestCase):
@ -542,7 +534,5 @@ class TestServiceResource(unittest.TestCase):
def test_unsigned_signature_version_is_not_corrupted(self):
config = Config(signature_version=botocore.UNSIGNED)
resource = self.session.resource('s3', config=config)
self.assertIs(
resource.meta.client.meta.config.signature_version,
botocore.UNSIGNED
)
sig_version = resource.meta.client.meta.config.signature_version
assert sig_version is botocore.UNSIGNED

View File

@ -31,18 +31,18 @@ class TestSession(unittest.TestCase):
# Emit the event.
self.session.events.emit('myevent', my_list=initial_list)
# Ensure that the registered handler was called.
self.assertEqual(initial_list, ['my_handler called'])
assert initial_list == ['my_handler called']
def test_can_access_region_property(self):
session = boto3.session.Session(region_name='us-west-1')
self.assertEqual(session.region_name, 'us-west-1')
assert session.region_name == 'us-west-1'
def test_get_available_partitions(self):
partitions = self.session.get_available_partitions()
self.assertIsInstance(partitions, list)
self.assertTrue(partitions)
assert isinstance(partitions, list)
assert partitions
def test_get_available_regions(self):
regions = self.session.get_available_regions('s3')
self.assertIsInstance(regions, list)
self.assertTrue(regions)
assert isinstance(regions, list)
assert regions

View File

@ -10,63 +10,76 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from nose.tools import assert_true
import pytest
from boto3.session import Session
import botocore.session
boto3_session = None
def create_session():
session = Session(aws_access_key_id='dummy',
aws_secret_access_key='dummy',
region_name='us-east-1')
return session
global boto3_session
if boto3_session is None:
boto3_session = Session(
aws_access_key_id='dummy',
aws_secret_access_key='dummy',
region_name='us-east-1'
)
return boto3_session
def test_can_create_all_resources():
"""Verify we can create all existing resources."""
def _all_resources():
session = create_session()
for service_name in session.get_available_resources():
yield _test_create_resource, session, service_name
yield session, service_name
def _test_create_resource(session, service_name):
resource = session.resource(service_name)
# Verifying we have a "meta" attr is just an arbitrary
# sanity check.
assert_true(hasattr(resource, 'meta'))
def test_can_create_all_clients():
def _all_clients():
session = create_session()
for service_name in session.get_available_services():
yield _test_create_client, session, service_name
yield session, service_name
def _test_create_client(session, service_name):
client = session.client(service_name)
assert_true(hasattr(client, 'meta'))
def test_api_versions_synced_with_botocore():
def _all_api_version_args():
botocore_session = botocore.session.get_session()
boto3_session = create_session()
for service_name in boto3_session.get_available_resources():
yield (_assert_same_api_versions, service_name,
botocore_session, boto3_session)
yield (service_name, botocore_session, boto3_session)
def _assert_same_api_versions(service_name, botocore_session, boto3_session):
@pytest.mark.parametrize('resource_args', _all_resources())
def test_can_create_all_resources(resource_args):
"""Verify we can create all existing resources."""
session, service_name = resource_args
resource = session.resource(service_name)
# Verifying we have a "meta" attr is just an arbitrary
# sanity check.
assert hasattr(resource, 'meta')
@pytest.mark.parametrize('client_args', _all_clients())
def test_can_create_all_clients(client_args):
"""Verify we can create all existing clients."""
session, service_name = client_args
client = session.client(service_name)
assert hasattr(client, 'meta')
@pytest.mark.parametrize('api_version_args', _all_api_version_args())
def test_api_versions_synced_with_botocore(api_version_args):
"""Verify both boto3 and botocore clients stay in sync."""
service_name, botocore_session, boto3_session = api_version_args
resource = boto3_session.resource(service_name)
boto3_api_version = resource.meta.client.meta.service_model.api_version
client = botocore_session.create_client(service_name,
region_name='us-east-1',
aws_access_key_id='foo',
aws_secret_access_key='bar')
client = botocore_session.create_client(
service_name,
region_name='us-east-1',
aws_access_key_id='foo',
aws_secret_access_key='bar'
)
botocore_api_version = client.meta.service_model.api_version
if botocore_api_version != boto3_api_version:
raise AssertionError(
"Different latest API versions found for %s: "
"%s (botocore), %s (boto3)\n" % (service_name,
botocore_api_version,
boto3_api_version))
err = (
f"Different latest API versions found for {service_name}: "
f"{botocore_api_version} (botocore), {boto3_api_version} (boto3)\n"
)
assert botocore_api_version == boto3_api_version, err

View File

@ -10,11 +10,12 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest
from tests import unittest
import botocore.session
from boto3 import utils
import boto3.session
@ -30,7 +31,7 @@ class TestUtils(unittest.TestCase):
botocore_session.register('creating-client-class', shadows_put_object)
with self.assertRaises(RuntimeError):
with pytest.raises(RuntimeError):
# This should raise an exception because we're trying to
# shadow the put_object client method in the
# shadows_put_object handler above.

View File

@ -11,9 +11,11 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import boto3.session
import pytest
import boto3.session
from boto3.resources.collection import CollectionManager
from boto3.resources.base import ServiceResource
# A map of services to regions that cannot use us-west-2
@ -24,17 +26,16 @@ REGION_MAP = {
# A list of collections to ignore. They require parameters
# or are very slow to run.
BLACKLIST = {
BLOCKLIST = {
'ec2': ['images'],
'iam': ['signing_certificates'],
'sqs': ['dead_letter_source_queues']
}
def test_all_collections():
# This generator yields test functions for every collection
# on every available resource, except those which have
# been blacklisted.
def all_collections():
# This generator yields every collection on every available resource,
# except those which have been blocklisted.
session = boto3.session.Session()
for service_name in session.get_available_resources():
resource = session.resource(
@ -42,15 +43,20 @@ def test_all_collections():
region_name=REGION_MAP.get(service_name, 'us-west-2'))
for key in dir(resource):
if key in BLACKLIST.get(service_name, []):
if key in BLOCKLIST.get(service_name, []):
continue
value = getattr(resource, key)
if isinstance(value, CollectionManager):
yield _test_collection, service_name, key, value
yield value
def _test_collection(service_name, collection_name, collection):
@pytest.mark.parametrize("collection", all_collections())
def test_all_collections(collection):
"""Test all collections work on every available resource."""
# Create a list of the first page of items. This tests that
# a remote request can be made, the response parsed, and that
# resources are successfully created.
list(collection.limit(1))
collection_list = list(collection.limit(1))
assert len(collection_list) < 2
assert all([isinstance(res, ServiceResource) for res in collection_list])

View File

@ -290,11 +290,11 @@ class BaseDocsTest(unittest.TestCase):
if contents is None:
contents = self.doc_structure.flush_structure().decode('utf-8')
for line in lines:
self.assertIn(line, contents)
assert line in contents
beginning = contents.find(line)
contents = contents[(beginning + len(line)):]
def assert_not_contains_lines(self, lines):
contents = self.doc_structure.flush_structure().decode('utf-8')
for line in lines:
self.assertNotIn(line, contents)
assert line not in contents

View File

@ -98,7 +98,7 @@ class TestServiceDocumenter(BaseDocsTest):
os.remove(self.resource_model_file)
service_documenter = ServiceDocumenter('myservice', self.session)
contents = service_documenter.document_service().decode('utf-8')
self.assertNotIn('Service Resource', contents)
assert 'Service Resource' not in contents
def test_document_service_no_paginators(self):
# Delete the resource model so that the resource is not documented
@ -107,7 +107,7 @@ class TestServiceDocumenter(BaseDocsTest):
os.remove(self.paginator_model_file)
service_documenter = ServiceDocumenter('myservice', self.session)
contents = service_documenter.document_service().decode('utf-8')
self.assertNotIn('Paginators', contents)
assert 'Paginators' not in contents
def test_document_service_no_waiter(self):
# Delete the resource model so that the resource is not documented
@ -116,7 +116,7 @@ class TestServiceDocumenter(BaseDocsTest):
os.remove(self.waiter_model_file)
service_documenter = ServiceDocumenter('myservice', self.session)
contents = service_documenter.document_service().decode('utf-8')
self.assertNotIn('Waiters', contents)
assert 'Waiters' not in contents
def test_creates_correct_path_to_examples_based_on_service_name(self):
path = os.sep.join([os.path.dirname(boto3.__file__),
@ -126,9 +126,7 @@ class TestServiceDocumenter(BaseDocsTest):
isfile.return_value = False
s = ServiceDocumenter('myservice', self.session)
s.document_service()
self.assertEqual(
isfile.call_args_list[-1],
mock.call(path))
assert isfile.call_args_list[-1] == mock.call(path)
def test_injects_examples_when_found(self):
examples_path = os.sep.join([os.path.dirname(__file__), '..', 'data',
@ -137,5 +135,5 @@ class TestServiceDocumenter(BaseDocsTest):
'myservice', self.session)
service_documenter.EXAMPLE_PATH = examples_path
contents = service_documenter.document_service().decode('utf-8')
self.assertIn('This is an example', contents)
self.assertNotIn('This is for another service', contents)
assert 'This is an example' in contents
assert 'This is for another service' not in contents

View File

@ -20,23 +20,23 @@ class TestGetResourceIgnoreParams(unittest.TestCase):
def test_target_is_single_resource(self):
param = Parameter('InstanceId', 'response')
ignore_params = get_resource_ignore_params([param])
self.assertEqual(ignore_params, ['InstanceId'])
assert ignore_params == ['InstanceId']
def test_target_is_multiple_resources(self):
param = Parameter('InstanceIds[]', 'response')
ignore_params = get_resource_ignore_params([param])
self.assertEqual(ignore_params, ['InstanceIds'])
assert ignore_params == ['InstanceIds']
def test_target_is_element_of_multiple_resources(self):
param = Parameter('InstanceIds[0]', 'response')
ignore_params = get_resource_ignore_params([param])
self.assertEqual(ignore_params, ['InstanceIds'])
assert ignore_params == ['InstanceIds']
def test_target_is_nested_param(self):
param = Parameter('Filters[0].Name', 'response')
ignore_params = get_resource_ignore_params([param])
self.assertEqual(ignore_params, ['Filters'])
assert ignore_params == ['Filters']
param = Parameter('Filters[0].Values[0]', 'response')
ignore_params = get_resource_ignore_params([param])
self.assertEqual(ignore_params, ['Filters'])
assert ignore_params == ['Filters']

View File

@ -12,19 +12,20 @@
# language governing permissions and limitations under the License.
import copy
import pytest
from tests import unittest
from boto3.exceptions import DynamoDBOperationNotSupportedError
from boto3.exceptions import DynamoDBNeedsConditionError
from boto3.exceptions import DynamoDBNeedsKeyConditionError
from boto3.dynamodb.conditions import Attr, Key
from boto3.dynamodb.conditions import And, Or, Not, Equals, LessThan
from boto3.dynamodb.conditions import LessThanEquals, GreaterThan
from boto3.dynamodb.conditions import GreaterThanEquals, BeginsWith, Between
from boto3.dynamodb.conditions import NotEquals, In, AttributeExists
from boto3.dynamodb.conditions import AttributeNotExists, Contains, Size
from boto3.dynamodb.conditions import AttributeType
from boto3.dynamodb.conditions import ConditionExpressionBuilder
from boto3.exceptions import (
DynamoDBOperationNotSupportedError, DynamoDBNeedsConditionError,
DynamoDBNeedsKeyConditionError,
)
from boto3.dynamodb.conditions import (
Attr, Key, And, Or, Not, Equals, LessThan,
LessThanEquals, GreaterThan, GreaterThanEquals, BeginsWith, Between,
NotEquals, In, AttributeExists, AttributeNotExists, Contains, Size,
AttributeType, ConditionExpressionBuilder
)
class TestK(unittest.TestCase):
@ -35,100 +36,92 @@ class TestK(unittest.TestCase):
self.value2 = 'foo2'
def test_and(self):
with self.assertRaisesRegex(
DynamoDBOperationNotSupportedError, 'AND'):
with pytest.raises(DynamoDBOperationNotSupportedError, match=r'AND'):
self.attr & self.attr2
def test_or(self):
with self.assertRaisesRegex(
DynamoDBOperationNotSupportedError, 'OR'):
with pytest.raises(DynamoDBOperationNotSupportedError, match=r'OR'):
self.attr | self.attr2
def test_not(self):
with self.assertRaisesRegex(
DynamoDBOperationNotSupportedError, 'NOT'):
with pytest.raises(DynamoDBOperationNotSupportedError, match=r'NOT'):
~self.attr
def test_eq(self):
self.assertEqual(
self.attr.eq(self.value), Equals(self.attr, self.value))
assert self.attr.eq(self.value) == Equals(self.attr, self.value)
def test_lt(self):
self.assertEqual(
self.attr.lt(self.value), LessThan(self.attr, self.value))
assert self.attr.lt(self.value) == LessThan(self.attr, self.value)
def test_lte(self):
self.assertEqual(
self.attr.lte(self.value), LessThanEquals(self.attr, self.value))
assert self.attr.lte(self.value) == LessThanEquals(
self.attr, self.value)
def test_gt(self):
self.assertEqual(
self.attr.gt(self.value), GreaterThan(self.attr, self.value))
assert self.attr.gt(self.value) == GreaterThan(self.attr, self.value)
def test_gte(self):
self.assertEqual(
self.attr.gte(self.value),
GreaterThanEquals(self.attr, self.value))
assert self.attr.gte(self.value) == GreaterThanEquals(
self.attr, self.value)
def test_begins_with(self):
self.assertEqual(self.attr.begins_with(self.value),
BeginsWith(self.attr, self.value))
assert self.attr.begins_with(self.value) == BeginsWith(
self.attr, self.value)
def test_between(self):
self.assertEqual(self.attr.between(self.value, self.value2),
Between(self.attr, self.value, self.value2))
assert self.attr.between(self.value, self.value2) == Between(
self.attr, self.value, self.value2)
def test_attribute_equality(self):
attr_copy = copy.deepcopy(self.attr)
self.assertIsNot(self.attr, attr_copy)
self.assertEqual(self.attr, attr_copy)
assert self.attr is not attr_copy
assert self.attr == attr_copy
def test_eq_equality(self):
attr_copy = copy.deepcopy(self.attr)
comp = self.attr.eq(self.value)
comp2 = attr_copy.eq(self.value)
self.assertEqual(comp, comp2)
assert comp == comp2
def test_eq_inequality(self):
attr_copy = copy.deepcopy(self.attr)
self.assertNotEqual(self.attr.eq(self.value),
attr_copy.eq(self.value2))
assert self.attr.eq(self.value) != attr_copy.eq(self.value2)
def test_lt_equality(self):
attr_copy = copy.deepcopy(self.attr)
comp = self.attr.lt(self.value)
comp2 = attr_copy.lt(self.value)
self.assertEqual(comp, comp2)
assert comp == comp2
def test_lte_equality(self):
attr_copy = copy.deepcopy(self.attr)
comp = self.attr.lte(self.value)
comp2 = attr_copy.lte(self.value)
self.assertEqual(comp, comp2)
assert comp == comp2
def test_gt_equality(self):
attr_copy = copy.deepcopy(self.attr)
comp = self.attr.gt(self.value)
comp2 = attr_copy.gt(self.value)
self.assertEqual(comp, comp2)
assert comp == comp2
def test_gte_equality(self):
attr_copy = copy.deepcopy(self.attr)
comp = self.attr.gte(self.value)
comp2 = attr_copy.gte(self.value)
self.assertEqual(comp, comp2)
assert comp == comp2
def test_begins_with_equality(self):
attr_copy = copy.deepcopy(self.attr)
comp = self.attr.begins_with(self.value)
comp2 = attr_copy.begins_with(self.value)
self.assertEqual(comp, comp2)
assert comp == comp2
def test_between_equality(self):
attr_copy = copy.deepcopy(self.attr)
comp = self.attr.between(self.value, self.value2)
comp2 = attr_copy.between(self.value, self.value2)
self.assertEqual(comp, comp2)
assert comp == comp2
class TestA(TestK):
@ -139,71 +132,68 @@ class TestA(TestK):
self.value2 = 'foo2'
def test_ne(self):
self.assertEqual(self.attr.ne(self.value),
NotEquals(self.attr, self.value))
assert self.attr.ne(self.value) == NotEquals(self.attr, self.value)
def test_is_in(self):
self.assertEqual(self.attr.is_in([self.value]),
In(self.attr, [self.value]))
assert self.attr.is_in([self.value]) == In(self.attr, [self.value])
def test_exists(self):
self.assertEqual(self.attr.exists(), AttributeExists(self.attr))
assert self.attr.exists() == AttributeExists(self.attr)
def test_not_exists(self):
self.assertEqual(self.attr.not_exists(), AttributeNotExists(self.attr))
assert self.attr.not_exists() == AttributeNotExists(self.attr)
def test_contains(self):
self.assertEqual(self.attr.contains(self.value),
Contains(self.attr, self.value))
assert self.attr.contains(self.value) == Contains(self.attr, self.value)
def test_size(self):
self.assertEqual(self.attr.size(), Size(self.attr))
assert self.attr.size() == Size(self.attr)
def test_attribute_type(self):
self.assertEqual(self.attr.attribute_type(self.value),
AttributeType(self.attr, self.value))
assert self.attr.attribute_type(self.value) == AttributeType(
self.attr, self.value)
def test_ne_equality(self):
attr_copy = copy.deepcopy(self.attr)
comp = self.attr.ne(self.value)
comp2 = attr_copy.ne(self.value)
self.assertEqual(comp, comp2)
assert comp == comp2
def test_is_in_equality(self):
attr_copy = copy.deepcopy(self.attr)
comp = self.attr.is_in([self.value])
comp2 = attr_copy.is_in([self.value])
self.assertEqual(comp, comp2)
assert comp == comp2
def test_exists_equality(self):
attr_copy = copy.deepcopy(self.attr)
comp = self.attr.exists()
comp2 = attr_copy.exists()
self.assertEqual(comp, comp2)
assert comp == comp2
def test_not_exists_equality(self):
attr_copy = copy.deepcopy(self.attr)
comp = self.attr.not_exists()
comp2 = attr_copy.not_exists()
self.assertEqual(comp, comp2)
assert comp == comp2
def test_contains_equality(self):
attr_copy = copy.deepcopy(self.attr)
comp = self.attr.contains(self.value)
comp2 = attr_copy.contains(self.value)
self.assertEqual(comp, comp2)
assert comp == comp2
def test_size_equality(self):
attr_copy = copy.deepcopy(self.attr)
comp = self.attr.size()
comp2 = attr_copy.size()
self.assertEqual(comp, comp2)
assert comp == comp2
def test_attribute_type_equality(self):
attr_copy = copy.deepcopy(self.attr)
comp = self.attr.attribute_type(self.value)
comp2 = attr_copy.attribute_type(self.value)
self.assertEqual(comp, comp2)
assert comp == comp2
class TestConditions(unittest.TestCase):
@ -214,53 +204,51 @@ class TestConditions(unittest.TestCase):
def build_and_assert_expression(self, condition,
reference_expression_dict):
expression_dict = condition.get_expression()
self.assertDictEqual(expression_dict, reference_expression_dict)
assert expression_dict == reference_expression_dict
def test_equal_operator(self):
cond1 = Equals(self.value, self.value2)
cond2 = Equals(self.value, self.value2)
self.assertTrue(cond1 == cond2)
assert cond1 == cond2
def test_equal_operator_type(self):
cond1 = Equals(self.value, self.value2)
cond2 = NotEquals(self.value, self.value2)
self.assertFalse(cond1 == cond2)
assert cond1 != cond2
def test_equal_operator_value(self):
cond1 = Equals(self.value, self.value2)
cond2 = Equals(self.value, self.value)
self.assertFalse(cond1 == cond2)
assert cond1 != cond2
def test_not_equal_operator(self):
cond1 = Equals(self.value, self.value2)
cond2 = NotEquals(self.value, self.value)
self.assertTrue(cond1 != cond2)
assert cond1 != cond2
def test_and_operator(self):
cond1 = Equals(self.value, self.value2)
cond2 = Equals(self.value, self.value2)
self.assertEqual(cond1 & cond2, And(cond1, cond2))
assert cond1 & cond2 == And(cond1, cond2)
def test_and_operator_throws_excepetion(self):
cond1 = Equals(self.value, self.value2)
with self.assertRaisesRegex(
DynamoDBOperationNotSupportedError, 'AND'):
with pytest.raises(DynamoDBOperationNotSupportedError, match=r'AND'):
cond1 & self.value2
def test_or_operator(self):
cond1 = Equals(self.value, self.value2)
cond2 = Equals(self.value, self.value2)
self.assertEqual(cond1 | cond2, Or(cond1, cond2))
assert cond1 | cond2 == Or(cond1, cond2)
def test_or_operator_throws_excepetion(self):
cond1 = Equals(self.value, self.value2)
with self.assertRaisesRegex(
DynamoDBOperationNotSupportedError, 'OR'):
with pytest.raises(DynamoDBOperationNotSupportedError, match=r'OR'):
cond1 | self.value2
def test_not_operator(self):
cond1 = Equals(self.value, self.value2)
self.assertEqual(~cond1, Not(cond1))
assert ~cond1 == Not(cond1)
def test_eq(self):
self.build_and_assert_expression(
@ -304,7 +292,7 @@ class TestConditions(unittest.TestCase):
cond,
{'format': '{0} {operator} {1}',
'operator': 'IN', 'values': (self.value, (self.value2))})
self.assertTrue(cond.has_grouped_values)
assert cond.has_grouped_values
def test_bet(self):
self.build_and_assert_expression(
@ -401,13 +389,13 @@ class TestConditionExpressionBuilder(unittest.TestCase):
is_key_condition=False):
exp_string, names, values = self.builder.build_expression(
condition, is_key_condition=is_key_condition)
self.assertEqual(exp_string, ref_string)
self.assertEqual(names, ref_names)
self.assertEqual(values, ref_values)
assert exp_string == ref_string
assert names == ref_names
assert values == ref_values
def test_bad_input(self):
a = Attr('myattr')
with self.assertRaises(DynamoDBNeedsConditionError):
with pytest.raises(DynamoDBNeedsConditionError):
self.builder.build_expression(a)
def test_build_expression_eq(self):
@ -537,7 +525,7 @@ class TestConditionExpressionBuilder(unittest.TestCase):
def test_build_with_is_key_condition_throws_error(self):
a = Attr('myattr')
with self.assertRaises(DynamoDBNeedsKeyConditionError):
with pytest.raises(DynamoDBNeedsKeyConditionError):
self.builder.build_expression(a.eq('foo'), is_key_condition=True)
def test_build_attr_map(self):

View File

@ -28,17 +28,16 @@ class BaseTransformationTest(unittest.TestCase):
self.flush_amount)
def assert_batch_write_calls_are(self, expected_batch_writes):
self.assertEqual(self.client.batch_write_item.call_count,
len(expected_batch_writes))
assert self.client.batch_write_item.call_count == len(expected_batch_writes)
batch_write_calls = [
args[1] for args in
self.client.batch_write_item.call_args_list
]
self.assertEqual(batch_write_calls, expected_batch_writes)
assert batch_write_calls == expected_batch_writes
def test_batch_write_does_not_immediately_write(self):
self.batch_writer.put_item(Item={'Hash': 'foo'})
self.assertFalse(self.client.batch_write_item.called)
assert not self.client.batch_write_item.called
def test_batch_write_flushes_at_flush_amount(self):
self.batch_writer.put_item(Item={'Hash': 'foo1'})

View File

@ -100,12 +100,10 @@ class TestInputOutputTransformer(BaseTransformationTest):
params=input_params, model=self.operation_model.input_shape,
transformation=self.transformation,
target_shape=self.target_shape)
self.assertEqual(
input_params,
{'Structure': {
assert input_params == {
'Structure': {
'TransformMe': self.transformed_value,
'LeaveAlone': self.original_value}}
)
def test_transform_map(self):
input_params = {
@ -136,11 +134,10 @@ class TestInputOutputTransformer(BaseTransformationTest):
params=input_params, model=self.operation_model.input_shape,
transformation=self.transformation,
target_shape=self.target_shape)
self.assertEqual(
input_params,
{'TransformMe': {'foo': self.transformed_value},
'LeaveAlone': {'foo': self.original_value}}
)
assert input_params == {
'TransformMe':
{'foo': self.transformed_value},
'LeaveAlone': {'foo': self.original_value}}
def test_transform_list(self):
input_params = {
@ -172,11 +169,9 @@ class TestInputOutputTransformer(BaseTransformationTest):
self.transformer.transform(
params=input_params, model=self.operation_model.input_shape,
transformation=self.transformation, target_shape=self.target_shape)
self.assertEqual(
input_params,
{'TransformMe': [self.transformed_value, self.transformed_value],
'LeaveAlone': [self.original_value, self.original_value]}
)
assert input_params == {
'TransformMe': [self.transformed_value, self.transformed_value],
'LeaveAlone': [self.original_value, self.original_value]}
def test_transform_nested_structure(self):
input_params = {
@ -210,12 +205,10 @@ class TestInputOutputTransformer(BaseTransformationTest):
params=input_params, model=self.operation_model.input_shape,
transformation=self.transformation,
target_shape=self.target_shape)
self.assertEqual(
input_params,
{'WrapperStructure': {
assert input_params == {
'WrapperStructure': {
'Structure': {'TransformMe': self.transformed_value,
'LeaveAlone': self.original_value}}}
)
def test_transform_nested_map(self):
input_params = {
@ -271,11 +264,9 @@ class TestInputOutputTransformer(BaseTransformationTest):
self.transformer.transform(
params=input_params, model=self.operation_model.input_shape,
transformation=self.transformation, target_shape=self.target_shape)
self.assertEqual(
input_params,
{'TargetedWrapperMap': {'foo': {'bar': self.transformed_value}},
'UntargetedWrapperMap': {'foo': {'bar': self.original_value}}}
)
assert input_params == {
'TargetedWrapperMap': {'foo': {'bar': self.transformed_value}},
'UntargetedWrapperMap': {'foo': {'bar': self.original_value}}}
def test_transform_nested_list(self):
input_params = {
@ -323,13 +314,11 @@ class TestInputOutputTransformer(BaseTransformationTest):
params=input_params, model=self.operation_model.input_shape,
transformation=self.transformation,
target_shape=self.target_shape)
self.assertEqual(
input_params,
{'TargetedWrapperList': [[
assert input_params == {
'TargetedWrapperList': [[
self.transformed_value, self.transformed_value]],
'UntargetedWrapperList': [[
'UntargetedWrapperList': [[
self.original_value, self.original_value]]}
)
def test_transform_incorrect_type_for_structure(self):
input_params = {
@ -351,7 +340,7 @@ class TestInputOutputTransformer(BaseTransformationTest):
params=input_params, model=self.operation_model.input_shape,
transformation=self.transformation,
target_shape=self.target_shape)
self.assertEqual(input_params, {'Structure': 'foo'})
assert input_params == {'Structure': 'foo'}
def test_transform_incorrect_type_for_map(self):
input_params = {
@ -372,7 +361,7 @@ class TestInputOutputTransformer(BaseTransformationTest):
params=input_params, model=self.operation_model.input_shape,
transformation=self.transformation,
target_shape=self.target_shape)
self.assertEqual(input_params, {'Map': 'foo'})
assert input_params == {'Map': 'foo'}
def test_transform_incorrect_type_for_list(self):
input_params = {
@ -391,7 +380,7 @@ class TestInputOutputTransformer(BaseTransformationTest):
self.transformer.transform(
params=input_params, model=self.operation_model.input_shape,
transformation=self.transformation, target_shape=self.target_shape)
self.assertEqual(input_params, {'List': 'foo'})
assert input_params == {'List': 'foo'}
class BaseTransformAttributeValueTest(BaseTransformationTest):
@ -427,12 +416,10 @@ class TestTransformAttributeValueInput(BaseTransformAttributeValueTest):
self.injector.inject_attribute_value_input(
params=input_params, model=self.operation_model)
self.assertEqual(
input_params,
{'Structure': {
assert input_params == {
'Structure': {
'TransformMe': self.dynamodb_value,
'LeaveAlone': 'unchanged'}}
)
class TestTransformAttributeValueOutput(BaseTransformAttributeValueTest):
@ -456,13 +443,10 @@ class TestTransformAttributeValueOutput(BaseTransformAttributeValueTest):
self.add_input_shape(input_shape)
self.injector.inject_attribute_value_output(
parsed=parsed, model=self.operation_model)
self.assertEqual(
parsed,
{'Structure': {
assert parsed == {
'Structure': {
'TransformMe': self.python_value,
'LeaveAlone': 'unchanged'}}
)
def test_no_output(self):
service_model = ServiceModel({
@ -487,8 +471,7 @@ class TestTransformAttributeValueOutput(BaseTransformAttributeValueTest):
parsed = {}
self.injector.inject_attribute_value_output(
parsed=parsed, model=operation_model)
self.assertEqual(parsed, {})
assert parsed == {}
class TestTransformConditionExpression(BaseTransformationTest):
@ -511,8 +494,7 @@ class TestTransformConditionExpression(BaseTransformationTest):
}
self.injector.inject_condition_expressions(
params, self.operation_model)
self.assertEqual(
params, {'KeyCondition': 'foo', 'AttrCondition': 'bar'})
assert params == {'KeyCondition': 'foo', 'AttrCondition': 'bar'}
def test_single_attr_condition_expression(self):
params = {
@ -520,12 +502,10 @@ class TestTransformConditionExpression(BaseTransformationTest):
}
self.injector.inject_condition_expressions(
params, self.operation_model)
self.assertEqual(
params,
{'AttrCondition': '#n0 = :v0',
'ExpressionAttributeNames': {'#n0': 'foo'},
'ExpressionAttributeValues': {':v0': 'bar'}}
)
assert params == {
'AttrCondition': '#n0 = :v0',
'ExpressionAttributeNames': {'#n0': 'foo'},
'ExpressionAttributeValues': {':v0': 'bar'}}
def test_single_key_conditon_expression(self):
params = {
@ -533,12 +513,10 @@ class TestTransformConditionExpression(BaseTransformationTest):
}
self.injector.inject_condition_expressions(
params, self.operation_model)
self.assertEqual(
params,
{'KeyCondition': '#n0 = :v0',
'ExpressionAttributeNames': {'#n0': 'foo'},
'ExpressionAttributeValues': {':v0': 'bar'}}
)
assert params == {
'KeyCondition': '#n0 = :v0',
'ExpressionAttributeNames': {'#n0': 'foo'},
'ExpressionAttributeValues': {':v0': 'bar'}}
def test_key_and_attr_conditon_expression(self):
params = {
@ -547,13 +525,11 @@ class TestTransformConditionExpression(BaseTransformationTest):
}
self.injector.inject_condition_expressions(
params, self.operation_model)
self.assertEqual(
params,
{'KeyCondition': '#n1 = :v1',
'AttrCondition': '#n0 = :v0',
'ExpressionAttributeNames': {'#n0': 'biz', '#n1': 'foo'},
'ExpressionAttributeValues': {':v0': 'baz', ':v1': 'bar'}}
)
assert params == {
'KeyCondition': '#n1 = :v1',
'AttrCondition': '#n0 = :v0',
'ExpressionAttributeNames': {'#n0': 'biz', '#n1': 'foo'},
'ExpressionAttributeValues': {':v0': 'baz', ':v1': 'bar'}}
def test_key_and_attr_conditon_expression_with_placeholders(self):
params = {
@ -564,23 +540,21 @@ class TestTransformConditionExpression(BaseTransformationTest):
}
self.injector.inject_condition_expressions(
params, self.operation_model)
self.assertEqual(
params,
{'KeyCondition': '#n1 = :v1',
'AttrCondition': '#n0 = :v0',
'ExpressionAttributeNames': {
'#n0': 'biz', '#n1': 'foo', '#a': 'b'},
'ExpressionAttributeValues': {
':v0': 'baz', ':v1': 'bar', ':c': 'd'}}
)
assert params == {
'KeyCondition': '#n1 = :v1',
'AttrCondition': '#n0 = :v0',
'ExpressionAttributeNames': {
'#n0': 'biz', '#n1': 'foo', '#a': 'b'},
'ExpressionAttributeValues': {
':v0': 'baz', ':v1': 'bar', ':c': 'd'}}
class TestCopyDynamoDBParams(unittest.TestCase):
def test_copy_dynamodb_params(self):
params = {'foo': 'bar'}
new_params = copy_dynamodb_params(params)
self.assertEqual(params, new_params)
self.assertIsNot(new_params, params)
assert params == new_params
assert new_params is not params
class TestDynamoDBHighLevelResource(unittest.TestCase):
@ -605,37 +579,35 @@ class TestDynamoDBHighLevelResource(unittest.TestCase):
# It should have fired the following events upon instantiation.
event_call_args = self.events.register.call_args_list
self.assertEqual(
event_call_args,
[mock.call(
assert event_call_args == [
mock.call(
'provide-client-params.dynamodb',
copy_dynamodb_params,
unique_id='dynamodb-create-params-copy'),
mock.call(
mock.call(
'before-parameter-build.dynamodb',
mock_injector.return_value.inject_condition_expressions,
unique_id='dynamodb-condition-expression'),
mock.call(
mock.call(
'before-parameter-build.dynamodb',
mock_injector.return_value.inject_attribute_value_input,
unique_id='dynamodb-attr-value-input'),
mock.call(
mock.call(
'after-call.dynamodb',
mock_injector.return_value.inject_attribute_value_output,
unique_id='dynamodb-attr-value-output'),
mock.call(
mock.call(
'docs.*.dynamodb.*.complete-section',
mock_modify_documentation_method,
unique_id='dynamodb-attr-value-docs'),
mock.call(
mock.call(
'docs.*.dynamodb.*.complete-section',
mock_modify_documentation_method,
unique_id='dynamodb-key-expression-docs'),
mock.call(
mock.call(
'docs.*.dynamodb.*.complete-section',
mock_modify_documentation_method,
unique_id='dynamodb-cond-expression-docs')]
)
class TestRegisterHighLevelInterface(unittest.TestCase):
@ -644,4 +616,4 @@ class TestRegisterHighLevelInterface(unittest.TestCase):
register_high_level_interface(base_classes)
# Check that the base classes are as expected
self.assertEqual(base_classes, [DynamoDBHighLevelResource, object])
assert base_classes == [DynamoDBHighLevelResource, object]

View File

@ -11,6 +11,9 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from decimal import Decimal
import pytest
from tests import unittest
from botocore.compat import six
@ -21,39 +24,39 @@ from boto3.dynamodb.types import Binary, TypeSerializer, TypeDeserializer
class TestBinary(unittest.TestCase):
def test_bytes_input(self):
data = Binary(b'\x01')
self.assertEqual(b'\x01', data)
self.assertEqual(b'\x01', data.value)
assert b'\x01' == data
assert b'\x01' == data.value
def test_non_ascii_bytes_input(self):
# Binary data that is out of ASCII range
data = Binary(b'\x88')
self.assertEqual(b'\x88', data)
self.assertEqual(b'\x88', data.value)
assert b'\x88' == data
assert b'\x88' == data.value
def test_bytearray_input(self):
data = Binary(bytearray([1]))
self.assertEqual(b'\x01', data)
self.assertEqual(b'\x01', data.value)
assert b'\x01' == data
assert b'\x01' == data.value
def test_unicode_throws_error(self):
with self.assertRaises(TypeError):
with pytest.raises(TypeError):
Binary(u'\u00e9')
def test_integer_throws_error(self):
with self.assertRaises(TypeError):
with pytest.raises(TypeError):
Binary(1)
def test_not_equal(self):
self.assertTrue(Binary(b'\x01') != b'\x02')
assert Binary(b'\x01') != b'\x02'
def test_str(self):
self.assertEqual(Binary(b'\x01').__str__(), b'\x01')
assert Binary(b'\x01').__str__() == b'\x01'
def test_bytes(self):
self.assertEqual(bytes(Binary(b'\x01')), b'\x01')
def test_repr(self):
self.assertIn('Binary', repr(Binary(b'1')))
assert 'Binary' in repr(Binary(b'1'))
class TestSerializer(unittest.TestCase):
@ -61,73 +64,67 @@ class TestSerializer(unittest.TestCase):
self.serializer = TypeSerializer()
def test_serialize_unsupported_type(self):
with self.assertRaisesRegex(TypeError, 'Unsupported type'):
with pytest.raises(TypeError, match=r'Unsupported type'):
self.serializer.serialize(object())
def test_serialize_null(self):
self.assertEqual(self.serializer.serialize(None), {'NULL': True})
assert self.serializer.serialize(None) == {'NULL': True}
def test_serialize_boolean(self):
self.assertEqual(self.serializer.serialize(False), {'BOOL': False})
assert self.serializer.serialize(False) == {'BOOL': False}
def test_serialize_integer(self):
self.assertEqual(self.serializer.serialize(1), {'N': '1'})
assert self.serializer.serialize(1) == {'N': '1'}
def test_serialize_decimal(self):
self.assertEqual(
self.serializer.serialize(Decimal('1.25')), {'N': '1.25'})
assert self.serializer.serialize(Decimal('1.25')) == {'N': '1.25'}
def test_serialize_float_error(self):
with self.assertRaisesRegex(
TypeError,
'Float types are not supported. Use Decimal types instead'):
error_msg = r'Float types are not supported. Use Decimal types instead'
with pytest.raises(TypeError, match=error_msg):
self.serializer.serialize(1.25)
def test_serialize_NaN_error(self):
with self.assertRaisesRegex(
TypeError,
'Infinity and NaN not supported'):
with pytest.raises(TypeError, match=r'Infinity and NaN not supported'):
self.serializer.serialize(Decimal('NaN'))
def test_serialize_string(self):
self.assertEqual(self.serializer.serialize('foo'), {'S': 'foo'})
assert self.serializer.serialize('foo') == {'S': 'foo'}
def test_serialize_binary(self):
self.assertEqual(self.serializer.serialize(
Binary(b'\x01')), {'B': b'\x01'})
assert self.serializer.serialize(Binary(b'\x01')) == {'B': b'\x01'}
def test_serialize_bytearray(self):
self.assertEqual(self.serializer.serialize(bytearray([1])),
{'B': b'\x01'})
assert self.serializer.serialize(bytearray([1])) == {'B': b'\x01'}
@unittest.skipIf(six.PY2,
'This is a test when using python3 version of bytes')
@pytest.mark.skipif(six.PY2,
reason='This is a test when using python3 version of bytes')
def test_serialize_bytes(self):
self.assertEqual(self.serializer.serialize(b'\x01'), {'B': b'\x01'})
assert self.serializer.serialize(b'\x01') == {'B': b'\x01'}
def test_serialize_number_set(self):
serialized_value = self.serializer.serialize(set([1, 2, 3]))
self.assertEqual(len(serialized_value), 1)
self.assertIn('NS', serialized_value)
assert len(serialized_value) == 1
assert 'NS' in serialized_value
self.assertCountEqual(serialized_value['NS'], ['1', '2', '3'])
def test_serialize_string_set(self):
serialized_value = self.serializer.serialize(set(['foo', 'bar']))
self.assertEqual(len(serialized_value), 1)
self.assertIn('SS', serialized_value)
assert len(serialized_value) == 1
assert 'SS' in serialized_value
self.assertCountEqual(serialized_value['SS'], ['foo', 'bar'])
def test_serialize_binary_set(self):
serialized_value = self.serializer.serialize(
set([Binary(b'\x01'), Binary(b'\x02')]))
self.assertEqual(len(serialized_value), 1)
self.assertIn('BS', serialized_value)
assert len(serialized_value) == 1
assert 'BS' in serialized_value
self.assertCountEqual(serialized_value['BS'], [b'\x01', b'\x02'])
def test_serialize_list(self):
serialized_value = self.serializer.serialize(['foo', 1, [1]])
self.assertEqual(len(serialized_value), 1)
self.assertIn('L', serialized_value)
assert len(serialized_value) == 1
assert 'L' in serialized_value
self.assertCountEqual(
serialized_value['L'],
[{'S': 'foo'}, {'N': '1'}, {'L': [{'N': '1'}]}]
@ -145,9 +142,9 @@ class TestSerializer(unittest.TestCase):
def test_serialize_map(self):
serialized_value = self.serializer.serialize(
{'foo': 'bar', 'baz': {'biz': 1}})
self.assertEqual(
serialized_value,
{'M': {'foo': {'S': 'bar'}, 'baz': {'M': {'biz': {'N': '1'}}}}})
assert serialized_value == {'M':
{'foo': {'S': 'bar'}, 'baz': {'M': {'biz': {'N': '1'}}}}
}
class TestDeserializer(unittest.TestCase):
@ -155,61 +152,49 @@ class TestDeserializer(unittest.TestCase):
self.deserializer = TypeDeserializer()
def test_deserialize_invalid_type(self):
with self.assertRaisesRegex(TypeError, 'FOO is not supported'):
with pytest.raises(TypeError, match=r'FOO is not supported'):
self.deserializer.deserialize({'FOO': 'bar'})
def test_deserialize_empty_structure(self):
with self.assertRaisesRegex(TypeError, 'Value must be a nonempty'):
with pytest.raises(TypeError, match=r'Value must be a nonempty'):
self.assertEqual(self.deserializer.deserialize({}), {})
def test_deserialize_null(self):
self.assertEqual(self.deserializer.deserialize({"NULL": True}), None)
assert self.deserializer.deserialize({"NULL": True}) is None
def test_deserialize_boolean(self):
self.assertEqual(self.deserializer.deserialize({"BOOL": False}), False)
assert self.deserializer.deserialize({"BOOL": False}) is False
def test_deserialize_integer(self):
self.assertEqual(
self.deserializer.deserialize({'N': '1'}), Decimal('1'))
assert self.deserializer.deserialize({'N': '1'}) == Decimal('1')
def test_deserialize_decimal(self):
self.assertEqual(
self.deserializer.deserialize({'N': '1.25'}), Decimal('1.25'))
assert self.deserializer.deserialize({'N': '1.25'}) == Decimal('1.25')
def test_deserialize_string(self):
self.assertEqual(
self.deserializer.deserialize({'S': 'foo'}), 'foo')
assert self.deserializer.deserialize({'S': 'foo'}) == 'foo'
def test_deserialize_binary(self):
self.assertEqual(
self.deserializer.deserialize({'B': b'\x00'}), Binary(b'\x00'))
assert self.deserializer.deserialize({'B': b'\x00'}) == Binary(b'\x00')
def test_deserialize_number_set(self):
self.assertEqual(
self.deserializer.deserialize(
{'NS': ['1', '1.25']}), set([Decimal('1'), Decimal('1.25')]))
assert self.deserializer.deserialize(
{'NS': ['1', '1.25']}), set([Decimal('1') == Decimal('1.25')])
def test_deserialize_string_set(self):
self.assertEqual(
self.deserializer.deserialize(
{'SS': ['foo', 'bar']}), set(['foo', 'bar']))
assert self.deserializer.deserialize(
{'SS': ['foo', 'bar']}) == set(['foo', 'bar'])
def test_deserialize_binary_set(self):
self.assertEqual(
self.deserializer.deserialize(
{'BS': [b'\x00', b'\x01']}),
set([Binary(b'\x00'), Binary(b'\x01')]))
assert self.deserializer.deserialize({'BS': [b'\x00', b'\x01']}) == set(
[Binary(b'\x00'), Binary(b'\x01')])
def test_deserialize_list(self):
self.assertEqual(
self.deserializer.deserialize(
{'L': [{'N': '1'}, {'S': 'foo'}, {'L': [{'N': '1.25'}]}]}),
[Decimal('1'), 'foo', [Decimal('1.25')]])
assert self.deserializer.deserialize({'L':
[{'N': '1'}, {'S': 'foo'}, {'L': [{'N': '1.25'}]}]}
) == [Decimal('1'), 'foo', [Decimal('1.25')]]
def test_deserialize_map(self):
self.assertEqual(
self.deserializer.deserialize(
{'M': {'foo': {'S': 'mystring'},
'bar': {'M': {'baz': {'N': '1'}}}}}),
{'foo': 'mystring', 'bar': {'baz': Decimal('1')}}
)
assert self.deserializer.deserialize({'M': {'foo':
{'S': 'mystring'}, 'bar': {'M': {'baz': {'N': '1'}}}}}
) == {'foo': 'mystring', 'bar': {'baz': Decimal('1')}}

View File

@ -47,17 +47,16 @@ class TestCreateTags(unittest.TestCase):
self.client.create_tags.assert_called_with(**ref_kwargs)
# Ensure the calls to the Tag reference were correct.
self.assertEqual(
self.resource.Tag.call_args_list,
[mock.call('foo', 'key1', 'value1'),
mock.call('foo', 'key2', 'value2'),
mock.call('foo', 'key3', 'value3'),
mock.call('bar', 'key1', 'value1'),
mock.call('bar', 'key2', 'value2'),
mock.call('bar', 'key3', 'value3')])
assert self.resource.Tag.call_args_list == [
mock.call('foo', 'key1', 'value1'),
mock.call('foo', 'key2', 'value2'),
mock.call('foo', 'key3', 'value3'),
mock.call('bar', 'key1', 'value1'),
mock.call('bar', 'key2', 'value2'),
mock.call('bar', 'key3', 'value3')]
# Ensure the return values are as expected.
self.assertEqual(result_tags, self.ref_tags)
assert result_tags == self.ref_tags
class TestCreateTagsInjection(unittest.TestCase):
@ -65,8 +64,5 @@ class TestCreateTagsInjection(unittest.TestCase):
session = boto3.session.Session(region_name='us-west-2')
with mock.patch('boto3.ec2.createtags.create_tags') as mock_method:
resource = session.resource('ec2')
self.assertTrue(hasattr(resource, 'create_tags'),
'EC2 resource does not have create_tags method.')
self.assertIs(resource.create_tags, mock_method,
'custom create_tags method was not injected onto '
'EC2 service resource')
assert hasattr(resource, 'create_tags')
assert resource.create_tags is mock_method

View File

@ -10,6 +10,7 @@
# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest
from boto3.utils import ServiceContext
from boto3.resources.action import BatchAction, ServiceAction, WaiterAction
@ -43,8 +44,7 @@ class TestServiceActionCall(BaseTestCase):
action(resource, foo=1)
self.assertTrue(params_mock.called,
'Parameters for operation not created')
assert params_mock.called
@mock.patch('boto3.resources.action.create_request_parameters',
return_value={'bar': 'baz'})
@ -59,8 +59,7 @@ class TestServiceActionCall(BaseTestCase):
response = action(resource, foo=1)
operation.assert_called_with(foo=1, bar='baz')
self.assertEqual(response, 'response',
'Unexpected low-level response data returned')
assert response == 'response'
@mock.patch('boto3.resources.action.create_request_parameters',
return_value={})
@ -135,7 +134,7 @@ class TestServiceActionCall(BaseTestCase):
action = ServiceAction(self.action)
with self.assertRaises(TypeError):
with pytest.raises(TypeError):
action(resource, 'item1')
@ -164,8 +163,7 @@ class TestWaiterActionCall(BaseTestCase):
action(resource, foo=1)
self.assertTrue(params_mock.called,
'Parameters for operation not created')
assert params_mock.called
@mock.patch('boto3.resources.action.create_request_parameters',
return_value={'bar': 'baz'})
@ -314,5 +312,5 @@ class TestBatchActionCall(BaseTestCase):
model = self.model
action = BatchAction(model)
with self.assertRaises(TypeError):
with pytest.raises(TypeError):
action(collection, 'item1')

View File

@ -10,6 +10,8 @@
# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest
from botocore.hooks import HierarchicalEmitter
from botocore.model import ServiceModel
@ -76,17 +78,16 @@ class TestCollectionFactory(BaseTestCase):
service_context=service_context
)
self.assertEqual(collection_cls.__name__,
'test.Chain.FrobsCollectionManager')
self.assertIsInstance(collection, CollectionManager)
assert collection_cls.__name__ == 'test.Chain.FrobsCollectionManager'
assert isinstance(collection, CollectionManager)
# Make sure that collection manager created from the factory
# returns a ResourceCollection.
self.assertIsInstance(collection.all(), ResourceCollection)
assert isinstance(collection.all(), ResourceCollection)
# Make sure that the collection returned from the collection
# manager can be chained and return a ResourceCollection as well.
self.assertIsInstance(collection.all().all(), ResourceCollection)
assert isinstance(collection.all().all(), ResourceCollection)
@mock.patch('boto3.resources.collection.BatchAction')
def test_create_batch_actions(self, action_mock):
@ -137,7 +138,7 @@ class TestCollectionFactory(BaseTestCase):
service_context=service_context
)
self.assertTrue(hasattr(collection, 'delete'))
assert hasattr(collection, 'delete')
collection.delete()
@ -200,13 +201,13 @@ class TestResourceCollection(BaseTestCase):
def test_repr(self):
collection = self.get_collection()
self.assertIn('CollectionManager', repr(collection))
assert 'CollectionManager' in repr(collection)
def test_iteration_manager(self):
# A collection manager is not iterable. You must first call
# .all or .filter or another method to get an iterable.
collection = self.get_collection()
with self.assertRaises(TypeError):
with pytest.raises(TypeError):
list(collection)
def test_iteration_non_paginated(self):
@ -235,11 +236,11 @@ class TestResourceCollection(BaseTestCase):
}
collection = self.get_collection()
items = list(collection.all())
self.assertEqual(len(items), 4)
self.assertEqual(items[0].id, 'one')
self.assertEqual(items[1].id, 'two')
self.assertEqual(items[2].id, 'three')
self.assertEqual(items[3].id, 'four')
assert len(items) == 4
assert items[0].id == 'one'
assert items[1].id == 'two'
assert items[2].id == 'three'
assert items[3].id == 'four'
def test_limit_param_non_paginated(self):
self.collection_def = {
@ -267,11 +268,11 @@ class TestResourceCollection(BaseTestCase):
}
collection = self.get_collection()
items = list(collection.all().limit(2))
self.assertEqual(len(items), 2)
assert len(items) == 2
# Only the first two should be present
self.assertEqual(items[0].id, 'one')
self.assertEqual(items[1].id, 'two')
assert items[0].id == 'one'
assert items[1].id == 'two'
def test_limit_method_non_paginated(self):
self.collection_def = {
@ -299,11 +300,11 @@ class TestResourceCollection(BaseTestCase):
}
collection = self.get_collection()
items = list(collection.limit(2))
self.assertEqual(len(items), 2)
assert len(items) == 2
# Only the first two should be present
self.assertEqual(items[0].id, 'one')
self.assertEqual(items[1].id, 'two')
assert items[0].id == 'one'
assert items[1].id == 'two'
@mock.patch('boto3.resources.collection.ResourceHandler')
def test_filters_non_paginated(self, handler):
@ -357,9 +358,9 @@ class TestResourceCollection(BaseTestCase):
]
collection = self.get_collection()
pages = list(collection.limit(3).pages())
self.assertEqual(len(pages), 2)
self.assertEqual(len(pages[0]), 2)
self.assertEqual(len(pages[1]), 1)
assert len(pages) == 2
assert len(pages[0]) == 2
assert len(pages[1]) == 1
def test_page_iterator_page_size(self):
self.collection_def = {
@ -419,11 +420,11 @@ class TestResourceCollection(BaseTestCase):
]
collection = self.get_collection()
items = list(collection.all())
self.assertEqual(len(items), 4)
self.assertEqual(items[0].id, 'one')
self.assertEqual(items[1].id, 'two')
self.assertEqual(items[2].id, 'three')
self.assertEqual(items[3].id, 'four')
assert len(items) == 4
assert items[0].id == 'one'
assert items[1].id == 'two'
assert items[2].id == 'three'
assert items[3].id == 'four'
# Low-level pagination should have been called
self.client.get_paginator.assert_called_with('get_frobs')
@ -463,11 +464,11 @@ class TestResourceCollection(BaseTestCase):
]
collection = self.get_collection()
items = list(collection.all().limit(2))
self.assertEqual(len(items), 2)
assert len(items) == 2
# Only the first two should be present
self.assertEqual(items[0].id, 'one')
self.assertEqual(items[1].id, 'two')
assert items[0].id == 'one'
assert items[1].id == 'two'
def test_limit_method_paginated(self):
self.collection_def = {
@ -501,11 +502,11 @@ class TestResourceCollection(BaseTestCase):
]
collection = self.get_collection()
items = list(collection.all().limit(2))
self.assertEqual(len(items), 2)
assert len(items) == 2
# Only the first two should be present
self.assertEqual(items[0].id, 'one')
self.assertEqual(items[1].id, 'two')
assert items[0].id == 'one'
assert items[1].id == 'two'
@mock.patch('boto3.resources.collection.ResourceHandler')
def test_filters_paginated(self, handler):
@ -612,11 +613,11 @@ class TestResourceCollection(BaseTestCase):
items = list(collection.filter().all().all())
self.assertEqual(len(items), 4)
self.assertEqual(items[0].id, 'one')
self.assertEqual(items[1].id, 'two')
self.assertEqual(items[2].id, 'three')
self.assertEqual(items[3].id, 'four')
assert len(items) == 4
assert items[0].id == 'one'
assert items[1].id == 'two'
assert items[2].id == 'three'
assert items[3].id == 'four'
@mock.patch('boto3.resources.collection.ResourceHandler')
def test_chaining_copies_parameters(self, handler):
@ -674,4 +675,4 @@ class TestResourceCollection(BaseTestCase):
def test_chained_repr(self):
collection = self.get_collection()
self.assertIn('ResourceCollection', repr(collection.all()))
assert 'ResourceCollection' in repr(collection.all())

View File

@ -12,7 +12,7 @@
# language governing permissions and limitations under the License.
import botocore.session
from botocore import xform_name
from nose.tools import assert_false
import pytest
from boto3.session import Session
from boto3.resources.model import ResourceModel
@ -63,13 +63,7 @@ def _shape_has_pagination_param(shape):
return False
def test_all_collections_have_paginators_if_needed():
# If a collection relies on an operation that is paginated, it
# will require a paginator to iterate through all of the resources
# with the all() method. If there is no paginator, it will only
# make it through the first page of results. So we need to make sure
# if a collection looks like it uses a paginated operation then there
# should be a paginator applied to it.
def _collection_test_args():
botocore_session = botocore.session.get_session()
session = Session(botocore_session=botocore_session)
loader = botocore_session.get_component('data_loader')
@ -91,13 +85,24 @@ def test_all_collections_have_paginators_if_needed():
# Iterate over all of the collections for each resource model
# and ensure that the collection has a paginator if it needs one.
for collection_model in resource_model.collections:
yield (
_assert_collection_has_paginator_if_needed, client,
service_name, resource_name, collection_model)
yield (client, service_name, resource_name, collection_model)
@pytest.mark.parametrize(
'collection_args',
_collection_test_args()
)
def test_all_collections_have_paginators_if_needed(collection_args):
# If a collection relies on an operation that is paginated, it
# will require a paginator to iterate through all of the resources
# with the all() method. If there is no paginator, it will only
# make it through the first page of results. So we need to make sure
# if a collection looks like it uses a paginated operation then there
# should be a paginator applied to it.
_assert_collection_has_paginator_if_needed(*collection_args)
def _assert_collection_has_paginator_if_needed(
client, service_name, resource_name, collection_model):
client, service_name, resource_name, collection_model
):
underlying_operation_name = collection_model.request.operation
# See if the operation can be paginated from the client.
can_paginate_operation = client.can_paginate(
@ -108,10 +113,11 @@ def _assert_collection_has_paginator_if_needed(
# Make sure that if the operation looks paginated then there is
# a paginator for the client to use for the collection.
if not can_paginate_operation:
assert_false(
looks_paginated,
'Collection %s on resource %s of service %s uses the operation '
'%s, but the operation has no paginator even though it looks '
'paginated.' % (
collection_model.name, resource_name, service_name,
underlying_operation_name))
error_msg = (
f'Collection {collection_model.name} on resource {resource_name} '
f'of service {service_name} uses the operation '
f'{underlying_operation_name}, but the operation has no paginator '
f'even though it looks paginated.'
)
assert not looks_paginated, error_msg

View File

@ -10,6 +10,8 @@
# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest
from botocore.model import DenormalizedStructureBuilder, ServiceModel
from tests import BaseTestCase, mock
@ -50,21 +52,15 @@ class BaseTestResourceFactory(BaseTestCase):
class TestResourceFactory(BaseTestResourceFactory):
def test_get_service_returns_resource_class(self):
TestResource = self.load('test')
self.assertIn(ServiceResource, TestResource.__bases__,
'Did not return a ServiceResource subclass for service')
assert ServiceResource in TestResource.__bases__
def test_get_resource_returns_resource_class(self):
QueueResource = self.load('Queue')
self.assertIn(ServiceResource, QueueResource.__bases__,
'Did not return a ServiceResource subclass for resource')
assert ServiceResource in QueueResource.__bases__
def test_factory_sets_service_name(self):
QueueResource = self.load('Queue')
self.assertEqual(QueueResource.meta.service_name, 'test',
'Service name not set')
assert QueueResource.meta.service_name == 'test'
def test_factory_sets_identifiers(self):
model = {
@ -76,10 +72,8 @@ class TestResourceFactory(BaseTestResourceFactory):
MessageResource = self.load('Message', model)
self.assertIn('queue_url', MessageResource.meta.identifiers,
'Missing queue_url identifier from model')
self.assertIn('receipt_handle', MessageResource.meta.identifiers,
'Missing receipt_handle identifier from model')
assert 'queue_url' in MessageResource.meta.identifiers
assert 'receipt_handle' in MessageResource.meta.identifiers
def test_identifiers_in_repr(self):
model = {
@ -95,13 +89,13 @@ class TestResourceFactory(BaseTestResourceFactory):
resource = self.load('Message', model, defs)('url', 'handle')
# Class name
self.assertIn('test.Message', repr(resource))
assert 'test.Message' in repr(resource)
# Identifier names and values
self.assertIn('queue_url', repr(resource))
self.assertIn("'url'", repr(resource))
self.assertIn('receipt_handle', repr(resource))
self.assertIn("'handle'", repr(resource))
assert 'queue_url' in repr(resource)
assert "'url'" in repr(resource)
assert 'receipt_handle' in repr(resource)
assert "'handle'" in repr(resource)
def test_factory_creates_dangling_resources(self):
model = {
@ -132,10 +126,8 @@ class TestResourceFactory(BaseTestResourceFactory):
TestResource = self.load('test', model, defs)
self.assertTrue(hasattr(TestResource, 'Queue'),
'Missing Queue class from model')
self.assertTrue(hasattr(TestResource, 'Message'),
'Missing Message class from model')
assert hasattr(TestResource, 'Queue')
assert hasattr(TestResource, 'Message')
def test_factory_creates_properties(self):
model = {
@ -159,10 +151,8 @@ class TestResourceFactory(BaseTestResourceFactory):
TestResource = self.load('test', model, service_model=service_model)
self.assertTrue(hasattr(TestResource, 'e_tag'),
'ETag shape member not available on resource')
self.assertTrue(hasattr(TestResource, 'last_modified'),
'LastModified shape member not available on resource')
assert hasattr(TestResource, 'e_tag')
assert hasattr(TestResource, 'last_modified')
def test_factory_renames_on_clobber_identifier(self):
model = {
@ -175,7 +165,7 @@ class TestResourceFactory(BaseTestResourceFactory):
# must be renamed.
cls = self.load('test', model)
self.assertTrue(hasattr(cls, 'meta_identifier'))
assert hasattr(cls, 'meta_identifier')
def test_factory_fails_on_clobber_action(self):
model = {
@ -194,18 +184,17 @@ class TestResourceFactory(BaseTestResourceFactory):
# This fails because the resource has an identifier
# that would be clobbered by the action name.
with self.assertRaises(ValueError) as cm:
with pytest.raises(ValueError) as cm:
self.load('test', model)
self.assertIn('test', str(cm.exception))
self.assertIn('action', str(cm.exception))
assert 'test' in str(cm.exception)
assert 'action' in str(cm.exception)
def test_can_instantiate_service_resource(self):
TestResource = self.load('test')
resource = TestResource()
self.assertIsInstance(resource, ServiceResource,
'Object is not an instance of ServiceResource')
assert isinstance(resource, ServiceResource)
def test_non_service_resource_missing_defs(self):
# Only services should get dangling defs
@ -227,8 +216,8 @@ class TestResourceFactory(BaseTestResourceFactory):
queue = self.load('Queue', model, defs)('url')
self.assertTrue(not hasattr(queue, 'Queue'))
self.assertTrue(not hasattr(queue, 'Message'))
assert not hasattr(queue, 'Queue')
assert not hasattr(queue, 'Message')
def test_subresource_requires_only_identifier(self):
defs = {
@ -266,10 +255,8 @@ class TestResourceFactory(BaseTestResourceFactory):
# queue itself.
message = queue.Message('receipt')
self.assertEqual(message.queue_url, 'url',
'Wrong queue URL set on the message resource instance')
self.assertEqual(message.receipt_handle, 'receipt',
'Wrong receipt handle set on the message resource instance')
assert message.queue_url == 'url'
assert message.receipt_handle == 'receipt'
def test_resource_meta_unique(self):
queue_cls = self.load('Queue')
@ -277,23 +264,19 @@ class TestResourceFactory(BaseTestResourceFactory):
queue1 = queue_cls()
queue2 = queue_cls()
self.assertEqual(queue1.meta, queue2.meta,
'Queue meta copies not equal after creation')
assert queue1.meta == queue2.meta
queue1.meta.data = {'id': 'foo'}
queue2.meta.data = {'id': 'bar'}
self.assertNotEqual(queue_cls.meta, queue1.meta,
'Modified queue instance data should not modify the class data')
self.assertNotEqual(queue1.meta, queue2.meta,
'Queue data should be unique to queue instance')
self.assertNotEqual(queue1.meta, 'bad-value')
assert queue_cls.meta != queue1.meta
assert queue1.meta != queue2.meta
assert queue1.meta != 'bad-value'
def test_resource_meta_repr(self):
queue_cls = self.load('Queue')
queue = queue_cls()
self.assertEqual(repr(queue.meta),
'ResourceMeta(\'test\', identifiers=[])')
assert repr(queue.meta) == 'ResourceMeta(\'test\', identifiers=[])'
@mock.patch('boto3.resources.factory.ServiceAction')
def test_resource_calls_action(self, action_cls):
@ -340,7 +323,7 @@ class TestResourceFactory(BaseTestResourceFactory):
queue.get_message_status()
# Cached data should be cleared
self.assertIsNone(queue.meta.data)
assert queue.meta.data is None
@mock.patch('boto3.resources.factory.ServiceAction')
def test_resource_action_leaves_data(self, action_cls):
@ -365,7 +348,7 @@ class TestResourceFactory(BaseTestResourceFactory):
queue.get_message_status()
# Cached data should not be cleared
self.assertEqual(queue.meta.data, {'some': 'data'})
assert queue.meta.data == {'some': 'data'}
@mock.patch('boto3.resources.factory.ServiceAction')
def test_resource_lazy_loads_properties(self, action_cls):
@ -409,19 +392,17 @@ class TestResourceFactory(BaseTestResourceFactory):
action.assert_not_called()
# Accessing a property should call load
self.assertEqual(resource.e_tag, 'tag',
'ETag property returned wrong value')
self.assertEqual(action.call_count, 1)
assert resource.e_tag == 'tag'
assert action.call_count == 1
# Both params should have been loaded into the data bag
self.assertIn('ETag', resource.meta.data)
self.assertIn('LastModified', resource.meta.data)
assert 'ETag' in resource.meta.data
assert 'LastModified' in resource.meta.data
# Accessing another property should use cached value
# instead of making a second call.
self.assertEqual(resource.last_modified, 'never',
'LastModified property returned wrong value')
self.assertEqual(action.call_count, 1)
assert resource.last_modified == 'never'
assert action.call_count == 1
@mock.patch('boto3.resources.factory.ServiceAction')
def test_resource_lazy_properties_missing_load(self, action_cls):
@ -453,7 +434,7 @@ class TestResourceFactory(BaseTestResourceFactory):
resource = self.load(
'test', model, service_model=service_model)('url')
with self.assertRaises(ResourceLoadException):
with pytest.raises(ResourceLoadException):
resource.last_modified
@mock.patch('boto3.resources.factory.ServiceAction')
@ -480,8 +461,8 @@ class TestResourceFactory(BaseTestResourceFactory):
'test', model, service_model=service_model)(shape_id)
try:
self.assertEqual(resource.id, shape_id)
self.assertEqual(resource.foo_id, shape_id)
assert resource.id == shape_id
assert resource.foo_id == shape_id
except ResourceLoadException:
self.fail("Load attempted on identifier alias.")
@ -540,13 +521,9 @@ class TestResourceFactory(BaseTestResourceFactory):
# Load the resource with no data
resource.meta.data = {}
self.assertTrue(
hasattr(resource, 'subnet'),
'Resource should have a subnet reference')
self.assertIsNone(
resource.subnet,
'Missing identifier, should return None')
self.assertIsNone(resource.vpcs)
assert hasattr(resource, 'subnet')
assert resource.subnet is None
assert resource.vpcs is None
# Load the resource with data to instantiate a reference
resource.meta.data = {
@ -557,14 +534,14 @@ class TestResourceFactory(BaseTestResourceFactory):
]
}
self.assertIsInstance(resource.subnet, ServiceResource)
self.assertEqual(resource.subnet.id, 'abc123')
assert isinstance(resource.subnet, ServiceResource)
assert resource.subnet.id == 'abc123'
vpcs = resource.vpcs
self.assertIsInstance(vpcs, list)
self.assertEqual(len(vpcs), 2)
self.assertEqual(vpcs[0].id, 'vpc1')
self.assertEqual(vpcs[1].id, 'vpc2')
assert isinstance(vpcs, list)
assert len(vpcs) == 2
assert vpcs[0].id == 'vpc1'
assert vpcs[1].id == 'vpc2'
@mock.patch('boto3.resources.model.Collection')
def test_resource_loads_collections(self, mock_model):
@ -588,10 +565,9 @@ class TestResourceFactory(BaseTestResourceFactory):
resource = self.load('test', model, defs, service_model)()
self.assertTrue(hasattr(resource, 'queues'),
'Resource should expose queues collection')
self.assertIsInstance(resource.queues, CollectionManager,
'Queues collection should be a collection manager')
# Resource must expose queues collection
assert hasattr(resource, 'queues')
assert isinstance(resource.queues, CollectionManager)
def test_resource_loads_waiters(self):
model = {
@ -612,8 +588,7 @@ class TestResourceFactory(BaseTestResourceFactory):
resource = self.load('test', model, defs, service_model)()
self.assertTrue(hasattr(resource, 'wait_until_exists'),
'Resource should expose resource waiter: wait_until_exists')
assert hasattr(resource, 'wait_until_exists')
@mock.patch('boto3.resources.factory.WaiterAction')
def test_resource_waiter_calls_waiter_method(self, waiter_action_cls):
@ -669,56 +644,53 @@ class TestResourceFactoryDanglingResource(BaseTestResourceFactory):
resource = self.load('test', self.model, self.defs)()
q = resource.Queue('test')
self.assertIsInstance(q, ServiceResource,
'Dangling resource instance not a ServiceResource')
assert isinstance(q, ServiceResource)
def test_hash_resource_equal(self):
resource = self.load('test', self.model, self.defs)()
p = resource.Queue('test')
q = resource.Queue('test')
self.assertEqual(p, q, "Should be equal resource")
self.assertEqual(hash(p), hash(q), "Hash values should be equal")
assert p == q
assert hash(p) == hash(q)
def test_hash_resource_not_equal(self):
resource = self.load('test', self.model, self.defs)()
p = resource.Queue('test1')
q = resource.Queue('test2')
self.assertNotEqual(p, q, "Should not be equal resource")
self.assertNotEqual(hash(p), hash(q), "Hash values should be different")
assert p != q
assert hash(p) != hash(q)
def test_dangling_resource_create_with_kwarg(self):
resource = self.load('test', self.model, self.defs)()
q = resource.Queue(url='test')
self.assertIsInstance(q, ServiceResource,
'Dangling resource created with kwargs is not a ServiceResource')
assert isinstance(q, ServiceResource)
def test_dangling_resource_shares_client(self):
resource = self.load('test', self.model, self.defs)()
q = resource.Queue('test')
self.assertEqual(resource.meta.client, q.meta.client,
'Client was not shared to dangling resource instance')
assert resource.meta.client == q.meta.client
def test_dangling_resource_requires_identifier(self):
resource = self.load('test', self.model, self.defs)()
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
resource.Queue()
def test_dangling_resource_raises_for_unknown_arg(self):
resource = self.load('test', self.model, self.defs)()
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
resource.Queue(url='foo', bar='baz')
def test_dangling_resource_identifier_is_immutable(self):
resource = self.load('test', self.model, self.defs)()
queue = resource.Queue('url')
# We should not be able to change the identifier's value
with self.assertRaises(AttributeError):
with pytest.raises(AttributeError):
queue.url = 'foo'
def test_dangling_resource_equality(self):
@ -727,7 +699,7 @@ class TestResourceFactoryDanglingResource(BaseTestResourceFactory):
q1 = resource.Queue('url')
q2 = resource.Queue('url')
self.assertEqual(q1, q2)
assert q1 == q2
def test_dangling_resource_inequality(self):
self.defs = {
@ -757,8 +729,8 @@ class TestResourceFactoryDanglingResource(BaseTestResourceFactory):
q2 = resource.Queue('different')
m = q1.Message('handle')
self.assertNotEqual(q1, q2)
self.assertNotEqual(q1, m)
assert q1 != q2
assert q1 != m
def test_dangling_resource_loads_data(self):
# Given a loadable resource instance that contains a reference
@ -817,8 +789,8 @@ class TestResourceFactoryDanglingResource(BaseTestResourceFactory):
# Now, get the reference and make sure it has its data
# set as expected.
interface = instance.network_interface
self.assertIsNotNone(interface.meta.data)
self.assertEqual(interface.public_ip, '127.0.0.1')
assert interface.meta.data is not None
assert interface.public_ip == '127.0.0.1'
class TestServiceResourceSubresources(BaseTestResourceFactory):
@ -863,44 +835,43 @@ class TestServiceResourceSubresources(BaseTestResourceFactory):
def test_subresource_custom_name(self):
resource = self.load('test', self.model, self.defs)()
self.assertTrue(hasattr(resource, 'QueueObject'))
assert hasattr(resource, 'QueueObject')
def test_contains_all_subresources(self):
resource = self.load('test', self.model, self.defs)()
self.assertIn('QueueObject', dir(resource))
self.assertIn('PriorityQueue', dir(resource))
self.assertIn('Message', dir(resource))
assert 'QueueObject' in dir(resource)
assert 'PriorityQueue' in dir(resource)
assert 'Message' in dir(resource)
def test_get_available_subresources(self):
resource = self.load('test', self.model, self.defs)()
self.assertTrue(hasattr(resource, 'get_available_subresources'))
assert hasattr(resource, 'get_available_subresources')
subresources = sorted(resource.get_available_subresources())
expected = sorted(['PriorityQueue', 'Message', 'QueueObject'])
self.assertEqual(subresources, expected)
assert subresources == expected
def test_subresource_missing_all_subresources(self):
resource = self.load('test', self.model, self.defs)()
message = resource.Message('url', 'handle')
self.assertNotIn('QueueObject', dir(message))
self.assertNotIn('PriorityQueue', dir(message))
self.assertNotIn('Queue', dir(message))
self.assertNotIn('Message', dir(message))
assert 'QueueObject' not in dir(message)
assert 'PriorityQueue' not in dir(message)
assert 'Queue' not in dir(message)
assert 'Message' not in dir(message)
def test_event_emitted_when_class_created(self):
self.load('test', self.model, self.defs)
self.assertTrue(self.emitter.emit.called)
assert self.emitter.emit.called
call_args = self.emitter.emit.call_args
# Verify the correct event name emitted.
self.assertEqual(call_args[0][0],
'creating-resource-class.test.ServiceResource')
assert call_args[0][0] == 'creating-resource-class.test.ServiceResource'
# Verify we send out the class attributes dict.
actual_class_attrs = sorted(call_args[1]['class_attributes'])
self.assertEqual(actual_class_attrs, [
assert actual_class_attrs == [
'Message', 'PriorityQueue', 'QueueObject',
'get_available_subresources', 'meta'])
'get_available_subresources', 'meta']
base_classes = sorted(call_args[1]['base_classes'])
self.assertEqual(base_classes, [ServiceResource])
assert base_classes == [ServiceResource]

View File

@ -21,14 +21,14 @@ class TestModels(BaseTestCase):
def test_resource_name(self):
model = ResourceModel('test', {}, {})
self.assertEqual(model.name, 'test')
assert model.name == 'test'
def test_resource_shape(self):
model = ResourceModel('test', {
'shape': 'Frob'
}, {})
self.assertEqual(model.shape, 'Frob')
assert model.shape == 'Frob'
def test_resource_identifiers(self):
model = ResourceModel('test', {
@ -38,9 +38,9 @@ class TestModels(BaseTestCase):
]
}, {})
self.assertEqual(model.identifiers[0].name, 'one')
self.assertEqual(model.identifiers[1].name, 'two')
self.assertEqual(model.identifiers[1].member_name, 'three')
assert model.identifiers[0].name == 'one'
assert model.identifiers[1].name == 'two'
assert model.identifiers[1].member_name == 'three'
def test_resource_action_raw(self):
model = ResourceModel('test', {
@ -58,18 +58,18 @@ class TestModels(BaseTestCase):
}
}, {})
self.assertIsInstance(model.actions, list)
self.assertEqual(len(model.actions), 1)
assert isinstance(model.actions, list)
assert len(model.actions) == 1
action = model.actions[0]
self.assertIsInstance(action, Action)
self.assertEqual(action.request.operation, 'GetFrobsOperation')
self.assertIsInstance(action.request.params, list)
self.assertEqual(len(action.request.params), 1)
self.assertEqual(action.request.params[0].target, 'FrobId')
self.assertEqual(action.request.params[0].source, 'identifier')
self.assertEqual(action.request.params[0].name, 'Id')
self.assertEqual(action.path, 'Container.Frobs[]')
assert isinstance(action, Action)
assert action.request.operation == 'GetFrobsOperation'
assert isinstance(action.request.params, list)
assert len(action.request.params) == 1
assert action.request.params[0].target == 'FrobId'
assert action.request.params[0].source == 'identifier'
assert action.request.params[0].name == 'Id'
assert action.path == 'Container.Frobs[]'
def test_resource_action_response_resource(self):
model = ResourceModel('test', {
@ -86,10 +86,10 @@ class TestModels(BaseTestCase):
})
action = model.actions[0]
self.assertEqual(action.resource.type, 'Frob')
self.assertEqual(action.resource.path, 'Container.Frobs[]')
self.assertIsInstance(action.resource.model, ResourceModel)
self.assertEqual(action.resource.model.name, 'Frob')
assert action.resource.type == 'Frob'
assert action.resource.path == 'Container.Frobs[]'
assert isinstance(action.resource.model, ResourceModel)
assert action.resource.model.name == 'Frob'
def test_resource_load_action(self):
model = ResourceModel('test', {
@ -101,9 +101,9 @@ class TestModels(BaseTestCase):
}
}, {})
self.assertIsInstance(model.load, Action)
self.assertEqual(model.load.request.operation, 'GetFrobInfo')
self.assertEqual(model.load.path, '$')
assert isinstance(model.load, Action)
assert model.load.request.operation == 'GetFrobInfo'
assert model.load.path == '$'
def test_resource_batch_action(self):
model = ResourceModel('test', {
@ -120,12 +120,12 @@ class TestModels(BaseTestCase):
}
}, {})
self.assertIsInstance(model.batch_actions, list)
assert isinstance(model.batch_actions, list)
action = model.batch_actions[0]
self.assertIsInstance(action, Action)
self.assertEqual(action.request.operation, 'DeleteObjects')
self.assertEqual(action.request.params[0].target, 'Bucket')
assert isinstance(action, Action)
assert action.request.operation == 'DeleteObjects'
assert action.request.params[0].target == 'Bucket'
def test_sub_resources(self):
model = ResourceModel('test', {
@ -151,16 +151,16 @@ class TestModels(BaseTestCase):
'Frob': {}
})
self.assertIsInstance(model.subresources, list)
self.assertEqual(len(model.subresources), 2)
assert isinstance(model.subresources, list)
assert len(model.subresources) == 2
action = model.subresources[0]
resource = action.resource
self.assertIn(action.name, ['RedFrob', 'GreenFrob'])
self.assertEqual(resource.identifiers[0].target, 'Id')
self.assertEqual(resource.identifiers[0].source, 'input')
self.assertEqual(resource.type, 'Frob')
assert action.name in ['RedFrob', 'GreenFrob']
assert resource.identifiers[0].target == 'Id'
assert resource.identifiers[0].source == 'input'
assert resource.type == 'Frob'
def test_resource_references(self):
model_def = {
@ -181,15 +181,15 @@ class TestModels(BaseTestCase):
}
model = ResourceModel('test', model_def, resource_defs)
self.assertIsInstance(model.references, list)
self.assertEqual(len(model.references), 1)
assert isinstance(model.references, list)
assert len(model.references) == 1
ref = model.references[0]
self.assertEqual(ref.name, 'frob')
self.assertEqual(ref.resource.type, 'Frob')
self.assertEqual(ref.resource.identifiers[0].target, 'Id')
self.assertEqual(ref.resource.identifiers[0].source, 'data')
self.assertEqual(ref.resource.identifiers[0].path, 'FrobId')
assert ref.name == 'frob'
assert ref.resource.type == 'Frob'
assert ref.resource.identifiers[0].target == 'Id'
assert ref.resource.identifiers[0].source == 'data'
assert ref.resource.identifiers[0].path == 'FrobId'
def test_resource_collections(self):
model = ResourceModel('test', {
@ -208,13 +208,13 @@ class TestModels(BaseTestCase):
'Frob': {}
})
self.assertIsInstance(model.collections, list)
self.assertEqual(len(model.collections), 1)
self.assertIsInstance(model.collections[0], Collection)
self.assertEqual(model.collections[0].request.operation, 'GetFrobList')
self.assertEqual(model.collections[0].resource.type, 'Frob')
self.assertEqual(model.collections[0].resource.model.name, 'Frob')
self.assertEqual(model.collections[0].resource.path, 'FrobList[]')
assert isinstance(model.collections, list)
assert len(model.collections) == 1
assert isinstance(model.collections[0], Collection)
assert model.collections[0].request.operation == 'GetFrobList'
assert model.collections[0].resource.type == 'Frob'
assert model.collections[0].resource.model.name == 'Frob'
assert model.collections[0].resource.path == 'FrobList[]'
def test_waiter(self):
model = ResourceModel('test', {
@ -229,13 +229,13 @@ class TestModels(BaseTestCase):
}
}, {})
self.assertIsInstance(model.waiters, list)
assert isinstance(model.waiters, list)
waiter = model.waiters[0]
self.assertIsInstance(waiter, Waiter)
self.assertEqual(waiter.name, 'wait_until_exists')
self.assertEqual(waiter.waiter_name, 'ObjectExists')
self.assertEqual(waiter.params[0].target, 'Bucket')
assert isinstance(waiter, Waiter)
assert waiter.name == 'wait_until_exists'
assert waiter.waiter_name == 'ObjectExists'
assert waiter.params[0].target == 'Bucket'
class TestRenaming(BaseTestCase):
def test_multiple(self):
@ -277,18 +277,18 @@ class TestRenaming(BaseTestCase):
model.load_rename_map(shape)
self.assertEqual(model.identifiers[0].name, 'foo')
self.assertEqual(model.actions[0].name, 'foo_action')
self.assertEqual(model.references[0].name, 'foo_reference')
self.assertEqual(model.collections[0].name, 'foo_collection')
self.assertEqual(model.waiters[0].name, 'wait_until_foo')
assert model.identifiers[0].name == 'foo'
assert model.actions[0].name == 'foo_action'
assert model.references[0].name == 'foo_reference'
assert model.collections[0].name == 'foo_collection'
assert model.waiters[0].name == 'wait_until_foo'
# If an identifier and an attribute share the same name, then
# the attribute is essentially hidden.
self.assertNotIn('foo_attribute', model.get_attributes(shape))
assert 'foo_attribute' not in model.get_attributes(shape)
# Other attributes need to be there, though
self.assertIn('bar', model.get_attributes(shape))
assert 'bar' in model.get_attributes(shape)
# The rest of the tests below ensure the correct order of precedence
# for the various categories of attributes/properties/methods on the
@ -300,7 +300,7 @@ class TestRenaming(BaseTestCase):
model.load_rename_map()
self.assertEqual(model.identifiers[0].name, 'meta_identifier')
assert model.identifiers[0].name == 'meta_identifier'
def test_load_beats_identifier(self):
model = ResourceModel('test', {
@ -314,8 +314,8 @@ class TestRenaming(BaseTestCase):
model.load_rename_map()
self.assertTrue(model.load)
self.assertEqual(model.identifiers[0].name, 'load_identifier')
assert model.load
assert model.identifiers[0].name == 'load_identifier'
def test_identifier_beats_action(self):
model = ResourceModel('test', {
@ -331,8 +331,8 @@ class TestRenaming(BaseTestCase):
model.load_rename_map()
self.assertEqual(model.identifiers[0].name, 'foo')
self.assertEqual(model.actions[0].name, 'foo_action')
assert model.identifiers[0].name == 'foo'
assert model.actions[0].name == 'foo_action'
def test_action_beats_reference(self):
model = ResourceModel('test', {
@ -358,8 +358,8 @@ class TestRenaming(BaseTestCase):
model.load_rename_map()
self.assertEqual(model.actions[0].name, 'foo')
self.assertEqual(model.references[0].name, 'foo_reference')
assert model.actions[0].name == 'foo'
assert model.references[0].name == 'foo_reference'
def test_reference_beats_collection(self):
model = ResourceModel('test', {
@ -385,8 +385,8 @@ class TestRenaming(BaseTestCase):
model.load_rename_map()
self.assertEqual(model.references[0].name, 'foo')
self.assertEqual(model.collections[0].name, 'foo_collection')
assert model.references[0].name == 'foo'
assert model.collections[0].name == 'foo_collection'
def test_collection_beats_waiter(self):
model = ResourceModel('test', {
@ -404,8 +404,8 @@ class TestRenaming(BaseTestCase):
model.load_rename_map()
self.assertEqual(model.collections[0].name, 'wait_until_foo')
self.assertEqual(model.waiters[0].name, 'wait_until_foo_waiter')
assert model.collections[0].name == 'wait_until_foo'
assert model.waiters[0].name == 'wait_until_foo_waiter'
def test_waiter_beats_attribute(self):
model = ResourceModel('test', {
@ -422,5 +422,5 @@ class TestRenaming(BaseTestCase):
model.load_rename_map(shape)
self.assertEqual(model.waiters[0].name, 'wait_until_foo')
self.assertIn('wait_until_foo_attribute', model.get_attributes(shape))
assert model.waiters[0].name == 'wait_until_foo'
assert 'wait_until_foo_attribute' in model.get_attributes(shape)

View File

@ -10,6 +10,7 @@
# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest
from boto3.exceptions import ResourceLoadException
from boto3.resources.base import ResourceMeta, ServiceResource
@ -36,8 +37,7 @@ class TestServiceActionParams(BaseTestCase):
params = create_request_parameters(parent, request_model)
self.assertEqual(params['WarehouseUrl'], 'w-url',
'Parameter not set from resource identifier')
assert params['WarehouseUrl'] == 'w-url'
def test_service_action_params_data_member(self):
request_model = Request({
@ -58,8 +58,7 @@ class TestServiceActionParams(BaseTestCase):
params = create_request_parameters(parent, request_model)
self.assertEqual(params['WarehouseUrl'], 'w-url',
'Parameter not set from resource property')
assert params['WarehouseUrl'] == 'w-url'
def test_service_action_params_data_member_missing(self):
request_model = Request({
@ -86,8 +85,7 @@ class TestServiceActionParams(BaseTestCase):
params = create_request_parameters(parent, request_model)
parent.load.assert_called_with()
self.assertEqual(params['WarehouseUrl'], 'w-url',
'Parameter not set from resource property')
assert params['WarehouseUrl'] == 'w-url'
def test_service_action_params_data_member_missing_no_load(self):
request_model = Request({
@ -105,7 +103,7 @@ class TestServiceActionParams(BaseTestCase):
parent = mock.Mock(spec=ServiceResource)
parent.meta = ResourceMeta('test', data=None)
with self.assertRaises(ResourceLoadException):
with pytest.raises(ResourceLoadException):
params = create_request_parameters(parent, request_model)
def test_service_action_params_constants(self):
@ -132,12 +130,9 @@ class TestServiceActionParams(BaseTestCase):
params = create_request_parameters(None, request_model)
self.assertEqual(params['Param1'], 'param1',
'Parameter not set from string constant')
self.assertEqual(params['Param2'], 123,
'Parameter not set from integer constant')
self.assertEqual(params['Param3'], True,
'Parameter not set from boolean constant')
assert params['Param1'] == 'param1'
assert params['Param2'] == 123
assert params['Param3'] is True
def test_service_action_params_input(self):
request_model = Request({
@ -148,11 +143,11 @@ class TestServiceActionParams(BaseTestCase):
})
params = create_request_parameters(None, request_model)
self.assertEqual(params, {})
assert params == {}
params['param1'] = 'myinput'
params = create_request_parameters(None, request_model, params=params)
self.assertEqual(params, {'param1': 'myinput'})
assert params == {'param1': 'myinput'}
def test_service_action_params_invalid(self):
request_model = Request({
@ -165,7 +160,7 @@ class TestServiceActionParams(BaseTestCase):
]
})
with self.assertRaises(NotImplementedError):
with pytest.raises(NotImplementedError):
create_request_parameters(None, request_model)
def test_service_action_params_list(self):
@ -182,12 +177,9 @@ class TestServiceActionParams(BaseTestCase):
params = create_request_parameters(None, request_model)
self.assertIsInstance(params['WarehouseUrls'], list,
'Parameter did not create a list')
self.assertEqual(len(params['WarehouseUrls']), 1,
'Parameter list should only have a single item')
self.assertIn('w-url', params['WarehouseUrls'],
'Parameter not in expected list')
assert isinstance(params['WarehouseUrls'], list)
assert len(params['WarehouseUrls']) == 1
assert 'w-url' in params['WarehouseUrls']
def test_service_action_params_reuse(self):
request_model = Request({
@ -216,41 +208,41 @@ class TestServiceActionParams(BaseTestCase):
params = create_request_parameters(item1, request_model)
create_request_parameters(item2, request_model, params=params)
self.assertEqual(params, {
assert params == {
'Delete': {
'Objects': [
{'Key': 'item1'},
{'Key': 'item2'}
]
}
})
}
class TestStructBuilder(BaseTestCase):
def test_simple_value(self):
params = {}
build_param_structure(params, 'foo', 'bar')
self.assertEqual(params['foo'], 'bar')
assert params['foo'] == 'bar'
def test_nested_dict(self):
params = {}
build_param_structure(params, 'foo.bar.baz', 123)
self.assertEqual(params['foo']['bar']['baz'], 123)
assert params['foo']['bar']['baz'] == 123
def test_nested_list(self):
params = {}
build_param_structure(params, 'foo.bar[0]', 'test')
self.assertEqual(params['foo']['bar'][0], 'test')
assert params['foo']['bar'][0] == 'test'
def test_strange_offset(self):
params = {}
build_param_structure(params, 'foo[2]', 'test')
self.assertEqual(params['foo'], [{}, {}, 'test'])
assert params['foo'] == [{}, {}, 'test']
def test_nested_list_dict(self):
params = {}
build_param_structure(params, 'foo.bar[0].baz', 123)
self.assertEqual(params['foo']['bar'][0]['baz'], 123)
assert params['foo']['bar'][0]['baz'] == 123
def test_modify_existing(self):
params = {
@ -259,28 +251,28 @@ class TestStructBuilder(BaseTestCase):
]
}
build_param_structure(params, 'foo[0].secret', 123)
self.assertEqual(params['foo'][0]['key'], 'abc')
self.assertEqual(params['foo'][0]['secret'], 123)
assert params['foo'][0]['key'] == 'abc'
assert params['foo'][0]['secret'] == 123
def test_append_no_index(self):
params = {}
build_param_structure(params, 'foo[]', 123)
self.assertEqual(params['foo'], [123])
assert params['foo'] == [123]
build_param_structure(params, 'foo[]', 456)
self.assertEqual(params['foo'], [123, 456])
assert params['foo'] == [123, 456]
def test_provided_index_with_wildcard(self):
params = {}
index = 0
build_param_structure(params, 'foo[*].bar', 123, index)
build_param_structure(params, 'foo[*].baz', 456, index)
self.assertEqual(params['foo'][index], {'bar': 123, 'baz': 456})
assert params['foo'][index] == {'bar': 123, 'baz': 456}
index = 1
build_param_structure(params, 'foo[*].bar', 789, index)
build_param_structure(params, 'foo[*].baz', 123, index)
self.assertEqual(params['foo'], [
assert params['foo'] == [
{'bar': 123, 'baz': 456},
{'bar': 789, 'baz': 123}
])
]

View File

@ -10,6 +10,7 @@
# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest
from tests import BaseTestCase, mock
from boto3.utils import ServiceContext
@ -37,8 +38,8 @@ class TestBuildIdentifiers(BaseTestCase):
values = build_identifiers(identifiers, parent, params, response)
self.assertEqual(values[0][1], 'response-path',
'Identifier loaded from responsePath scalar not set')
# Verify identifier loaded from responsePath scalar set
assert values[0][1] == 'response-path'
def test_build_identifier_from_res_path_list(self):
identifiers = [Parameter(target='Id', source='response',
@ -58,8 +59,8 @@ class TestBuildIdentifiers(BaseTestCase):
values = build_identifiers(identifiers, parent, params, response)
self.assertEqual(values[0][1], ['response-path'],
'Identifier loaded from responsePath list not set')
# Verify identifier loaded from responsePath scalar set
assert values[0][1] == ['response-path']
def test_build_identifier_from_parent_identifier(self):
identifiers = [Parameter(target='Id', source='identifier',
@ -76,8 +77,8 @@ class TestBuildIdentifiers(BaseTestCase):
values = build_identifiers(identifiers, parent, params, response)
self.assertEqual(values[0][1], 'identifier',
'Identifier loaded from parent identifier not set')
# Verify identifier loaded from responsePath scalar set
assert values[0][1] == 'identifier'
def test_build_identifier_from_parent_data_member(self):
identifiers = [Parameter(target='Id', source='data',
@ -96,8 +97,8 @@ class TestBuildIdentifiers(BaseTestCase):
values = build_identifiers(identifiers, parent, params, response)
self.assertEqual(values[0][1], 'data-member',
'Identifier loaded from parent data member not set')
# Verify identifier loaded from responsePath scalar set
assert values[0][1] == 'data-member'
def test_build_identifier_from_req_param(self):
identifiers = [Parameter(target='Id', source='requestParameter',
@ -115,8 +116,8 @@ class TestBuildIdentifiers(BaseTestCase):
values = build_identifiers(identifiers, parent, params, response)
self.assertEqual(values[0][1], 'request-param',
'Identifier loaded from request parameter not set')
# Verify identifier loaded from responsePath scalar set
assert values[0][1] == 'request-param'
def test_build_identifier_from_invalid_source_type(self):
identifiers = [Parameter(target='Id', source='invalid')]
@ -129,7 +130,7 @@ class TestBuildIdentifiers(BaseTestCase):
}
}
with self.assertRaises(NotImplementedError):
with pytest.raises(NotImplementedError):
build_identifiers(identifiers, parent, params, response)
@ -157,54 +158,43 @@ class TestBuildEmptyResponse(BaseTestCase):
response = self.get_response()
self.assertIsInstance(response, dict,
'Structure should default to empty dictionary')
self.assertFalse(response.items(),
'Dictionary should be empty')
# Structure should default to empty dictionary
assert isinstance(response, dict)
assert response == {}
def test_empty_list(self):
self.output_shape.type_name = 'list'
response = self.get_response()
self.assertIsInstance(response, list,
'List should default to empty list')
self.assertFalse(len(response),
'List should be empty')
assert isinstance(response, list)
assert len(response) == 0
def test_empty_map(self):
self.output_shape.type_name = 'map'
response = self.get_response()
self.assertIsInstance(response, dict,
'Map should default to empty dictionary')
self.assertFalse(response.items(),
'Dictionary should be empty')
assert isinstance(response, dict)
assert response == {}
def test_empty_string(self):
self.output_shape.type_name = 'string'
self.output_shape.type_name = "string"
response = self.get_response()
self.assertIsNone(response,
'String should default to None')
assert response is None
def test_empty_integer(self):
self.output_shape.type_name = 'integer'
self.output_shape.type_name = "integer"
response = self.get_response()
assert response is None
self.assertIsNone(response,
'Integer should default to None')
def test_empty_unkown_returns_none(self):
self.output_shape.type_name = 'invalid'
def test_empty_unknown_returns_none(self):
self.output_shape.type_name = "invalid"
response = self.get_response()
self.assertIsNone(response,
'Unknown types should default to None')
assert response is None
def test_path_structure(self):
self.search_path = 'Container.Frob'
@ -224,8 +214,8 @@ class TestBuildEmptyResponse(BaseTestCase):
}
response = self.get_response()
assert response is None
self.assertEqual(response, None)
def test_path_list(self):
self.search_path = 'Container[1].Frob'
@ -243,8 +233,8 @@ class TestBuildEmptyResponse(BaseTestCase):
}
response = self.get_response()
assert response is None
self.assertEqual(response, None)
def test_path_invalid(self):
self.search_path = 'Container.Invalid'
@ -257,7 +247,7 @@ class TestBuildEmptyResponse(BaseTestCase):
'Container': container
}
with self.assertRaises(NotImplementedError):
with pytest.raises(NotImplementedError):
self.get_response()
@ -272,8 +262,8 @@ class TestRawHandler(BaseTestCase):
handler = RawHandler(search_path=None)
parsed_response = handler(parent, params, response)
self.assertEqual(parsed_response, response,
'Raw response not passed through unmodified')
# verify response is unmodified
assert parsed_response == response
def test_raw_handler_response_path(self):
parent = mock.Mock()
@ -290,8 +280,7 @@ class TestRawHandler(BaseTestCase):
handler = RawHandler(search_path='Container.Frob')
parsed_response = handler(parent, params, response)
self.assertEqual(parsed_response, frob,
'Search path not processed correctly')
assert parsed_response == frob
class TestResourceHandler(BaseTestCase):
@ -367,8 +356,7 @@ class TestResourceHandler(BaseTestCase):
}
resource = self.get_resource(search_path, response)
self.assertIsInstance(resource, ServiceResource,
'No resource instance returned from handler')
assert isinstance(resource, ServiceResource)
@mock.patch('boto3.resources.response.build_empty_response')
def test_missing_data_scalar_builds_empty_response(self, build_mock):
@ -380,10 +368,8 @@ class TestResourceHandler(BaseTestCase):
resources = self.get_resource(search_path, response)
self.assertTrue(build_mock.called,
'build_empty_response was never called')
self.assertEqual(resources, build_mock.return_value,
'build_empty_response return value was not returned')
assert build_mock.called
assert resources == build_mock.return_value
def test_create_resource_list(self):
self.identifier_path = 'Container.Frobs[].Id'
@ -405,12 +391,9 @@ class TestResourceHandler(BaseTestCase):
resources = self.get_resource(search_path, response)
self.assertIsInstance(resources, list,
'No list returned from handler')
self.assertEqual(len(resources), 2,
'Exactly two frobs should be returned')
self.assertIsInstance(resources[0], ServiceResource,
'List items are not resource instances')
assert isinstance(resources, list)
assert len(resources) == 2
assert isinstance(resources[0], ServiceResource)
def test_create_resource_list_no_search_path(self):
self.identifier_path = '[].Id'
@ -424,12 +407,9 @@ class TestResourceHandler(BaseTestCase):
resources = self.get_resource(search_path, response)
self.assertIsInstance(resources, list,
'No list returned from handler')
self.assertEqual(len(resources), 1,
'Exactly one frob should be returned')
self.assertIsInstance(resources[0], ServiceResource,
'List items are not resource instances')
assert isinstance(resources, list)
assert len(resources) == 1
assert isinstance(resources[0], ServiceResource)
@mock.patch('boto3.resources.response.build_empty_response')
def test_missing_data_list_builds_empty_response(self, build_mock):
@ -441,7 +421,5 @@ class TestResourceHandler(BaseTestCase):
resources = self.get_resource(search_path, response)
self.assertTrue(build_mock.called,
'build_empty_response was never called')
self.assertEqual(resources, build_mock.return_value,
'build_empty_response return value was not returned')
assert build_mock.called, 'build_empty_response was never called'
assert resources == build_mock.return_value

View File

@ -10,6 +10,8 @@
# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest
from botocore.exceptions import ClientError
from botocore.compat import six
@ -21,8 +23,8 @@ class TestInjectTransferMethods(unittest.TestCase):
def test_inject_upload_download_file_to_client(self):
class_attributes = {}
inject.inject_s3_transfer_methods(class_attributes=class_attributes)
self.assertIn('upload_file', class_attributes)
self.assertIn('download_file', class_attributes)
assert 'upload_file' in class_attributes
assert 'download_file' in class_attributes
def test_upload_file_proxies_to_transfer_object(self):
with mock.patch('boto3.s3.inject.S3Transfer') as transfer:
@ -64,9 +66,10 @@ class TestBucketLoad(unittest.TestCase):
}
inject.bucket_load(self.resource)
self.assertEqual(
self.resource.meta.data,
{'Name': self.resource.name, 'CreationDate': 2})
assert self.resource.meta.data == {
'Name': self.resource.name,
'CreationDate': 2
}
def test_bucket_load_doesnt_find_bucket(self):
self.resource.name = 'MyBucket'
@ -77,7 +80,7 @@ class TestBucketLoad(unittest.TestCase):
],
}
inject.bucket_load(self.resource)
self.assertEqual(self.resource.meta.data, {})
assert self.resource.meta.data == {}
def test_bucket_load_encounters_access_exception(self):
self.client.list_buckets.side_effect = ClientError(
@ -86,7 +89,7 @@ class TestBucketLoad(unittest.TestCase):
'Message': 'Access Denied'}},
'ListBuckets')
inject.bucket_load(self.resource)
self.assertEqual(self.resource.meta.data, {})
assert self.resource.meta.data == {}
def test_bucket_load_encounters_other_exception(self):
self.client.list_buckets.side_effect = ClientError(
@ -94,7 +97,7 @@ class TestBucketLoad(unittest.TestCase):
{'Code': 'ExpiredToken',
'Message': 'The provided token has expired.'}},
'ListBuckets')
with self.assertRaises(ClientError):
with pytest.raises(ClientError):
inject.bucket_load(self.resource)
class TestBucketTransferMethods(unittest.TestCase):
@ -188,10 +191,9 @@ class TestObejctSummaryLoad(unittest.TestCase):
def test_object_summary_load(self):
inject.object_summary_load(self.resource)
self.assertEqual(
self.resource.meta.data, {'Size': 5, 'ETag': 'my-etag'})
assert self.resource.meta.data == {'Size': 5, 'ETag': 'my-etag'}
def test_can_handle_missing_content_length(self):
self.head_object_response.pop('ContentLength')
inject.object_summary_load(self.resource)
self.assertEqual(self.resource.meta.data, {'ETag': 'my-etag'})
assert self.resource.meta.data == {'ETag': 'my-etag'}

View File

@ -10,6 +10,8 @@
# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest
from tests import mock, unittest
from s3transfer.manager import TransferManager
@ -30,10 +32,7 @@ class TestCreateTransferManager(unittest.TestCase):
osutil = OSUtils()
with mock.patch('boto3.s3.transfer.TransferManager') as manager:
create_transfer_manager(client, config, osutil)
self.assertEqual(
manager.call_args,
mock.call(client, config, osutil, None)
)
assert manager.call_args == mock.call(client, config, osutil, None)
def test_create_transfer_manager_with_no_threads(self):
client = object()
@ -42,9 +41,8 @@ class TestCreateTransferManager(unittest.TestCase):
with mock.patch(
'boto3.s3.transfer.TransferManager') as manager:
create_transfer_manager(client, config)
self.assertEqual(
manager.call_args,
mock.call(client, config, None, NonThreadedExecutor)
assert manager.call_args == mock.call(
client, config, None, NonThreadedExecutor
)
@ -53,9 +51,9 @@ class TestTransferConfig(unittest.TestCase):
ref_value):
# Ensure that the name set in the underlying TransferConfig (i.e.
# the actual) is the correct value.
self.assertEqual(getattr(config, actual), ref_value)
assert getattr(config, actual) == ref_value
# Ensure that backcompat name (i.e. the alias) is the correct value.
self.assertEqual(getattr(config, alias), ref_value)
assert getattr(config, alias) == ref_value
def test_alias_max_concurreny(self):
ref_value = 10
@ -104,10 +102,10 @@ class TestS3Transfer(unittest.TestCase):
def assert_callback_wrapped_in_subscriber(self, call_args):
subscribers = call_args[0][4]
# Make sure only one subscriber was passed in.
self.assertEqual(len(subscribers), 1)
assert len(subscribers) == 1
subscriber = subscribers[0]
# Make sure that the subscriber is of the correct type
self.assertIsInstance(subscriber, ProgressCallbackInvoker)
assert isinstance(subscriber, ProgressCallbackInvoker)
# Make sure that the on_progress method() calls out to the wrapped
# callback by actually invoking it.
subscriber.on_progress(bytes_transferred=1)
@ -146,49 +144,49 @@ class TestS3Transfer(unittest.TestCase):
future = mock.Mock()
future.result.side_effect = S3TransferRetriesExceededError(Exception())
self.manager.download.return_value = future
with self.assertRaises(RetriesExceededError):
with pytest.raises(RetriesExceededError):
self.transfer.download_file('bucket', 'key', '/tmp/smallfile')
def test_propogation_s3_upload_failed_error(self):
future = mock.Mock()
future.result.side_effect = ClientError({'Error': {}}, 'op_name')
self.manager.upload.return_value = future
with self.assertRaises(S3UploadFailedError):
with pytest.raises(S3UploadFailedError):
self.transfer.upload_file('smallfile', 'bucket', 'key')
def test_can_create_with_just_client(self):
transfer = S3Transfer(client=mock.Mock())
self.assertIsInstance(transfer, S3Transfer)
assert isinstance(transfer, S3Transfer)
def test_can_create_with_extra_configurations(self):
transfer = S3Transfer(
client=mock.Mock(), config=TransferConfig(), osutil=OSUtils())
self.assertIsInstance(transfer, S3Transfer)
assert isinstance(transfer, S3Transfer)
def test_client_or_manager_is_required(self):
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
S3Transfer()
def test_client_and_manager_are_mutually_exclusive(self):
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
S3Transfer(self.client, manager=self.manager)
def test_config_and_manager_are_mutually_exclusive(self):
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
S3Transfer(config=mock.Mock(), manager=self.manager)
def test_osutil_and_manager_are_mutually_exclusive(self):
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
S3Transfer(osutil=mock.Mock(), manager=self.manager)
def test_upload_requires_string_filename(self):
transfer = S3Transfer(client=mock.Mock())
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
transfer.upload_file(filename=object(), bucket='foo', key='bar')
def test_download_requires_string_filename(self):
transfer = S3Transfer(client=mock.Mock())
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
transfer.download_file(bucket='foo', key='bar', filename=object())
def test_context_manager(self):
@ -198,18 +196,17 @@ class TestS3Transfer(unittest.TestCase):
pass
# The underlying transfer manager should have had its __exit__
# called as well.
self.assertEqual(
manager.__exit__.call_args, mock.call(None, None, None))
assert manager.__exit__.call_args == mock.call(None, None, None)
def test_context_manager_with_errors(self):
manager = mock.Mock()
manager.__exit__ = mock.Mock()
raised_exception = ValueError()
with self.assertRaises(type(raised_exception)):
with pytest.raises(type(raised_exception)):
with S3Transfer(manager=manager):
raise raised_exception
# The underlying transfer manager should have had its __exit__
# called as well and pass on the error as well.
self.assertEqual(
manager.__exit__.call_args,
mock.call(type(raised_exception), raised_exception, mock.ANY))
assert manager.__exit__.call_args == mock.call(
type(raised_exception), raised_exception, mock.ANY
)

View File

@ -30,8 +30,7 @@ class TestBoto3(unittest.TestCase):
boto3.setup_default_session()
self.assertEqual(boto3.DEFAULT_SESSION, session,
'Default session not created properly')
assert boto3.DEFAULT_SESSION == session
def test_create_default_session_with_args(self):
boto3.setup_default_session(
@ -49,10 +48,8 @@ class TestBoto3(unittest.TestCase):
boto3.client('sqs')
self.assertTrue(setup_session.called,
'setup_default_session not called')
self.assertTrue(boto3.DEFAULT_SESSION.client.called,
'Default session client method not called')
assert setup_session.called
assert boto3.DEFAULT_SESSION.client.called
@mock.patch('boto3.setup_default_session',
wraps=boto3.setup_default_session)
@ -61,10 +58,8 @@ class TestBoto3(unittest.TestCase):
boto3.client('sqs')
self.assertFalse(setup_session.called,
'setup_default_session should not have been called')
self.assertTrue(boto3.DEFAULT_SESSION.client.called,
'Default session client method not called')
assert not setup_session.called
assert boto3.DEFAULT_SESSION.client.called
def test_client_passes_through_arguments(self):
boto3.DEFAULT_SESSION = self.Session()
@ -81,10 +76,8 @@ class TestBoto3(unittest.TestCase):
boto3.resource('sqs')
self.assertTrue(setup_session.called,
'setup_default_session not called')
self.assertTrue(boto3.DEFAULT_SESSION.resource.called,
'Default session resource method not called')
assert setup_session.called
assert boto3.DEFAULT_SESSION.resource.called
@mock.patch('boto3.setup_default_session',
wraps=boto3.setup_default_session)
@ -93,10 +86,8 @@ class TestBoto3(unittest.TestCase):
boto3.resource('sqs')
self.assertFalse(setup_session.called,
'setup_default_session should not have been called')
self.assertTrue(boto3.DEFAULT_SESSION.resource.called,
'Default session resource method not called')
assert not setup_session.called
assert boto3.DEFAULT_SESSION.resource.called
def test_resource_passes_through_arguments(self):
boto3.DEFAULT_SESSION = self.Session()

View File

@ -10,6 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest
from botocore import loaders
from botocore.exceptions import DataNotFoundError, UnknownServiceError
@ -30,7 +31,7 @@ class TestSession(BaseTestCase):
session = Session('abc123', region_name='us-west-2')
self.assertEqual(repr(session), 'Session(region_name=\'us-west-2\')')
assert repr(session) == 'Session(region_name=\'us-west-2\')'
def test_repr_on_subclasses(self):
bc_session = self.bc_session_cls.return_value
@ -42,7 +43,7 @@ class TestSession(BaseTestCase):
session = MySession('abc123', region_name='us-west-2')
self.assertEqual(repr(session), 'MySession(region_name=\'us-west-2\')')
assert repr(session) == 'MySession(region_name=\'us-west-2\')'
def test_can_access_region_name(self):
bc_session = self.bc_session_cls.return_value
@ -51,13 +52,12 @@ class TestSession(BaseTestCase):
bc_session.set_config_variable.assert_called_with('region',
'us-west-2')
self.assertEqual(session.region_name, 'us-west-2')
assert session.region_name == 'us-west-2'
def test_arguments_not_required(self):
Session()
self.assertTrue(self.bc_session_cls.called,
'Botocore session was not created')
assert self.bc_session_cls.called
def test_credentials_can_be_set(self):
bc_session = self.bc_session_cls.return_value
@ -67,10 +67,8 @@ class TestSession(BaseTestCase):
aws_secret_access_key='secret',
aws_session_token='token')
self.assertTrue(self.bc_session_cls.called,
'Botocore session was not created')
self.assertTrue(bc_session.set_credentials.called,
'Botocore session set_credentials not called from constructor')
assert self.bc_session_cls.called
assert bc_session.set_credentials.called
bc_session.set_credentials.assert_called_with(
'key', 'secret', 'token')
@ -93,9 +91,9 @@ class TestSession(BaseTestCase):
aws_session_token=token)
credentials = session.get_credentials()
self.assertEqual(credentials.access_key, access_key)
self.assertEqual(credentials.secret_key, secret_key)
self.assertEqual(credentials.token, token)
assert credentials.access_key == access_key
assert credentials.secret_key == secret_key
assert credentials.token == token
def test_profile_can_be_set(self):
bc_session = self.bc_session_cls.return_value
@ -107,21 +105,21 @@ class TestSession(BaseTestCase):
bc_session.profile = 'foo'
# We should also be able to read the value
self.assertEqual(session.profile_name, 'foo')
assert session.profile_name == 'foo'
def test_profile_default(self):
self.bc_session_cls.return_value.profile = None
session = Session()
self.assertEqual(session.profile_name, 'default')
assert session.profile_name == 'default'
def test_available_profiles(self):
bc_session = mock.Mock()
bc_session.available_profiles.return_value = ['foo','bar']
session = Session(botocore_session=bc_session)
profiles = session.available_profiles
self.assertEqual(len(profiles.return_value), 2)
assert len(profiles.return_value) == 2
def test_custom_session(self):
bc_session = self.bc_session_cls()
@ -130,7 +128,7 @@ class TestSession(BaseTestCase):
Session(botocore_session=bc_session)
# No new session was created
self.assertFalse(self.bc_session_cls.called)
assert not self.bc_session_cls.called
def test_user_agent(self):
# Here we get the underlying Botocore session, create a Boto3
@ -142,9 +140,9 @@ class TestSession(BaseTestCase):
Session(botocore_session=bc_session)
self.assertEqual(bc_session.user_agent_name, 'Boto3')
self.assertEqual(bc_session.user_agent_version, __version__)
self.assertEqual(bc_session.user_agent_extra, 'Botocore/0.68.0')
assert bc_session.user_agent_name == 'Boto3'
assert bc_session.user_agent_version == __version__
assert bc_session.user_agent_extra == 'Botocore/0.68.0'
def test_user_agent_extra(self):
# This test is the same as above, but includes custom extra content
@ -156,7 +154,7 @@ class TestSession(BaseTestCase):
Session(botocore_session=bc_session)
self.assertEqual(bc_session.user_agent_extra, 'foo Botocore/0.68.0')
assert bc_session.user_agent_extra == 'foo Botocore/0.68.0'
def test_custom_user_agent(self):
# This test ensures that a customized user-agent is left untouched.
@ -167,9 +165,9 @@ class TestSession(BaseTestCase):
Session(botocore_session=bc_session)
self.assertEqual(bc_session.user_agent_name, 'Custom')
self.assertEqual(bc_session.user_agent_version, '1.0')
self.assertEqual(bc_session.user_agent_extra, '')
assert bc_session.user_agent_name == 'Custom'
assert bc_session.user_agent_version == '1.0'
assert bc_session.user_agent_extra == ''
def test_get_available_services(self):
bc_session = self.bc_session_cls.return_value
@ -177,8 +175,7 @@ class TestSession(BaseTestCase):
session = Session()
session.get_available_services()
self.assertTrue(bc_session.get_available_services.called,
'Botocore session get_available_services not called')
assert bc_session.get_available_services.called
def test_get_available_resources(self):
mock_bc_session = mock.Mock()
@ -188,7 +185,7 @@ class TestSession(BaseTestCase):
session = Session(botocore_session=mock_bc_session)
names = session.get_available_resources()
self.assertEqual(names, ['foo', 'bar'])
assert names == ['foo', 'bar']
def test_get_available_partitions(self):
bc_session = mock.Mock()
@ -196,7 +193,7 @@ class TestSession(BaseTestCase):
session = Session(botocore_session=bc_session)
partitions = session.get_available_partitions()
self.assertEqual(partitions, ['foo'])
assert partitions == ['foo']
def test_get_available_regions(self):
bc_session = mock.Mock()
@ -208,14 +205,13 @@ class TestSession(BaseTestCase):
service_name='myservice', partition_name='aws',
allow_non_regional=False
)
self.assertEqual(partitions, ['foo'])
assert partitions == ['foo']
def test_create_client(self):
session = Session(region_name='us-east-1')
client = session.client('sqs', region_name='us-west-2')
self.assertTrue(client,
'No low-level client was returned')
assert client, 'No low-level client was returned'
def test_create_client_with_args(self):
bc_session = self.bc_session_cls.return_value
@ -248,8 +244,8 @@ class TestSession(BaseTestCase):
verify=False, region_name=None, api_version='2014-11-02',
config=mock.ANY)
client_config = session.client.call_args[1]['config']
self.assertEqual(client_config.user_agent_extra, 'Resource')
self.assertEqual(client_config.signature_version, None)
assert client_config.user_agent_extra == 'Resource'
assert client_config.signature_version == None
def test_create_resource_with_config(self):
mock_bc_session = mock.Mock()
@ -271,8 +267,8 @@ class TestSession(BaseTestCase):
verify=None, region_name=None, api_version='2014-11-02',
config=mock.ANY)
client_config = session.client.call_args[1]['config']
self.assertEqual(client_config.user_agent_extra, 'Resource')
self.assertEqual(client_config.signature_version, 'v4')
assert client_config.user_agent_extra == 'Resource'
assert client_config.signature_version == 'v4'
def test_create_resource_with_config_override_user_agent_extra(self):
mock_bc_session = mock.Mock()
@ -294,8 +290,8 @@ class TestSession(BaseTestCase):
verify=None, region_name=None, api_version='2014-11-02',
config=mock.ANY)
client_config = session.client.call_args[1]['config']
self.assertEqual(client_config.user_agent_extra, 'foo')
self.assertEqual(client_config.signature_version, 'v4')
assert client_config.user_agent_extra == 'foo'
assert client_config.signature_version == 'v4'
def test_create_resource_latest_version(self):
mock_bc_session = mock.Mock()
@ -323,16 +319,17 @@ class TestSession(BaseTestCase):
mock_bc_session.get_available_services.return_value = ['sqs']
session = Session(botocore_session=mock_bc_session)
with self.assertRaises(ResourceNotExistsError) as e:
with pytest.raises(ResourceNotExistsError) as e:
session.resource('sqs')
err_msg = str(e.exception)
# 1. should say the resource doesn't exist.
self.assertIn('resource does not exist', err_msg)
self.assertIn('sqs', err_msg)
# 2. Should list available resources you can choose.
self.assertIn('good-resource', err_msg)
# 3. Should list client if available.
self.assertIn('client', err_msg)
err_msg = str(e.exception)
# 1. should say the resource doesn't exist.
assert 'resource does not exist' in err_msg
assert 'sqs' in err_msg
# 2. Should list available resources you can choose.
assert 'good-resource' in err_msg
# 3. Should list client if available.
assert 'client' in err_msg
def test_bad_resource_name_with_no_client_has_simple_err_msg(self):
mock_bc_session = mock.Mock()
@ -345,12 +342,13 @@ class TestSession(BaseTestCase):
mock_bc_session.get_available_services.return_value = ['good-client']
session = Session(botocore_session=mock_bc_session)
with self.assertRaises(ResourceNotExistsError) as e:
with pytest.raises(ResourceNotExistsError) as e:
session.resource('bad-client')
err_msg = str(e.exception)
# Shouldn't mention anything about clients because
# 'bad-client' it not a valid boto3.client(...)
self.assertNotIn('boto3.client', err_msg)
err_msg = str(e.exception)
# Shouldn't mention anything about clients because
# 'bad-client' it not a valid boto3.client(...)
assert 'boto3.client' not in err_msg
def test_can_reach_events(self):
mock_bc_session = self.bc_session_cls()

View File

@ -13,6 +13,8 @@
import types
from tests import mock, unittest
import pytest
from boto3 import utils
@ -28,21 +30,21 @@ class TestUtils(unittest.TestCase):
importer.return_value = FakeModule
lazy_function = utils.lazy_call(
'fakemodule.FakeModule.entry_point')
self.assertEqual(lazy_function(a=1, b=2), {'a': 1, 'b': 2})
assert lazy_function(a=1, b=2) == {'a': 1, 'b': 2}
def test_import_module(self):
module = utils.import_module('boto3.s3.transfer')
self.assertEqual(module.__name__, 'boto3.s3.transfer')
self.assertIsInstance(module, types.ModuleType)
assert module.__name__ == 'boto3.s3.transfer'
assert isinstance(module, types.ModuleType)
def test_inject_attributes_with_no_shadowing(self):
class_attributes = {}
utils.inject_attribute(class_attributes, 'foo', 'bar')
self.assertEqual(class_attributes['foo'], 'bar')
assert class_attributes['foo'] == 'bar'
def test_shadowing_existing_var_raises_exception(self):
class_attributes = {'foo': 'preexisting'}
with self.assertRaises(RuntimeError):
with pytest.raises(RuntimeError):
utils.inject_attribute(class_attributes, 'foo', 'bar')
@ -51,8 +53,8 @@ class TestLazyLoadedWaiterModel(unittest.TestCase):
session = mock.Mock()
waiter_model = utils.LazyLoadedWaiterModel(
session, 'myservice', '2014-01-01')
self.assertFalse(session.get_waiter_model.called)
assert not session.get_waiter_model.called
waiter_model.get_waiter('Foo')
self.assertTrue(session.get_waiter_model.called)
assert session.get_waiter_model.called
session.get_waiter_model.return_value.get_waiter.assert_called_with(
'Foo')

View File

@ -1,5 +1,5 @@
[tox]
envlist = py36,py37,py38
envlist = py36,py37,py38,py39,py310
# Comment to build sdist and install into virtualenv
# This is helpful to test installation but takes extra time