File: test_client.py

package info (click to toggle)
python-botocore 1.4.70-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 22,892 kB
  • ctags: 4,763
  • sloc: python: 28,699; xml: 15,052; makefile: 132
file content (193 lines) | stat: -rw-r--r-- 7,730 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import time
import logging
import datetime
from tests import unittest, random_chars

import botocore.session
from botocore.client import ClientError
from botocore.compat import six
from botocore.exceptions import EndpointConnectionError
from six import StringIO


class TestBucketWithVersions(unittest.TestCase):
    """Integration test for S3 object versioning on a scratch bucket."""

    def setUp(self):
        session = botocore.session.get_session()
        self.session = session
        self.client = session.create_client('s3', region_name='us-west-2')
        self.bucket_name = 'botocoretest%s' % random_chars(50)

    def extract_version_ids(self, versions):
        """Collect every ``VersionId`` from a version-listing response.

        :param versions: Mapping with ``DeleteMarkers`` and ``Versions``
            keys, each holding an iterable of mappings that carry a
            ``VersionId`` entry.
        :return: List of ids; delete-marker ids first, then version ids.
        """
        marker_ids = [
            marker['VersionId'] for marker in versions['DeleteMarkers']
        ]
        object_ids = [
            version['VersionId'] for version in versions['Versions']
        ]
        return marker_ids + object_ids

    def test_create_versioned_bucket(self):
        # Walks through the life cycle of a versioned object:
        # 1. Create a bucket
        # 2. Enable versioning
        # 3. Put an object and read it back
        # 4. Delete the object, then check that two version ids are listed
        s3 = self.client
        bucket = self.bucket_name
        key = 'testkey'

        location = {'LocationConstraint': 'us-west-2'}
        s3.create_bucket(Bucket=bucket, CreateBucketConfiguration=location)
        self.addCleanup(s3.delete_bucket, Bucket=bucket)

        s3.put_bucket_versioning(
            Bucket=bucket,
            VersioningConfiguration={"Status": "Enabled"})

        put_result = s3.put_object(Bucket=bucket, Key=key, Body='bytes body')
        # Schedule removal of the object version that was just stored.
        self.addCleanup(s3.delete_object,
                        Bucket=bucket,
                        Key=key,
                        VersionId=put_result['VersionId'])

        fetched = s3.get_object(Bucket=bucket, Key=key)
        self.assertEqual(fetched['Body'].read(), b'bytes body')

        delete_result = s3.delete_object(Bucket=bucket, Key=key)
        # The un-versioned delete above leaves a DeleteMarker behind;
        # schedule that marker for removal as well.
        self.addCleanup(s3.delete_object,
                        Bucket=bucket,
                        Key=key,
                        VersionId=delete_result['VersionId'])

        # A plain GET no longer finds the object.
        with self.assertRaises(ClientError):
            s3.get_object(Bucket=bucket, Key=key)

        listing = s3.list_object_versions(Bucket=bucket)
        found_ids = self.extract_version_ids(listing)
        self.assertEqual(len(found_ids), 2)


# This is really a combination of testing the debug logging mechanism
# as well as the response wire log, which theoretically could be
# implemented in any number of modules, which makes it hard to pick
# which integration test module this code should live in, so I picked
# the client module.
class TestResponseLog(unittest.TestCase):
    """Checks that response headers and body show up in the debug log."""

    def test_debug_log_contains_headers_and_body(self):
        # This test just verifies that the response headers/body
        # are in the debug log.  It's an integration test so that
        # we can refactor the code however we want, as long as we don't
        # lose this feature.
        session = botocore.session.get_session()
        client = session.create_client('s3', region_name='us-west-2')
        debug_log = StringIO()

        # The stream logger below is requested for the root logger (''),
        # which is process-wide state.  Snapshot it first and restore it
        # afterwards so the extra handler and any level change do not
        # leak into tests that run later in the same process.
        root_logger = logging.getLogger('')
        original_level = root_logger.level
        original_handlers = list(root_logger.handlers)

        def restore_root_logger():
            # Drop only the handlers that appeared during this test.
            for handler in list(root_logger.handlers):
                if handler not in original_handlers:
                    root_logger.removeHandler(handler)
            root_logger.setLevel(original_level)

        self.addCleanup(restore_root_logger)

        session.set_stream_logger('', logging.DEBUG, debug_log)
        client.list_buckets()
        debug_log_contents = debug_log.getvalue()
        self.assertIn('Response headers', debug_log_contents)
        self.assertIn('Response body', debug_log_contents)


class TestAcceptedDateTimeFormats(unittest.TestCase):
    """Checks the timestamp representations accepted for ``CreatedAfter``.

    Every test passes a different representation to EMR ``list_clusters``
    and only checks that a response containing ``Clusters`` comes back.
    """

    def setUp(self):
        self.session = botocore.session.get_session()
        self.client = self.session.create_client('emr', 'us-west-2')

    def _assert_created_after_accepted(self, created_after):
        # Shared body of all tests: issue the call with the given
        # timestamp value and check the response shape.
        result = self.client.list_clusters(CreatedAfter=created_after)
        self.assertIn('Clusters', result)

    def test_accepts_datetime_object(self):
        self._assert_created_after_accepted(datetime.datetime.now())

    def test_accepts_epoch_format(self):
        self._assert_created_after_accepted(0)

    def test_accepts_iso_8601_unaware(self):
        self._assert_created_after_accepted('2014-01-01T00:00:00')

    def test_accepts_iso_8601_utc(self):
        self._assert_created_after_accepted('2014-01-01T00:00:00Z')

    # NOTE: "8701" is a typo for ISO 8601; the name is kept unchanged so
    # that existing test ids keep working.
    def test_accepts_iso_8701_local(self):
        self._assert_created_after_accepted('2014-01-01T00:00:00-08:00')


class TestCreateClients(unittest.TestCase):
    """Checks client creation for a valid and an invalid region name."""

    def setUp(self):
        self.session = botocore.session.get_session()

    def test_client_can_clone_with_service_events(self):
        # We should also be able to create a client object.
        client = self.session.create_client('s3', region_name='us-west-2')
        # We really just want to ensure create_client doesn't raise
        # an exception, but we'll double check that the client looks right.
        self.assertTrue(hasattr(client, 'list_buckets'))

    def test_client_raises_exception_invalid_region(self):
        # six.assertRaisesRegex dispatches to whichever spelling the
        # running unittest provides; the assertRaisesRegexp alias is
        # deprecated and no longer exists on Python 3.12+.
        with six.assertRaisesRegex(self, ValueError, 'Invalid endpoint'):
            self.session.create_client(
                'cloudformation', region_name='invalid region name')


class TestClientErrorMessages(unittest.TestCase):
    """Checks the error raised when calling a non-existent region."""

    def test_region_mentioned_in_invalid_region(self):
        session = botocore.session.get_session()
        client = session.create_client(
            'cloudformation', region_name='us-east-999')
        # six.assertRaisesRegex dispatches to whichever spelling the
        # running unittest provides; the assertRaisesRegexp alias is
        # deprecated and no longer exists on Python 3.12+.
        with six.assertRaisesRegex(self, EndpointConnectionError,
                                   'Could not connect to the endpoint URL'):
            client.list_stacks()


class TestClientMeta(unittest.TestCase):
    """Checks that creation arguments are exposed on ``client.meta``."""

    def setUp(self):
        self.session = botocore.session.get_session()

    def test_region_name_on_meta(self):
        # The region given at creation time is readable from meta.
        s3 = self.session.create_client('s3', 'us-west-2')
        reported_region = s3.meta.region_name
        self.assertEqual(reported_region, 'us-west-2')

    def test_endpoint_url_on_meta(self):
        # An explicit endpoint URL is readable from meta as given.
        s3 = self.session.create_client(
            's3', 'us-west-2', endpoint_url='https://foo')
        reported_url = s3.meta.endpoint_url
        self.assertEqual(reported_url, 'https://foo')


class TestClientInjection(unittest.TestCase):
    """Checks that an event handler can add a method to a client class."""

    def setUp(self):
        self.session = botocore.session.get_session()

    def test_can_inject_client_methods(self):

        def echo_name(self, name):
            # Method to be attached to the client; hands back its argument.
            return name

        def add_echo_method(class_attributes, **kwargs):
            # Event handler: places the method into the attribute mapping
            # it is given under the name 'extra_client_method'.
            class_attributes['extra_client_method'] = echo_name

        event_name = 'creating-client-class.s3'
        self.session.register(event_name, add_echo_method)

        s3 = self.session.create_client('s3', 'us-west-2')

        # The injected method is now callable on the client instance.
        echoed = s3.extra_client_method('foo')
        self.assertEqual(echoed, 'foo')