File: test_spidermiddleware_base.py

package info (click to toggle)
python-scrapy 2.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,308 kB
  • sloc: python: 55,321; xml: 199; makefile: 25; sh: 7
file content (124 lines) | stat: -rw-r--r-- 4,314 bytes parent folder | download
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import pytest

from scrapy import Request, Spider
from scrapy.http import Response
from scrapy.spidermiddlewares.base import BaseSpiderMiddleware
from scrapy.utils.test import get_crawler

if TYPE_CHECKING:
    from scrapy.crawler import Crawler


@pytest.fixture
def crawler() -> Crawler:
    """Return a fresh crawler built around the plain base ``Spider`` class."""
    return get_crawler(spidercls=Spider)


def test_trivial(crawler: Crawler) -> None:
    """A subclass that overrides nothing must pass requests and items through untouched."""

    class NoOpSpiderMiddleware(BaseSpiderMiddleware):
        pass

    middleware = NoOpSpiderMiddleware.from_crawler(crawler)
    # from_crawler() must attach the crawler to the middleware instance.
    assert hasattr(middleware, "crawler")
    assert middleware.crawler is crawler

    request = Request("data:,")
    output = [request, {"foo": "bar"}]
    results = [
        list(middleware.process_spider_output(Response("data:,"), output)),
        list(middleware.process_start_requests(output, None)),  # type: ignore[arg-type]
    ]
    for processed in results:
        # Both code paths yield the original objects unchanged, in order.
        assert processed == [request, {"foo": "bar"}]


def test_processed_request(crawler: Crawler) -> None:
    """``get_processed_request()`` can keep, drop (None) or replace requests."""

    class RequestRewritingMiddleware(BaseSpiderMiddleware):
        def get_processed_request(
            self, request: Request, response: Response | None
        ) -> Request | None:
            # "data:2," is dropped; "data:3," is swapped for "data:30,".
            if request.url == "data:2,":
                return None
            if request.url == "data:3,":
                return Request("data:30,")
            return request

    middleware = RequestRewritingMiddleware.from_crawler(crawler)
    kept = Request("data:1,")
    dropped = Request("data:2,")
    replaced = Request("data:3,")
    output = [kept, {"foo": "bar"}, dropped, replaced]
    results = [
        list(middleware.process_spider_output(Response("data:,"), output)),
        list(middleware.process_start_requests(output, None)),  # type: ignore[arg-type]
    ]
    for processed in results:
        # One request dropped: 4 inputs become 3 outputs.
        assert len(processed) == 3
        assert isinstance(processed[0], Request)
        assert processed[0].url == "data:1,"
        # Items are untouched by the request hook.
        assert processed[1] == {"foo": "bar"}
        assert isinstance(processed[2], Request)
        assert processed[2].url == "data:30,"


def test_processed_item(crawler: Crawler) -> None:
    """``get_processed_item()`` can drop (return None) or mutate items in spider output."""

    class ProcessItemSpiderMiddleware(BaseSpiderMiddleware):
        def get_processed_item(self, item: Any, response: Response | None) -> Any:
            # Items with foo == 2 are dropped; foo == 3 is rewritten in place.
            if item["foo"] == 2:
                return None
            if item["foo"] == 3:
                item["foo"] = 30
            return item

    mw = ProcessItemSpiderMiddleware.from_crawler(crawler)
    test_req = Request("data:,")

    def make_output() -> list[Any]:
        # Build fresh item dicts for each pass: get_processed_item() mutates
        # items in place, so reusing a single list would feed already-mutated
        # items ({"foo": 30}) to the second pass and the assertion would pass
        # through the wrong (pass-through) branch.
        return [{"foo": 1}, {"foo": 2}, test_req, {"foo": 3}]

    for processed in [
        list(mw.process_spider_output(Response("data:,"), make_output())),
        list(mw.process_start_requests(make_output(), None)),  # type: ignore[arg-type]
    ]:
        assert processed == [{"foo": 1}, test_req, {"foo": 30}]


def test_processed_both(crawler: Crawler) -> None:
    """Request and item hooks can be combined on a single middleware subclass."""

    class ProcessBothSpiderMiddleware(BaseSpiderMiddleware):
        def get_processed_request(
            self, request: Request, response: Response | None
        ) -> Request | None:
            # "data:2," is dropped; "data:3," is replaced with "data:30,".
            if request.url == "data:2,":
                return None
            if request.url == "data:3,":
                return Request("data:30,")
            return request

        def get_processed_item(self, item: Any, response: Response | None) -> Any:
            # Items with foo == 2 are dropped; foo == 3 is rewritten in place.
            if item["foo"] == 2:
                return None
            if item["foo"] == 3:
                item["foo"] = 30
            return item

    mw = ProcessBothSpiderMiddleware.from_crawler(crawler)
    test_req1 = Request("data:1,")
    test_req2 = Request("data:2,")
    test_req3 = Request("data:3,")

    def make_output() -> list[Any]:
        # Build fresh item dicts for each pass: get_processed_item() mutates
        # items in place, so reusing a single list would feed already-mutated
        # items ({"foo": 30}) to the second pass. Requests are replaced (not
        # mutated), so sharing the Request objects across passes is safe.
        return [
            test_req1,
            {"foo": 1},
            {"foo": 2},
            test_req2,
            {"foo": 3},
            test_req3,
        ]

    for processed in [
        list(mw.process_spider_output(Response("data:,"), make_output())),
        list(mw.process_start_requests(make_output(), None)),  # type: ignore[arg-type]
    ]:
        # One request and one item dropped: 6 inputs become 4 outputs.
        assert len(processed) == 4
        assert isinstance(processed[0], Request)
        assert processed[0].url == "data:1,"
        assert processed[1] == {"foo": 1}
        assert processed[2] == {"foo": 30}
        assert isinstance(processed[3], Request)
        assert processed[3].url == "data:30,"