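"""Tests for :class:`scrapy.spidermiddlewares.base.BaseSpiderMiddleware`.

Each test subclasses ``BaseSpiderMiddleware`` and checks that the
``get_processed_request`` / ``get_processed_item`` hooks are applied to both
``process_spider_output`` and ``process_start_requests`` output: returning
``None`` drops an entry, returning a different object replaces it.
"""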
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import pytest

from scrapy import Request, Spider
from scrapy.http import Response
from scrapy.spidermiddlewares.base import BaseSpiderMiddleware
from scrapy.utils.test import get_crawler

if TYPE_CHECKING:
    from scrapy.crawler import Crawler


@pytest.fixture
def crawler() -> Crawler:
    return get_crawler(Spider)


def test_trivial(crawler: Crawler) -> None:
    """A subclass that overrides nothing keeps a reference to the crawler
    and passes requests and items through unchanged."""

    class TrivialSpiderMiddleware(BaseSpiderMiddleware):
        pass

    mw = TrivialSpiderMiddleware.from_crawler(crawler)
    assert hasattr(mw, "crawler")
    assert mw.crawler is crawler
    test_req = Request("data:,")
    spider_output = [test_req, {"foo": "bar"}]
    for processed in [
        list(mw.process_spider_output(Response("data:,"), spider_output)),
        list(mw.process_start_requests(spider_output, None)),  # type: ignore[arg-type]
    ]:
        assert processed == [test_req, {"foo": "bar"}]


def test_processed_request(crawler: Crawler) -> None:
    """``get_processed_request`` can drop a request (return ``None``) or
    replace it with a different request; items are left untouched."""

    class ProcessReqSpiderMiddleware(BaseSpiderMiddleware):
        def get_processed_request(
            self, request: Request, response: Response | None
        ) -> Request | None:
            if request.url == "data:2,":
                return None  # dropped
            if request.url == "data:3,":
                return Request("data:30,")  # replaced
            return request

    mw = ProcessReqSpiderMiddleware.from_crawler(crawler)
    test_req1 = Request("data:1,")
    test_req2 = Request("data:2,")
    test_req3 = Request("data:3,")
    spider_output = [test_req1, {"foo": "bar"}, test_req2, test_req3]
    for processed in [
        list(mw.process_spider_output(Response("data:,"), spider_output)),
        list(mw.process_start_requests(spider_output, None)),  # type: ignore[arg-type]
    ]:
        assert len(processed) == 3
        assert isinstance(processed[0], Request)
        assert processed[0].url == "data:1,"
        assert processed[1] == {"foo": "bar"}
        assert isinstance(processed[2], Request)
        assert processed[2].url == "data:30,"


def test_processed_item(crawler: Crawler) -> None:
    """``get_processed_item`` can drop an item (return ``None``) or modify it
    in place; requests are left untouched."""

    class ProcessItemSpiderMiddleware(BaseSpiderMiddleware):
        def get_processed_item(self, item: Any, response: Response | None) -> Any:
            if item["foo"] == 2:
                return None  # dropped
            if item["foo"] == 3:
                item["foo"] = 30  # modified in place
            return item

    mw = ProcessItemSpiderMiddleware.from_crawler(crawler)
    test_req = Request("data:,")
    spider_output = [{"foo": 1}, {"foo": 2}, test_req, {"foo": 3}]
    for processed in [
        list(mw.process_spider_output(Response("data:,"), spider_output)),
        list(mw.process_start_requests(spider_output, None)),  # type: ignore[arg-type]
    ]:
        assert processed == [{"foo": 1}, test_req, {"foo": 30}]


def test_processed_both(crawler: Crawler) -> None:
    """Both hooks can be overridden at once; each one only sees entries of
    the matching type (requests vs. items)."""

    class ProcessBothSpiderMiddleware(BaseSpiderMiddleware):
        def get_processed_request(
            self, request: Request, response: Response | None
        ) -> Request | None:
            if request.url == "data:2,":
                return None  # dropped
            if request.url == "data:3,":
                return Request("data:30,")  # replaced
            return request

        def get_processed_item(self, item: Any, response: Response | None) -> Any:
            if item["foo"] == 2:
                return None  # dropped
            if item["foo"] == 3:
                item["foo"] = 30  # modified in place
            return item

    mw = ProcessBothSpiderMiddleware.from_crawler(crawler)
    test_req1 = Request("data:1,")
    test_req2 = Request("data:2,")
    test_req3 = Request("data:3,")
    spider_output = [
        test_req1,
        {"foo": 1},
        {"foo": 2},
        test_req2,
        {"foo": 3},
        test_req3,
    ]
    for processed in [
        list(mw.process_spider_output(Response("data:,"), spider_output)),
        list(mw.process_start_requests(spider_output, None)),  # type: ignore[arg-type]
    ]:
        assert len(processed) == 4
        assert isinstance(processed[0], Request)
        assert processed[0].url == "data:1,"
        assert processed[1] == {"foo": 1}
        assert processed[2] == {"foo": 30}
        assert isinstance(processed[3], Request)
        assert processed[3].url == "data:30,"
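

# A minimal usage sketch of the pattern exercised above (illustration only,
# not collected by pytest). The hook signatures match the subclasses in the
# tests; the filtering and stamping logic itself is a hypothetical example.
class _ExampleSpiderMiddleware(BaseSpiderMiddleware):
    def get_processed_request(
        self, request: Request, response: Response | None
    ) -> Request | None:
        # Returning None drops the request from the middleware output.
        if request.url.startswith("ftp://"):
            return None
        return request

    def get_processed_item(self, item: Any, response: Response | None) -> Any:
        # Returning a (possibly modified) object replaces the original item.
        if isinstance(item, dict) and response is not None:
            item.setdefault("source_url", response.url)
        return item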