File: test_get_object.py

package info (click to toggle)
awscli 2.31.35-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 156,692 kB
  • sloc: python: 213,816; xml: 14,082; makefile: 189; sh: 178; javascript: 8
file content (100 lines) | stat: -rw-r--r-- 3,365 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/env python
# Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
import re

import awscli.clidriver
from awscli.compat import StringIO
from awscli.testutils import BaseAWSCommandParamsTest


class TestGetObject(BaseAWSCommandParamsTest):
    """Checks the request parameters produced by ``s3api get-object``.

    Every test builds a command line that ends with the positional
    ``outfile`` argument and compares the parameters reported by
    ``assert_params_for_cmd`` against an expected dict.  Each expected
    dict contains ``ChecksumMode: ENABLED`` even though no checksum
    option is given on the command line.
    """

    prefix = 's3api get-object'

    def setUp(self):
        """Run the base set-up, then install a response with an empty body."""
        super().setUp()
        # Presumably the base class hands this dict back as the parsed
        # service response -- confirm in BaseAWSCommandParamsTest.
        empty_body = StringIO()
        self.parsed_response = {'Body': empty_body}

    def remove_file_if_exists(self, filename):
        """Delete ``filename`` when it is an existing regular file."""
        if not os.path.isfile(filename):
            return
        os.remove(filename)

    def test_simple(self):
        """Only --bucket and --key are supplied."""
        command = ' '.join(
            [self.prefix, '--bucket mybucket', '--key mykey', 'outfile']
        )
        # 'outfile' is the output target named on the command line;
        # remove it afterwards in case the run created it.
        self.addCleanup(self.remove_file_if_exists, 'outfile')
        expected_params = {
            'Bucket': 'mybucket',
            'ChecksumMode': 'ENABLED',
            'Key': 'mykey',
        }
        self.assert_params_for_cmd(command, expected_params)

    def test_range(self):
        """--range is passed through as the ``Range`` parameter."""
        command = ' '.join(
            [
                self.prefix,
                '--bucket mybucket',
                '--key mykey',
                '--range bytes=0-499',
                'outfile',
            ]
        )
        self.addCleanup(self.remove_file_if_exists, 'outfile')
        expected_params = {
            'Bucket': 'mybucket',
            'ChecksumMode': 'ENABLED',
            'Key': 'mykey',
            'Range': 'bytes=0-499',
        }
        self.assert_params_for_cmd(command, expected_params)

    def test_response_headers(self):
        """The --response-* options map to their ``Response*`` parameters."""
        command = ' '.join(
            [
                self.prefix,
                '--bucket mybucket',
                '--key mykey',
                '--response-cache-control No-cache',
                '--response-content-encoding x-gzip',
                'outfile',
            ]
        )
        self.addCleanup(self.remove_file_if_exists, 'outfile')
        expected_params = {
            'Bucket': 'mybucket',
            'ChecksumMode': 'ENABLED',
            'Key': 'mykey',
            'ResponseCacheControl': 'No-cache',
            'ResponseContentEncoding': 'x-gzip',
        }
        self.assert_params_for_cmd(command, expected_params)

    def test_streaming_output_arg_with_error_response(self):
        """An error response (no ``Body`` key) must not break the command."""
        # Regression check: the StreamingOutputArg must cope with being
        # handed an error body.  It used to propagate a KeyError in that
        # situation, so make sure the case is handled.
        error_details = {'Code': 'AuthError', 'Message': 'SomeError'}
        self.parsed_response = {'Error': error_details}
        command = ' '.join(
            [self.prefix, '--bucket mybucket', '--key mykey', 'outfile']
        )
        self.addCleanup(self.remove_file_if_exists, 'outfile')
        expected_params = {
            'Bucket': 'mybucket',
            'ChecksumMode': 'ENABLED',
            'Key': 'mykey',
        }
        self.assert_params_for_cmd(command, expected_params)


if __name__ == "__main__":
    # Script entry point: run the tests defined in this module.
    # ``unittest`` is not imported at the top of this file, so it is
    # imported here; without this the call below raises NameError when
    # the file is executed directly.
    import unittest

    unittest.main()