# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). You # may not use this file except in compliance with the License. A copy of # the License is located at # # http://aws.amazon.com/apache2.0/ # # or in the "license" file accompanying this file. This file is # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. from datetime import datetime from math import ceil import pytest from botocore.paginate import TokenDecoder, TokenEncoder from botocore.stub import StubAssertionError, Stubber from tests import BaseSessionTest, random_chars class TestRDSPagination(BaseSessionTest): def setUp(self): super().setUp() self.region = 'us-west-2' self.client = self.session.create_client('rds', self.region) self.stubber = Stubber(self.client) def test_can_specify_zero_marker(self): service_response = { 'LogFileData': 'foo', 'Marker': '2', 'AdditionalDataPending': True, } expected_params = { 'DBInstanceIdentifier': 'foo', 'LogFileName': 'bar', 'NumberOfLines': 2, 'Marker': '0', } function_name = 'download_db_log_file_portion' # The stubber will assert that the function is called with the expected # parameters. self.stubber.add_response( function_name, service_response, expected_params ) self.stubber.activate() try: paginator = self.client.get_paginator(function_name) result = paginator.paginate( DBInstanceIdentifier='foo', LogFileName='bar', NumberOfLines=2, PaginationConfig={'StartingToken': '0', 'MaxItems': 3}, ).build_full_result() self.assertEqual(result['LogFileData'], 'foo') self.assertIn('NextToken', result) except StubAssertionError as e: self.fail(str(e)) class TestAutoscalingPagination(BaseSessionTest): def setUp(self): super().setUp() self.region = 'us-west-2' self.client = self.session.create_client( 'autoscaling', self.region, aws_secret_access_key='foo', aws_access_key_id='bar', aws_session_token='baz', ) self.stubber = Stubber(self.client) self.stubber.activate() def _setup_scaling_pagination( self, page_size=200, max_items=100, total_items=600 ): """ Add to the stubber to test paginating describe_scaling_activities. WARNING: This only handles cases where max_items cleanly divides page_size. """ requests_per_page = page_size / max_items if requests_per_page != ceil(requests_per_page): raise NotImplementedError( "This only handles setup where max_items is less than " "page_size and where max_items evenly divides page_size." ) requests_per_page = int(requests_per_page) num_pages = int(ceil(total_items / page_size)) previous_next_token = None for i in range(num_pages): page = self.create_describe_scaling_response(page_size=page_size) # Don't create a next_token for the final page if i + 1 == num_pages: next_token = None else: next_token = random_chars(10) expected_args = {} if previous_next_token: expected_args['StartingToken'] = previous_next_token # The same page may be accessed multiple times because we are # truncating it at max_items for _ in range(requests_per_page - 1): # The page is copied because the paginator will modify the # response object, causing issues when using the stubber. self.stubber.add_response( 'describe_scaling_activities', page.copy() ) if next_token is not None: page['NextToken'] = next_token # Copying the page here isn't necessary because it is about to # be blown away anyway. self.stubber.add_response('describe_scaling_activities', page) previous_next_token = next_token def create_describe_scaling_response(self, page_size=200): """Create a valid describe_scaling_activities response.""" page = [] date = datetime.now() for _ in range(page_size): page.append( { 'AutoScalingGroupName': 'test', 'ActivityId': random_chars(10), 'Cause': 'test', 'StartTime': date, 'StatusCode': '200', } ) return {'Activities': page} def test_repeated_build_full_results(self): # This ensures that we can cleanly paginate using build_full_results. max_items = 100 total_items = 600 self._setup_scaling_pagination( max_items=max_items, total_items=total_items, page_size=200 ) paginator = self.client.get_paginator('describe_scaling_activities') conf = {'MaxItems': max_items} pagination_tokens = [] result = paginator.paginate(PaginationConfig=conf).build_full_result() all_results = result['Activities'] while 'NextToken' in result: starting_token = result['NextToken'] # We should never get a duplicate pagination token. self.assertNotIn(starting_token, pagination_tokens) pagination_tokens.append(starting_token) conf['StartingToken'] = starting_token pages = paginator.paginate(PaginationConfig=conf) result = pages.build_full_result() all_results.extend(result['Activities']) self.assertEqual(len(all_results), total_items) class TestCloudwatchLogsPagination(BaseSessionTest): def setUp(self): super().setUp() self.region = 'us-west-2' self.client = self.session.create_client( 'logs', self.region, aws_secret_access_key='foo', aws_access_key_id='bar', aws_session_token='baz', ) self.stubber = Stubber(self.client) self.stubber.activate() def test_token_with_triple_underscores(self): response = { 'events': [ { 'logStreamName': 'foobar', 'timestamp': 1560195817, 'message': 'a thing happened', 'ingestionTime': 1560195817, 'eventId': 'foo', } ], 'searchedLogStreams': [ { 'logStreamName': 'foobar', 'searchedCompletely': False, } ], } group_name = 'foo' token = 'foo___bar' expected_args = { 'logGroupName': group_name, 'nextToken': token, } self.stubber.add_response('filter_log_events', response, expected_args) paginator = self.client.get_paginator('filter_log_events') pages = paginator.paginate( PaginationConfig={ 'MaxItems': 1, 'StartingToken': token, }, logGroupName=group_name, ) result = pages.build_full_result() self.assertEqual(len(result['events']), 1) @pytest.mark.parametrize( "token_dict", ( {'foo': 'bar'}, {'foo': b'bar'}, {'foo': {'bar': b'baz'}}, {'foo': ['bar', b'baz']}, {'foo': b'\xff'}, {'foo': {'bar': b'baz', 'bin': [b'bam']}}, ), ) def test_token_encoding(token_dict): encoded = TokenEncoder().encode(token_dict) assert isinstance(encoded, str) decoded = TokenDecoder().decode(encoded) assert decoded == token_dict