File: test_raw_api.py

package info (click to toggle)
python-b2sdk 2.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,020 kB
  • sloc: python: 30,902; sh: 13; makefile: 8
file content (196 lines) | stat: -rw-r--r-- 6,958 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
######################################################################
#
# File: test/unit/v0/test_raw_api.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import pytest

from ..test_base import TestBase
from .deps import (
    B2Http,
    B2RawHTTPApi,
    BucketRetentionSetting,
    EncryptionAlgorithm,
    EncryptionKey,
    EncryptionMode,
    EncryptionSetting,
    RetentionMode,
    RetentionPeriod,
)
from .deps_exception import UnusableFileName, WrongEncryptionModeForBucketDefault

# Characters used to build test filenames.
TWO_BYTE_UNICHR = '\u0394'  # Greek capital letter Delta; two bytes when UTF-8 encoded
CHAR_UNDER_32 = '\x1f'  # code point 31, i.e. below 32
DEL_CHAR = '\x7f'  # DEL, code point 127


class TestRawAPIFilenames(TestBase):
    """Test that the filename checker passes conforming names and rejects those that don't."""

    def setUp(self):
        """Create the raw API object whose ``check_b2_filename`` is exercised by the tests."""
        self.raw_api = B2RawHTTPApi(B2Http())

    def _should_be_ok(self, filename):
        """Call with test filenames that follow the filename rules.

        :param filename: unicode (or str) that follows the rules
        """
        print(f'Filename "{filename}" should be OK')
        # assertIsNone shows the unexpected return value on failure, unlike
        # assertTrue(... is None), which only reports "False is not true".
        self.assertIsNone(self.raw_api.check_b2_filename(filename))

    def _should_raise(self, filename, exception_message):
        """Call with filenames that don't follow the rules (so the rule checker should raise).

        :param filename: unicode (or str) that doesn't follow the rules
        :param exception_message: regexp that matches the exception's detailed message
        """
        print(f'Filename "{filename}" should raise UnusableFileName(".*{exception_message}.*").')
        with self.assertRaisesRegex(UnusableFileName, exception_message):
            self.raw_api.check_b2_filename(filename)

    def test_b2_filename_checker(self):
        """Test a conforming and non-conforming filename for each rule.

        From the B2 docs (https://www.backblaze.com/b2/docs/files.html):
        - Names can be pretty much any UTF-8 string up to 1024 bytes long.
        - No character codes below 32 are allowed.
        - Backslashes are not allowed.
        - DEL characters (127) are not allowed.
        - File names cannot start with "/", end with "/", or contain "//".
        - Maximum of 250 bytes of UTF-8 in each segment (part between slashes) of a file name.
        """
        print('test b2 filename rules')

        # Examples from doc:
        self._should_be_ok('Kitten Videos')
        self._should_be_ok('\u81ea\u7531.txt')

        # Check length
        # 1024 bytes is ok if the segments are at most 250 chars:
        # 4 * (250 + 1) + 20 == 1024.
        s_1024 = 4 * (250 * 'x' + '/') + 20 * 'y'
        self._should_be_ok(s_1024)
        # 1025 is too long.
        self._should_raise(s_1024 + 'x', 'too long')
        # 1024 bytes with two byte characters should also work:
        # 4 * (125 * 2 + 1) + 20 == 1024 bytes of UTF-8.
        s_1024_two_byte = 4 * (125 * TWO_BYTE_UNICHR + '/') + 20 * 'y'
        self._should_be_ok(s_1024_two_byte)
        # But 1025 bytes is too long.
        self._should_raise(s_1024_two_byte + 'x', 'too long')

        # Names with unicode values < 32, and DEL aren't allowed.
        self._should_raise('hey' + CHAR_UNDER_32, 'contains code.*less than 32')
        # Unicode in the filename shouldn't break the exception message.
        self._should_raise(TWO_BYTE_UNICHR + CHAR_UNDER_32, 'contains code.*less than 32')
        self._should_raise(DEL_CHAR, 'DEL.*not allowed')

        # Names can't start or end with '/' or contain '//'
        self._should_raise('/hey', 'not start.*/')
        self._should_raise('hey/', 'not .*end.*/')
        self._should_raise('not//allowed', 'contain.*//')

        # Reject segments longer than 250 bytes
        self._should_raise('foo/' + 251 * 'x', 'segment too long')
        # So a segment of 125 two-byte chars plus one should also fail
        # (125 * 2 + 1 == 251 bytes).
        self._should_raise('foo/' + 125 * TWO_BYTE_UNICHR + 'x', 'segment too long')


class BucketTestBase:
    """Common base for bucket tests; gives every test a ``raw_api`` built on a mocked HTTP layer."""

    @pytest.fixture(autouse=True)
    def init(self, mocker):
        """Attach a ``B2RawHTTPApi`` wrapping a ``MagicMock`` to the test instance."""
        self.raw_api = B2RawHTTPApi(mocker.MagicMock())


class TestUpdateBucket(BucketTestBase):
    """Test updating bucket.

    ``self.raw_api`` is supplied by the autouse ``init`` fixture inherited from
    ``BucketTestBase``, so it is not redefined here.
    """

    def test_assertion_raises(self):
        """Calling ``update_bucket`` with no optional update arguments must raise ``AssertionError``."""
        with pytest.raises(AssertionError):
            self.raw_api.update_bucket('test', 'account_auth_token', 'account_id', 'bucket_id')

    @pytest.mark.parametrize(
        'bucket_type,bucket_info,default_retention',
        (
            (None, {}, None),
            (
                'allPublic',
                None,
                BucketRetentionSetting(RetentionMode.COMPLIANCE, RetentionPeriod(years=1)),
            ),
        ),
    )
    def test_assertion_not_raises(self, bucket_type, bucket_info, default_retention):
        """``update_bucket`` must complete without raising for each parametrized argument set."""
        self.raw_api.update_bucket(
            'test',
            'account_auth_token',
            'account_id',
            'bucket_id',
            bucket_type=bucket_type,
            bucket_info=bucket_info,
            default_retention=default_retention,
        )

    @pytest.mark.parametrize(
        'encryption_setting,',
        (
            EncryptionSetting(
                mode=EncryptionMode.SSE_C,
                algorithm=EncryptionAlgorithm.AES256,
                key=EncryptionKey(b'key', 'key-id'),
            ),
            EncryptionSetting(
                mode=EncryptionMode.UNKNOWN,
            ),
        ),
    )
    def test_update_bucket_wrong_encryption(self, encryption_setting):
        """SSE-C and UNKNOWN settings must be rejected as a bucket's default encryption."""
        with pytest.raises(WrongEncryptionModeForBucketDefault):
            self.raw_api.update_bucket(
                'test',
                'account_auth_token',
                'account_id',
                'bucket_id',
                default_server_side_encryption=encryption_setting,
                bucket_type='allPublic',
            )


class TestCreateBucket(BucketTestBase):
    """Checks on bucket creation through the raw API."""

    @pytest.mark.parametrize(
        'encryption_setting,',
        [
            EncryptionSetting(
                mode=EncryptionMode.SSE_C,
                algorithm=EncryptionAlgorithm.AES256,
                key=EncryptionKey(b'key', 'key-id'),
            ),
            EncryptionSetting(mode=EncryptionMode.UNKNOWN),
        ],
    )
    def test_create_bucket_wrong_encryption(self, encryption_setting):
        """Creating a bucket with an SSE-C or UNKNOWN default encryption setting must fail."""
        positional = ('test', 'account_auth_token', 'account_id', 'bucket_id')
        keyword = {
            'bucket_type': 'allPrivate',
            'bucket_info': {},
            'default_server_side_encryption': encryption_setting,
        }
        with pytest.raises(WrongEncryptionModeForBucketDefault):
            self.raw_api.create_bucket(*positional, **keyword)