File: test_utils_datatypes.py

package info (click to toggle)
python-scrapy 2.13.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,664 kB
  • sloc: python: 52,028; xml: 199; makefile: 25; sh: 7
file content (372 lines) | stat: -rw-r--r-- 10,677 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
import copy
import warnings
from collections.abc import Iterator, Mapping, MutableMapping

import pytest

from scrapy.exceptions import ScrapyDeprecationWarning
from scrapy.http import Request
from scrapy.utils.datatypes import (
    CaseInsensitiveDict,
    CaselessDict,
    LocalCache,
    LocalWeakReferencedCache,
    SequenceExclude,
)
from scrapy.utils.python import garbage_collect


class CaseInsensitiveDictBase:
    """Shared tests for case-insensitive dictionary implementations.

    Concrete test classes inherit from this base and set ``dict_class`` to the
    mapping type under test. Every test builds its instances through
    ``self.dict_class`` so that each subclass exercises its own implementation.
    A subclass whose ``dict_class`` emits warnings when instantiated is
    expected to filter them at class level.
    """

    def test_init_dict(self):
        """A plain ``dict`` can seed the mapping."""
        seq = {"red": 1, "black": 3}
        d = self.dict_class(seq)
        assert d["red"] == 1
        assert d["black"] == 3

    def test_init_pair_sequence(self):
        """A sequence of ``(key, value)`` pairs can seed the mapping."""
        seq = (("red", 1), ("black", 3))
        d = self.dict_class(seq)
        assert d["red"] == 1
        assert d["black"] == 3

    def test_init_mapping(self):
        """A read-only ``Mapping`` that is not a ``dict`` can seed the mapping."""

        class MyMapping(Mapping):
            def __init__(self, **kwargs):
                self._d = kwargs

            def __getitem__(self, key):
                return self._d[key]

            def __iter__(self):
                return iter(self._d)

            def __len__(self):
                return len(self._d)

        seq = MyMapping(red=1, black=3)
        d = self.dict_class(seq)
        assert d["red"] == 1
        assert d["black"] == 3

    def test_init_mutable_mapping(self):
        """A ``MutableMapping`` that is not a ``dict`` can seed the mapping."""

        class MyMutableMapping(MutableMapping):
            def __init__(self, **kwargs):
                self._d = kwargs

            def __getitem__(self, key):
                return self._d[key]

            def __setitem__(self, key, value):
                self._d[key] = value

            def __delitem__(self, key):
                del self._d[key]

            def __iter__(self):
                return iter(self._d)

            def __len__(self):
                return len(self._d)

        seq = MyMutableMapping(red=1, black=3)
        d = self.dict_class(seq)
        assert d["red"] == 1
        assert d["black"] == 3

    def test_caseless(self):
        """Lookups and overwrites ignore the letter case of the key."""
        d = self.dict_class()
        d["key_Lower"] = 1
        assert d["KEy_loWer"] == 1
        assert d.get("KEy_loWer") == 1

        # Writing under a different casing replaces the same entry.
        d["KEY_LOWER"] = 3
        assert d["key_Lower"] == 3
        assert d.get("key_Lower") == 3

    def test_delete(self):
        """Deleting under any casing removes the entry for every casing."""
        d = self.dict_class({"key_lower": 1})
        del d["key_LOWER"]
        with pytest.raises(KeyError):
            d["key_LOWER"]
        with pytest.raises(KeyError):
            d["key_lower"]

    def test_getdefault(self):
        """``get`` returns the default only while the key is missing."""
        # Built through dict_class so each subclass tests its own mapping type.
        d = self.dict_class()
        assert d.get("c", 5) == 5
        d["c"] = 10
        assert d.get("c", 5) == 10

    def test_setdefault(self):
        """``setdefault`` keeps existing values and inserts missing ones."""
        # Built through dict_class so each subclass tests its own mapping type.
        d = self.dict_class({"a": 1, "b": 2})

        # Existing key (different casing): value is left untouched.
        r = d.setdefault("A", 5)
        assert r == 1
        assert d["A"] == 1

        # Missing key: the default is stored and returned.
        r = d.setdefault("c", 5)
        assert r == 5
        assert d["C"] == 5

    def test_fromkeys(self):
        """``fromkeys`` works on both the class and an instance."""
        keys = ("a", "b")

        d = self.dict_class.fromkeys(keys)
        assert d["A"] is None
        assert d["B"] is None

        d = self.dict_class.fromkeys(keys, 1)
        assert d["A"] == 1
        assert d["B"] == 1

        instance = self.dict_class()
        d = instance.fromkeys(keys)
        assert d["A"] is None
        assert d["B"] is None

        d = instance.fromkeys(keys, 1)
        assert d["A"] == 1
        assert d["B"] == 1

    def test_contains(self):
        """Membership checks ignore the letter case of the key."""
        d = self.dict_class()
        d["a"] = 1
        assert "A" in d

    def test_pop(self):
        """``pop`` ignores case and raises ``KeyError`` once the key is gone."""
        d = self.dict_class()
        d["a"] = 1
        assert d.pop("A") == 1
        with pytest.raises(KeyError):
            d.pop("A")

    def test_normkey(self):
        """A subclass can customise how keys are normalised before storage."""

        class MyDict(self.dict_class):
            def _normkey(self, key):
                return key.title()

            normkey = _normkey  # deprecated CaselessDict class

        d = MyDict()
        d["key-one"] = 2
        assert list(d.keys()) == ["Key-One"]

    def test_normvalue(self):
        """A subclass can customise how values are normalised on every write path."""

        class MyDict(self.dict_class):
            def _normvalue(self, value):
                if value is not None:
                    return value + 1
                return None

            normvalue = _normvalue  # deprecated CaselessDict class

        # Constructor
        d = MyDict({"key": 1})
        assert d["key"] == 2
        assert d.get("key") == 2

        # Item assignment
        d = MyDict()
        d["key"] = 1
        assert d["key"] == 2
        assert d.get("key") == 2

        # setdefault
        d = MyDict()
        d.setdefault("key", 1)
        assert d["key"] == 2
        assert d.get("key") == 2

        # update
        d = MyDict()
        d.update({"key": 1})
        assert d["key"] == 2
        assert d.get("key") == 2

        # fromkeys
        d = MyDict.fromkeys(("key",), 1)
        assert d["key"] == 2
        assert d.get("key") == 2

    def test_copy(self):
        """Both ``copy.copy`` and ``.copy()`` preserve type, content and caselessness."""
        h1 = self.dict_class({"header1": "value"})
        h2 = copy.copy(h1)
        assert isinstance(h2, self.dict_class)
        assert h1 == h2
        assert h1.get("header1") == h2.get("header1")
        assert h1.get("header1") == h2.get("HEADER1")
        h3 = h1.copy()
        assert isinstance(h3, self.dict_class)
        assert h1 == h3
        assert h1.get("header1") == h3.get("header1")
        assert h1.get("header1") == h3.get("HEADER1")


class TestCaseInsensitiveDict(CaseInsensitiveDictBase):
    """Run the shared suite against ``CaseInsensitiveDict``, plus repr/iteration checks."""

    dict_class = CaseInsensitiveDict

    def test_repr(self):
        """The repr shows the class name and the keys in their original casing."""
        single = self.dict_class({"foo": "bar"})
        expected_single = "<CaseInsensitiveDict: {'foo': 'bar'}>"
        assert repr(single) == expected_single

        mixed_case = self.dict_class({"AsDf": "QwErTy", "FoO": "bAr"})
        expected_mixed = "<CaseInsensitiveDict: {'AsDf': 'QwErTy', 'FoO': 'bAr'}>"
        assert repr(mixed_case) == expected_mixed

    def test_iter(self):
        """Iterating yields the keys as inserted, in insertion order."""
        key_iterator = iter(self.dict_class({"AsDf": "QwErTy", "FoO": "bAr"}))
        assert isinstance(key_iterator, Iterator)
        remaining_keys = list(key_iterator)
        assert remaining_keys == ["AsDf", "FoO"]


@pytest.mark.filterwarnings("ignore::scrapy.exceptions.ScrapyDeprecationWarning")
class TestCaselessDict(CaseInsensitiveDictBase):
    """Run the shared suite against the deprecated ``CaselessDict``.

    The class-level mark silences the deprecation warning for the inherited
    tests; ``test_deprecation_message`` re-enables it locally to inspect it.
    """

    dict_class = CaselessDict

    def test_deprecation_message(self):
        """Instantiating the class emits exactly one deprecation warning."""
        expected_message = (
            "scrapy.utils.datatypes.CaselessDict is deprecated,"
            " please use scrapy.utils.datatypes.CaseInsensitiveDict instead"
        )

        with warnings.catch_warnings(record=True) as recorded:
            # Override the class-level "ignore" filter inside this context.
            warnings.filterwarnings("always", category=ScrapyDeprecationWarning)
            self.dict_class({"foo": "bar"})

        assert len(recorded) == 1
        only_warning = recorded[0]
        assert issubclass(only_warning.category, ScrapyDeprecationWarning)
        assert str(only_warning.message) == expected_message


class TestSequenceExclude:
    """Membership tests for ``SequenceExclude``.

    Each test wraps a container and checks that ``in`` is true exactly for the
    values the wrapped container does not hold.
    """

    def test_list(self):
        """Values outside the wrapped list are reported as contained."""
        excluder = SequenceExclude([1, 2, 3])
        assert 2 not in excluder
        assert 0 in excluder
        assert 4 in excluder

    def test_range(self):
        """Values outside the wrapped range are reported as contained."""
        excluder = SequenceExclude(range(10, 20))
        assert 15 not in excluder
        assert 5 in excluder
        assert 20 in excluder

    def test_range_step(self):
        """Only the values actually produced by a stepped range are excluded."""
        stepped = range(10, 20, 3)
        excluder = SequenceExclude(stepped)

        # None of the stepped values may pass the membership check.
        leaked = [value for value in stepped if value in excluder]
        assert leaked == []

        # Every value the step skips over passes it.
        accepted = [value for value in range(10, 20) if value in excluder]
        assert accepted == [11, 12, 14, 15, 17, 18]

    def test_string_seq(self):
        """Characters of the wrapped string are excluded."""
        excluder = SequenceExclude("cde")
        kept = [char for char in "abcdefg" if char in excluder]
        assert "".join(kept) == "abfg"

    def test_stringset_seq(self):
        """Characters of the wrapped set of characters are excluded."""
        excluder = SequenceExclude(set("cde"))
        kept = [char for char in "abcdefg" if char in excluder]
        assert "".join(kept) == "abfg"

    def test_set(self):
        """Whatever is absent from the wrapped set counts as 'in' the container."""
        members = [-3, "test", 1.1]
        excluder = SequenceExclude(set(members))

        for outsider in (0, "foo", 3.14, set("bar")):
            assert outsider in excluder

        # The wrapped container is a set, so testing a list (unhashable) raises.
        with pytest.raises(TypeError):
            ["a", "b", "c"] in excluder  # noqa: B015

        for member in members:
            assert member not in excluder


class TestLocalCache:
    """Size-limit tests for ``LocalCache``."""

    def test_cache_with_limit(self):
        """With ``limit=2`` the first of three inserted keys is expected to be gone."""
        cache = LocalCache(limit=2)
        for key, value in (("a", 1), ("b", 2), ("c", 3)):
            cache[key] = value

        assert len(cache) == 2
        assert "a" not in cache
        for key, value in (("b", 2), ("c", 3)):
            assert key in cache
        for key, value in (("b", 2), ("c", 3)):
            assert cache[key] == value

    def test_cache_without_limit(self):
        """Without a limit every inserted entry is expected to be retained."""
        entry_count = 10**4
        cache = LocalCache()
        for number in range(entry_count):
            cache[str(number)] = number

        assert len(cache) == entry_count
        for number in range(entry_count):
            key = str(number)
            assert key in cache
            assert cache[key] == number


class TestLocalWeakReferencedCache:
    """Tests for ``LocalWeakReferencedCache`` keyed by ``Request`` objects.

    These tests depend on when the last strong reference to a key disappears,
    so local names that hold keys are deleted explicitly where it matters.
    """

    def test_cache_with_limit(self):
        """With ``limit=2`` the first key is dropped; dead keys leave the cache."""
        cache = LocalWeakReferencedCache(limit=2)
        req_org = Request("https://example.org")
        req_com = Request("https://example.com")
        req_net = Request("https://example.net")
        cache[req_org] = 1
        cache[req_com] = 2
        cache[req_net] = 3

        assert len(cache) == 2
        assert req_org not in cache
        assert req_com in cache
        assert req_net in cache
        # A missing key is expected to read back as None rather than raise.
        assert cache[req_org] is None
        assert cache[req_com] == 2
        assert cache[req_net] == 3

        # Drop the only strong reference to one cached key.
        del req_com

        # PyPy takes longer to collect dead references
        garbage_collect()

        assert len(cache) == 1

    def test_cache_non_weak_referenceable_objects(self):
        """Keys that cannot be weakly referenced are expected not to be stored."""
        cache = LocalWeakReferencedCache()
        none_key = None
        int_key = 1
        list_key = [1, 2, 3]
        cache[none_key] = 1
        cache[int_key] = 2
        cache[list_key] = 3

        assert len(cache) == 0
        assert none_key not in cache
        assert int_key not in cache
        assert list_key not in cache

    def test_cache_without_limit(self):
        """Without a limit all live keys are kept; released keys disappear."""
        total = 10**4
        cache = LocalWeakReferencedCache()
        requests = []
        for number in range(total):
            requests.append(Request(f"https://example.org/{number}"))
            # Index the list instead of binding a local, so no extra strong
            # reference to the request outlives this loop.
            cache[requests[-1]] = number

        assert len(cache) == total
        for position, req in enumerate(requests):
            assert req in cache
            assert cache[req] == position
        del req  # delete reference to the last object in the list  # pylint: disable=undefined-loop-variable

        # delete half of the objects, make sure that is reflected in the cache
        del requests[len(requests) - total // 2 :]

        # PyPy takes longer to collect dead references
        garbage_collect()

        assert len(cache) == total // 2
        for position, req in enumerate(requests):
            assert req in cache
            assert cache[req] == position