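"""Tests for the Scrapy scheduler interface (scrapy.core.scheduler.BaseScheduler).

Exercises three levels of conformance: BaseScheduler itself, a
MinimalScheduler that implements only the required methods, and a
SimpleScheduler that also provides the optional open/close/__len__ hooks,
plus end-to-end crawls against a mock server.
"""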
from __future__ import annotations

from urllib.parse import urljoin

import pytest
from testfixtures import LogCapture
from twisted.internet import defer
from twisted.trial.unittest import TestCase

from scrapy.core.scheduler import BaseScheduler
from scrapy.http import Request
from scrapy.spiders import Spider
from scrapy.utils.httpobj import urlparse_cached
from scrapy.utils.request import fingerprint
from scrapy.utils.test import get_crawler
from tests.mockserver import MockServer

PATHS = ["/a", "/b", "/c"]
URLS = [urljoin("https://example.org", p) for p in PATHS]


class MinimalScheduler:
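    """Scheduler exposing only the three methods the interface requires:
    has_pending_requests, enqueue_request and next_request. Requests are
    stored keyed by fingerprint, so duplicates are rejected.
    """
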
    def __init__(self) -> None:
        self.requests: dict[bytes, Request] = {}

    def has_pending_requests(self) -> bool:
        return bool(self.requests)

    def enqueue_request(self, request: Request) -> bool:
        fp = fingerprint(request)
        if fp not in self.requests:
            self.requests[fp] = request
            return True
        return False

    def next_request(self) -> Request | None:
        if self.has_pending_requests():
            fp, request = self.requests.popitem()
            return request
        return None


class SimpleScheduler(MinimalScheduler):
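    """MinimalScheduler plus the optional hooks: open/close return Deferreds
    and __len__ reports the number of pending requests.
    """
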
    def open(self, spider: Spider) -> defer.Deferred:
        return defer.succeed("open")

    def close(self, reason: str) -> defer.Deferred:
        return defer.succeed("close")

    def __len__(self) -> int:
        return len(self.requests)


class PathsSpider(Spider):
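    """Spider used by the crawl tests: fetches each path from the mock server
    and yields the response path as an item.
    """
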
name = "paths"
def __init__(self, mockserver, *args, **kwargs):
super().__init__(*args, **kwargs)
self.start_urls = map(mockserver.url, PATHS)
def parse(self, response):
return {"path": urlparse_cached(response).path}
class InterfaceCheckMixin:
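    """Interface checks shared by all the test classes below. They pass even
    for MinimalScheduler, which does not inherit from BaseScheduler, so the
    isinstance/issubclass checks accept any class providing the required
    methods rather than requiring actual inheritance.
    """
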
    def test_scheduler_class(self):
        assert isinstance(self.scheduler, BaseScheduler)
        assert issubclass(self.scheduler.__class__, BaseScheduler)


class TestBaseScheduler(InterfaceCheckMixin):
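    """BaseScheduler itself: open/close are no-ops returning None, while the
    three required methods raise NotImplementedError.
    """
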
    def setup_method(self):
        self.scheduler = BaseScheduler()

    def test_methods(self):
        assert self.scheduler.open(Spider("foo")) is None
        assert self.scheduler.close("finished") is None
        with pytest.raises(NotImplementedError):
            self.scheduler.has_pending_requests()
        with pytest.raises(NotImplementedError):
            self.scheduler.enqueue_request(Request("https://example.org"))
        with pytest.raises(NotImplementedError):
            self.scheduler.next_request()


class TestMinimalScheduler(InterfaceCheckMixin):
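    """Without the optional hooks, open/close/__len__ fail with
    AttributeError (and len() with TypeError), but the enqueue/dequeue cycle
    still works.
    """
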
    def setup_method(self):
        self.scheduler = MinimalScheduler()

    def test_open_close(self):
        with pytest.raises(AttributeError):
            self.scheduler.open(Spider("foo"))
        with pytest.raises(AttributeError):
            self.scheduler.close("finished")

    def test_len(self):
        with pytest.raises(AttributeError):
            self.scheduler.__len__()
        with pytest.raises(TypeError):
            len(self.scheduler)

    def test_enqueue_dequeue(self):
        assert not self.scheduler.has_pending_requests()
        for url in URLS:
            assert self.scheduler.enqueue_request(Request(url))
            assert not self.scheduler.enqueue_request(Request(url))
        assert self.scheduler.has_pending_requests()
        dequeued = []
        while self.scheduler.has_pending_requests():
            request = self.scheduler.next_request()
            dequeued.append(request.url)
        assert set(dequeued) == set(URLS)
        assert not self.scheduler.has_pending_requests()


class SimpleSchedulerTest(TestCase, InterfaceCheckMixin):
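    """Full lifecycle with SimpleScheduler: the Deferreds returned by
    open/close are awaited via inlineCallbacks and len() tracks the queue
    size, hence the Twisted TestCase base class.
    """
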
    def setUp(self):
        self.scheduler = SimpleScheduler()

    @defer.inlineCallbacks
    def test_enqueue_dequeue(self):
        open_result = yield self.scheduler.open(Spider("foo"))
        assert open_result == "open"
        assert not self.scheduler.has_pending_requests()

        for url in URLS:
            assert self.scheduler.enqueue_request(Request(url))
            assert not self.scheduler.enqueue_request(Request(url))
        assert self.scheduler.has_pending_requests()
        assert len(self.scheduler) == len(URLS)

        dequeued = []
        while self.scheduler.has_pending_requests():
            request = self.scheduler.next_request()
            dequeued.append(request.url)
        assert set(dequeued) == set(URLS)
        assert not self.scheduler.has_pending_requests()
        assert len(self.scheduler) == 0

        close_result = yield self.scheduler.close("")
        assert close_result == "close"


class MinimalSchedulerCrawlTest(TestCase):
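    """End-to-end crawl against the mock server with the scheduler under
    test plugged in through the SCHEDULER setting; every path should be
    scraped exactly once.
    """
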
    scheduler_cls = MinimalScheduler

    @defer.inlineCallbacks
    def test_crawl(self):
        with MockServer() as mockserver:
            settings = {
                "SCHEDULER": self.scheduler_cls,
            }
            with LogCapture() as log:
                crawler = get_crawler(PathsSpider, settings)
                yield crawler.crawl(mockserver)
        for path in PATHS:
            assert f"{{'path': '{path}'}}" in str(log)
        assert f"'item_scraped_count': {len(PATHS)}" in str(log)


class SimpleSchedulerCrawlTest(MinimalSchedulerCrawlTest):
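    """Same crawl, run with the scheduler that implements the optional hooks."""
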
    scheduler_cls = SimpleScheduler