1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from tests import unittest
import itertools
from nose.plugins.attrib import attr
import botocore.session
from botocore.exceptions import ClientError
class TestEC2(unittest.TestCase):
    """Smoke tests for basic requests made through an EC2 client."""

    def setUp(self):
        session = botocore.session.get_session()
        self.session = session
        self.client = session.create_client('ec2', region_name='us-west-2')

    def test_can_make_request(self):
        # Basic smoke test to ensure we can talk to ec2.
        expected_zones = set(['us-west-2a', 'us-west-2b', 'us-west-2c'])
        response = self.client.describe_availability_zones()
        zone_names = [
            zone['ZoneName'] for zone in response['AvailabilityZones']]
        zone_names.sort()
        # The region may report more zones than these three; only require
        # that the expected ones are present.
        self.assertTrue(expected_zones.issubset(zone_names))

    def test_get_console_output_handles_error(self):
        # Want to ensure the underlying ClientError is propagated
        # to the caller on error.
        self.assertRaises(
            ClientError,
            self.client.get_console_output, InstanceId='i-12345')
class TestEC2Pagination(unittest.TestCase):
    """Pagination tests using ``describe_reserved_instances_offerings``.

    Each test takes only the first three pages from the paginator
    (via ``itertools.islice``) instead of consuming the whole stream.
    """

    def setUp(self):
        self.session = botocore.session.get_session()
        self.client = self.session.create_client(
            'ec2', region_name='us-west-2')

    def test_can_paginate(self):
        # Using an operation that we know will paginate.
        paginator = self.client.get_paginator(
            'describe_reserved_instances_offerings')
        pages = paginator.paginate()
        results = list(itertools.islice(pages, 0, 3))
        self.assertEqual(len(results), 3)
        # Consecutive pages must carry different continuation tokens.
        # assertNotEqual reports both tokens if this ever fails.
        self.assertNotEqual(
            results[0]['NextToken'], results[1]['NextToken'])

    def test_can_paginate_with_page_size(self):
        # Using an operation that we know will paginate.
        paginator = self.client.get_paginator(
            'describe_reserved_instances_offerings')
        pages = paginator.paginate(PaginationConfig={'PageSize': 1})
        results = list(itertools.islice(pages, 0, 3))
        self.assertEqual(len(results), 3)
        for parsed in results:
            reserved_inst_offer = parsed['ReservedInstancesOfferings']
            # There should only be one reserved instance offering on each
            # page.
            self.assertEqual(len(reserved_inst_offer), 1)

    def test_can_fall_back_to_old_starting_token(self):
        # Using an operation that we know will paginate.
        paginator = self.client.get_paginator(
            'describe_reserved_instances_offerings')
        # NOTE(review): botocore documents MaxItems, PageSize and
        # StartingToken as the PaginationConfig keys.  'NextToken' is
        # kept unchanged here; confirm that it really exercises the
        # old-style ('None___1') starting token fallback this test is
        # named after.
        pages = paginator.paginate(PaginationConfig={'NextToken': 'None___1'})

        # Only the pagination itself is expected to raise ValueError, so
        # keep the assertions outside of the try block.
        try:
            results = list(itertools.islice(pages, 0, 3))
        except ValueError:
            self.fail("Old style paginator failed.")
        self.assertEqual(len(results), 3)
        self.assertNotEqual(
            results[0]['NextToken'], results[1]['NextToken'])
@attr('slow')
class TestCopySnapshotCustomization(unittest.TestCase):
    """Copy snapshots created in us-west-2 using a us-east-1 client.

    The tests create real volumes and snapshots.  Every resource is
    registered with ``addCleanup`` as soon as its id is known, so it is
    deleted even when a later wait or assertion fails.
    """

    def setUp(self):
        self.session = botocore.session.get_session()
        # All of the fixture setup/cleanup goes through the client
        # interface: one client for the source region and one for the
        # region that issues the copy.
        self.client = self.session.create_client('ec2', 'us-west-2')
        self.client_us_east_1 = self.session.create_client(
            'ec2', 'us-east-1')

    def create_volume(self, encrypted=False):
        """Create a volume in us-west-2 and wait until it is available.

        The volume is created with ``Size=1`` in the first availability
        zone returned by ``describe_availability_zones``.

        :param encrypted: Value passed as ``Encrypted`` to
            ``create_volume``.
        :return: The id of the new volume.
        """
        available_zones = self.client.describe_availability_zones()
        first_zone = available_zones['AvailabilityZones'][0]['ZoneName']
        response = self.client.create_volume(
            Size=1, AvailabilityZone=first_zone, Encrypted=encrypted)
        volume_id = response['VolumeId']
        self.addCleanup(self.client.delete_volume, VolumeId=volume_id)
        self.client.get_waiter('volume_available').wait(VolumeIds=[volume_id])
        return volume_id

    def create_snapshot(self, volume_id):
        """Snapshot ``volume_id`` in us-west-2 and wait for completion.

        :param volume_id: Id of the volume to snapshot.
        :return: The id of the new snapshot.
        """
        response = self.client.create_snapshot(VolumeId=volume_id)
        snapshot_id = response['SnapshotId']
        # Register the cleanup before waiting so the snapshot is still
        # deleted if the waiter raises (same order as create_volume).
        self.addCleanup(self.client.delete_snapshot, SnapshotId=snapshot_id)
        self.client.get_waiter('snapshot_completed').wait(
            SnapshotIds=[snapshot_id])
        return snapshot_id

    def cleanup_copied_snapshot(self, snapshot_id):
        """Schedule deletion of a copied snapshot in us-east-1.

        The delete is registered first, then the method blocks until the
        copy has completed.

        :param snapshot_id: Id of the snapshot returned by
            ``copy_snapshot``.
        """
        # Reuse the us-east-1 client from setUp rather than creating
        # another client for the same region.
        dest_client = self.client_us_east_1
        self.addCleanup(dest_client.delete_snapshot,
                        SnapshotId=snapshot_id)
        dest_client.get_waiter('snapshot_completed').wait(
            SnapshotIds=[snapshot_id])

    def test_can_copy_snapshot(self):
        volume_id = self.create_volume()
        snapshot_id = self.create_snapshot(volume_id)

        result = self.client_us_east_1.copy_snapshot(
            SourceRegion='us-west-2',
            SourceSnapshotId=snapshot_id)
        self.assertIn('SnapshotId', result)

        # Cleanup code.  We can wait for the snapshot to be complete
        # and then we can delete the snapshot.
        self.cleanup_copied_snapshot(result['SnapshotId'])

    def test_can_copy_encrypted_snapshot(self):
        # Note that we're creating an encrypted volume here.
        volume_id = self.create_volume(encrypted=True)
        snapshot_id = self.create_snapshot(volume_id)

        result = self.client_us_east_1.copy_snapshot(
            SourceRegion='us-west-2',
            SourceSnapshotId=snapshot_id)
        self.assertIn('SnapshotId', result)
        self.cleanup_copied_snapshot(result['SnapshotId'])
if __name__ == '__main__':
    # Run the tests in this module when it is executed as a script.
    unittest.main()
|