File: test_transfer.py

package info (click to toggle)
python-boto3 1.26.27%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 7,880 kB
  • sloc: python: 12,629; makefile: 128
file content (249 lines) | stat: -rw-r--r-- 9,504 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License'). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the 'license' file accompanying this file. This file is
# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest
from s3transfer.futures import NonThreadedExecutor
from s3transfer.manager import TransferManager

from boto3.exceptions import RetriesExceededError, S3UploadFailedError
from boto3.s3.transfer import (
    KB,
    MB,
    ClientError,
    OSUtils,
    ProgressCallbackInvoker,
    S3Transfer,
    S3TransferRetriesExceededError,
    TransferConfig,
    create_transfer_manager,
)
from tests import mock, unittest


class TestCreateTransferManager(unittest.TestCase):
    def test_create_transfer_manager(self):
        client = object()
        config = TransferConfig()
        osutil = OSUtils()
        with mock.patch('boto3.s3.transfer.TransferManager') as manager:
            create_transfer_manager(client, config, osutil)
            assert manager.call_args == mock.call(client, config, osutil, None)

    def test_create_transfer_manager_with_no_threads(self):
        client = object()
        config = TransferConfig()
        config.use_threads = False
        with mock.patch('boto3.s3.transfer.TransferManager') as manager:
            create_transfer_manager(client, config)
            assert manager.call_args == mock.call(
                client, config, None, NonThreadedExecutor
            )


class TestTransferConfig(unittest.TestCase):
    def assert_value_of_actual_and_alias(
        self, config, actual, alias, ref_value
    ):
        # Ensure that the name set in the underlying TransferConfig (i.e.
        # the actual) is the correct value.
        assert getattr(config, actual) == ref_value
        # Ensure that backcompat name (i.e. the alias) is the correct value.
        assert getattr(config, alias) == ref_value

    def test_alias_max_concurreny(self):
        ref_value = 10
        config = TransferConfig(max_concurrency=ref_value)
        self.assert_value_of_actual_and_alias(
            config, 'max_request_concurrency', 'max_concurrency', ref_value
        )

        # Set a new value using the alias
        new_value = 15
        config.max_concurrency = new_value
        # Make sure it sets the value for both the alias and the actual
        # value that will be used in the TransferManager
        self.assert_value_of_actual_and_alias(
            config, 'max_request_concurrency', 'max_concurrency', new_value
        )

    def test_alias_max_io_queue(self):
        ref_value = 10
        config = TransferConfig(max_io_queue=ref_value)
        self.assert_value_of_actual_and_alias(
            config, 'max_io_queue_size', 'max_io_queue', ref_value
        )

        # Set a new value using the alias
        new_value = 15
        config.max_io_queue = new_value
        # Make sure it sets the value for both the alias and the actual
        # value that will be used in the TransferManager
        self.assert_value_of_actual_and_alias(
            config, 'max_io_queue_size', 'max_io_queue', new_value
        )

    def test_transferconfig_parameters(self):
        config = TransferConfig(
            multipart_threshold=8 * MB,
            max_concurrency=10,
            multipart_chunksize=8 * MB,
            num_download_attempts=5,
            max_io_queue=100,
            io_chunksize=256 * KB,
            use_threads=True,
            max_bandwidth=1024 * KB,
        )
        assert config.multipart_threshold == 8 * MB
        assert config.multipart_chunksize == 8 * MB
        assert config.max_request_concurrency == 10
        assert config.num_download_attempts == 5
        assert config.max_io_queue_size == 100
        assert config.io_chunksize == 256 * KB
        assert config.use_threads is True
        assert config.max_bandwidth == 1024 * KB


class TestProgressCallbackInvoker(unittest.TestCase):
    def test_on_progress(self):
        callback = mock.Mock()
        subscriber = ProgressCallbackInvoker(callback)
        subscriber.on_progress(bytes_transferred=1)
        callback.assert_called_with(1)


class TestS3Transfer(unittest.TestCase):
    def setUp(self):
        self.client = mock.Mock()
        self.manager = mock.Mock(TransferManager(self.client))
        self.transfer = S3Transfer(manager=self.manager)
        self.callback = mock.Mock()

    def assert_callback_wrapped_in_subscriber(self, call_args):
        subscribers = call_args[0][4]
        # Make sure only one subscriber was passed in.
        assert len(subscribers) == 1
        subscriber = subscribers[0]
        # Make sure that the subscriber is of the correct type
        assert isinstance(subscriber, ProgressCallbackInvoker)
        # Make sure that the on_progress method() calls out to the wrapped
        # callback by actually invoking it.
        subscriber.on_progress(bytes_transferred=1)
        self.callback.assert_called_with(1)

    def test_upload_file(self):
        extra_args = {'ACL': 'public-read'}
        self.transfer.upload_file(
            'smallfile', 'bucket', 'key', extra_args=extra_args
        )
        self.manager.upload.assert_called_with(
            'smallfile', 'bucket', 'key', extra_args, None
        )

    def test_download_file(self):
        extra_args = {
            'SSECustomerKey': 'foo',
            'SSECustomerAlgorithm': 'AES256',
        }
        self.transfer.download_file(
            'bucket', 'key', '/tmp/smallfile', extra_args=extra_args
        )
        self.manager.download.assert_called_with(
            'bucket', 'key', '/tmp/smallfile', extra_args, None
        )

    def test_upload_wraps_callback(self):
        self.transfer.upload_file(
            'smallfile', 'bucket', 'key', callback=self.callback
        )
        self.assert_callback_wrapped_in_subscriber(
            self.manager.upload.call_args
        )

    def test_download_wraps_callback(self):
        self.transfer.download_file(
            'bucket', 'key', '/tmp/smallfile', callback=self.callback
        )
        self.assert_callback_wrapped_in_subscriber(
            self.manager.download.call_args
        )

    def test_propogation_of_retry_error(self):
        future = mock.Mock()
        future.result.side_effect = S3TransferRetriesExceededError(Exception())
        self.manager.download.return_value = future
        with pytest.raises(RetriesExceededError):
            self.transfer.download_file('bucket', 'key', '/tmp/smallfile')

    def test_propogation_s3_upload_failed_error(self):
        future = mock.Mock()
        future.result.side_effect = ClientError({'Error': {}}, 'op_name')
        self.manager.upload.return_value = future
        with pytest.raises(S3UploadFailedError):
            self.transfer.upload_file('smallfile', 'bucket', 'key')

    def test_can_create_with_just_client(self):
        transfer = S3Transfer(client=mock.Mock())
        assert isinstance(transfer, S3Transfer)

    def test_can_create_with_extra_configurations(self):
        transfer = S3Transfer(
            client=mock.Mock(), config=TransferConfig(), osutil=OSUtils()
        )
        assert isinstance(transfer, S3Transfer)

    def test_client_or_manager_is_required(self):
        with pytest.raises(ValueError):
            S3Transfer()

    def test_client_and_manager_are_mutually_exclusive(self):
        with pytest.raises(ValueError):
            S3Transfer(self.client, manager=self.manager)

    def test_config_and_manager_are_mutually_exclusive(self):
        with pytest.raises(ValueError):
            S3Transfer(config=mock.Mock(), manager=self.manager)

    def test_osutil_and_manager_are_mutually_exclusive(self):
        with pytest.raises(ValueError):
            S3Transfer(osutil=mock.Mock(), manager=self.manager)

    def test_upload_requires_string_filename(self):
        transfer = S3Transfer(client=mock.Mock())
        with pytest.raises(ValueError):
            transfer.upload_file(filename=object(), bucket='foo', key='bar')

    def test_download_requires_string_filename(self):
        transfer = S3Transfer(client=mock.Mock())
        with pytest.raises(ValueError):
            transfer.download_file(bucket='foo', key='bar', filename=object())

    def test_context_manager(self):
        manager = mock.Mock()
        manager.__exit__ = mock.Mock()
        with S3Transfer(manager=manager):
            pass
        # The underlying transfer manager should have had its __exit__
        # called as well.
        assert manager.__exit__.call_args == mock.call(None, None, None)

    def test_context_manager_with_errors(self):
        manager = mock.Mock()
        manager.__exit__ = mock.Mock()
        raised_exception = ValueError()
        with pytest.raises(type(raised_exception)):
            with S3Transfer(manager=manager):
                raise raised_exception
        # The underlying transfer manager should have had its __exit__
        # called as well and pass on the error as well.
        assert manager.__exit__.call_args == mock.call(
            type(raised_exception), raised_exception, mock.ANY
        )