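"""Tests for scrapy.spidermiddlewares.base.BaseSpiderMiddleware.

Covers the default pass-through behavior and the get_processed_request()
/ get_processed_item() hooks, for both process_spider_output() and
process_start_requests().
"""
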
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import pytest

from scrapy import Request, Spider
from scrapy.http import Response
from scrapy.spidermiddlewares.base import BaseSpiderMiddleware
from scrapy.utils.test import get_crawler

if TYPE_CHECKING:
    from scrapy.crawler import Crawler


@pytest.fixture
def crawler() -> Crawler:
    return get_crawler(Spider)
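

# A subclass that overrides neither hook must be a pure pass-through:
# from_crawler() attaches the crawler, and every request and item in the
# spider output comes out unchanged.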
def test_trivial(crawler):
    class TrivialSpiderMiddleware(BaseSpiderMiddleware):
        pass

    mw = TrivialSpiderMiddleware.from_crawler(crawler)
    assert hasattr(mw, "crawler")
    assert mw.crawler is crawler
    test_req = Request("data:,")
    spider_output = [test_req, {"foo": "bar"}]
    for processed in [
        list(
            mw.process_spider_output(Response("data:,"), spider_output, crawler.spider)
        ),
        list(mw.process_start_requests(spider_output, crawler.spider)),
    ]:
        assert processed == [test_req, {"foo": "bar"}]
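

# get_processed_request() contract: returning None drops the request,
# returning a different Request replaces it, and returning the argument
# keeps it; items never reach this hook.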
def test_processed_request(crawler):
    class ProcessReqSpiderMiddleware(BaseSpiderMiddleware):
        def get_processed_request(
            self, request: Request, response: Response | None
        ) -> Request | None:
            if request.url == "data:2,":
                return None  # drop this request
            if request.url == "data:3,":
                return Request("data:30,")  # replace it with a new one
            return request  # keep everything else as-is

    mw = ProcessReqSpiderMiddleware.from_crawler(crawler)
    test_req1 = Request("data:1,")
    test_req2 = Request("data:2,")
    test_req3 = Request("data:3,")
    spider_output = [test_req1, {"foo": "bar"}, test_req2, test_req3]
    # Both entry points must apply the same per-request processing.
    for processed in [
        list(
            mw.process_spider_output(Response("data:,"), spider_output, crawler.spider)
        ),
        list(mw.process_start_requests(spider_output, crawler.spider)),
    ]:
        assert len(processed) == 3
        assert isinstance(processed[0], Request)
        assert processed[0].url == "data:1,"
        assert processed[1] == {"foo": "bar"}
        assert isinstance(processed[2], Request)
        assert processed[2].url == "data:30,"
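

# get_processed_item() mirrors the request hook for items: None drops the
# item and the returned (possibly mutated) value replaces it; requests
# bypass this hook.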
def test_processed_item(crawler):
    class ProcessItemSpiderMiddleware(BaseSpiderMiddleware):
        def get_processed_item(self, item: Any, response: Response | None) -> Any:
            if item["foo"] == 2:
                return None  # drop this item
            if item["foo"] == 3:
                item["foo"] = 30  # mutate in place, then keep it
            return item

    mw = ProcessItemSpiderMiddleware.from_crawler(crawler)
    test_req = Request("data:,")
    spider_output = [{"foo": 1}, {"foo": 2}, test_req, {"foo": 3}]
    for processed in [
        list(
            mw.process_spider_output(Response("data:,"), spider_output, crawler.spider)
        ),
        list(mw.process_start_requests(spider_output, crawler.spider)),
    ]:
        assert processed == [{"foo": 1}, test_req, {"foo": 30}]
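

# With both hooks overridden, each element of the spider output is routed
# to the matching hook, so requests and items are dropped or replaced
# independently within the same output.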
def test_processed_both(crawler):
    class ProcessBothSpiderMiddleware(BaseSpiderMiddleware):
        def get_processed_request(
            self, request: Request, response: Response | None
        ) -> Request | None:
            if request.url == "data:2,":
                return None
            if request.url == "data:3,":
                return Request("data:30,")
            return request

        def get_processed_item(self, item: Any, response: Response | None) -> Any:
            if item["foo"] == 2:
                return None
            if item["foo"] == 3:
                item["foo"] = 30
            return item

    mw = ProcessBothSpiderMiddleware.from_crawler(crawler)
    test_req1 = Request("data:1,")
    test_req2 = Request("data:2,")
    test_req3 = Request("data:3,")
    spider_output = [
        test_req1,
        {"foo": 1},
        {"foo": 2},
        test_req2,
        {"foo": 3},
        test_req3,
    ]
    for processed in [
        list(
            mw.process_spider_output(Response("data:,"), spider_output, crawler.spider)
        ),
        list(mw.process_start_requests(spider_output, crawler.spider)),
    ]:
        assert len(processed) == 4
        assert isinstance(processed[0], Request)
        assert processed[0].url == "data:1,"
        assert processed[1] == {"foo": 1}
        assert processed[2] == {"foo": 30}
        assert isinstance(processed[3], Request)
        assert processed[3].url == "data:30,"