File: test_spidermiddleware_base.py

Package: python-scrapy 2.13.3-1
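"""Tests for scrapy.spidermiddlewares.base.BaseSpiderMiddleware."""
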
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import pytest

from scrapy import Request, Spider
from scrapy.http import Response
from scrapy.spidermiddlewares.base import BaseSpiderMiddleware
from scrapy.utils.test import get_crawler

if TYPE_CHECKING:
    from scrapy.crawler import Crawler


@pytest.fixture
def crawler() -> Crawler:
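    """Return a minimal crawler built around the base Spider class, shared by all tests."""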
    return get_crawler(Spider)


def test_trivial(crawler):
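    """A subclass that overrides no hooks must pass all spider output through unchanged."""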
    class TrivialSpiderMiddleware(BaseSpiderMiddleware):
        pass

    mw = TrivialSpiderMiddleware.from_crawler(crawler)
    assert hasattr(mw, "crawler")
    assert mw.crawler is crawler
    test_req = Request("data:,")
    spider_output = [test_req, {"foo": "bar"}]
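    # Both entry points should yield the input sequence as-is.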
    for processed in [
        list(
            mw.process_spider_output(Response("data:,"), spider_output, crawler.spider)
        ),
        list(mw.process_start_requests(spider_output, crawler.spider)),
    ]:
        assert processed == [test_req, {"foo": "bar"}]


def test_processed_request(crawler):
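    """get_processed_request() can drop (None) or replace a request; items pass through."""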
    class ProcessReqSpiderMiddleware(BaseSpiderMiddleware):
        def get_processed_request(
            self, request: Request, response: Response | None
        ) -> Request | None:
            if request.url == "data:2,":
                return None
            if request.url == "data:3,":
                return Request("data:30,")
            return request

    mw = ProcessReqSpiderMiddleware.from_crawler(crawler)
    test_req1 = Request("data:1,")
    test_req2 = Request("data:2,")
    test_req3 = Request("data:3,")
    spider_output = [test_req1, {"foo": "bar"}, test_req2, test_req3]
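    # Expect data:2, to be dropped and data:3, to be replaced with data:30,.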
    for processed in [
        list(
            mw.process_spider_output(Response("data:,"), spider_output, crawler.spider)
        ),
        list(mw.process_start_requests(spider_output, crawler.spider)),
    ]:
        assert len(processed) == 3
        assert isinstance(processed[0], Request)
        assert processed[0].url == "data:1,"
        assert processed[1] == {"foo": "bar"}
        assert isinstance(processed[2], Request)
        assert processed[2].url == "data:30,"


def test_processed_item(crawler):
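    """get_processed_item() can drop (None) or modify an item; requests pass through."""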
    class ProcessItemSpiderMiddleware(BaseSpiderMiddleware):
        def get_processed_item(self, item: Any, response: Response | None) -> Any:
            if item["foo"] == 2:
                return None
            if item["foo"] == 3:
                item["foo"] = 30
            return item

    mw = ProcessItemSpiderMiddleware.from_crawler(crawler)
    test_req = Request("data:,")
    spider_output = [{"foo": 1}, {"foo": 2}, test_req, {"foo": 3}]
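    # Expect the second item to be dropped and the third rewritten in place.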
    for processed in [
        list(
            mw.process_spider_output(Response("data:,"), spider_output, crawler.spider)
        ),
        list(mw.process_start_requests(spider_output, crawler.spider)),
    ]:
        assert processed == [{"foo": 1}, test_req, {"foo": 30}]


def test_processed_both(crawler):
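    """Overriding both hooks lets a middleware filter and replace requests and items in one pass."""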
    class ProcessBothSpiderMiddleware(BaseSpiderMiddleware):
        def get_processed_request(
            self, request: Request, response: Response | None
        ) -> Request | None:
            if request.url == "data:2,":
                return None
            if request.url == "data:3,":
                return Request("data:30,")
            return request

        def get_processed_item(self, item: Any, response: Response | None) -> Any:
            if item["foo"] == 2:
                return None
            if item["foo"] == 3:
                item["foo"] = 30
            return item

    mw = ProcessBothSpiderMiddleware.from_crawler(crawler)
    test_req1 = Request("data:1,")
    test_req2 = Request("data:2,")
    test_req3 = Request("data:3,")
    spider_output = [
        test_req1,
        {"foo": 1},
        {"foo": 2},
        test_req2,
        {"foo": 3},
        test_req3,
    ]
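    # Expect req2 and item 2 to be dropped, req3 and item 3 to be replaced/rewritten.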
    for processed in [
        list(
            mw.process_spider_output(Response("data:,"), spider_output, crawler.spider)
        ),
        list(mw.process_start_requests(spider_output, crawler.spider)),
    ]:
        assert len(processed) == 4
        assert isinstance(processed[0], Request)
        assert processed[0].url == "data:1,"
        assert processed[1] == {"foo": 1}
        assert processed[2] == {"foo": 30}
        assert isinstance(processed[3], Request)
        assert processed[3].url == "data:30,"