"""CloseSpider is an extension that forces spiders to be closed after certain
conditions are met.
See documentation in docs/topics/extensions.rst
"""

from __future__ import annotations

import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Any

from scrapy import Request, Spider, signals
from scrapy.exceptions import NotConfigured

if TYPE_CHECKING:
    from twisted.python.failure import Failure

    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler
    from scrapy.http import Response

logger = logging.getLogger(__name__)


class CloseSpider:
    def __init__(self, crawler: Crawler):
        self.crawler: Crawler = crawler

        self.close_on: dict[str, Any] = {
            "timeout": crawler.settings.getfloat("CLOSESPIDER_TIMEOUT"),
            "itemcount": crawler.settings.getint("CLOSESPIDER_ITEMCOUNT"),
            "pagecount": crawler.settings.getint("CLOSESPIDER_PAGECOUNT"),
            "errorcount": crawler.settings.getint("CLOSESPIDER_ERRORCOUNT"),
            "timeout_no_item": crawler.settings.getint("CLOSESPIDER_TIMEOUT_NO_ITEM"),
            "pagecount_no_item": crawler.settings.getint(
                "CLOSESPIDER_PAGECOUNT_NO_ITEM"
            ),
        }

        if not any(self.close_on.values()):
            raise NotConfigured
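
        # Running totals per condition; defaultdict(int) makes unseen keys
        # read as 0, so handlers can increment without initialization.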
        self.counter: defaultdict[str, int] = defaultdict(int)

        if self.close_on.get("errorcount"):
            crawler.signals.connect(self.error_count, signal=signals.spider_error)
        if self.close_on.get("pagecount") or self.close_on.get("pagecount_no_item"):
            crawler.signals.connect(self.page_count, signal=signals.response_received)
        if self.close_on.get("timeout"):
            crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
        if self.close_on.get("itemcount") or self.close_on.get("pagecount_no_item"):
            crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)
        if self.close_on.get("timeout_no_item"):
            self.timeout_no_item: int = self.close_on["timeout_no_item"]
            self.items_in_period: int = 0
            crawler.signals.connect(
                self.spider_opened_no_item, signal=signals.spider_opened
            )
            crawler.signals.connect(
                self.item_scraped_no_item, signal=signals.item_scraped
            )

        crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        return cls(crawler)
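
    # spider_error handler: close the spider with reason
    # "closespider_errorcount" once CLOSESPIDER_ERRORCOUNT errors accumulate.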
    def error_count(
        self, failure: Failure, response: Response, spider: Spider
    ) -> None:
        self.counter["errorcount"] += 1
        if self.counter["errorcount"] == self.close_on["errorcount"]:
            assert self.crawler.engine
            self.crawler.engine.close_spider(spider, "closespider_errorcount")
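
    # response_received handler: tracks the total page count and the pages
    # seen since the last scraped item, closing the spider when either the
    # CLOSESPIDER_PAGECOUNT or CLOSESPIDER_PAGECOUNT_NO_ITEM limit is hit.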
    def page_count(self, response: Response, request: Request, spider: Spider) -> None:
        self.counter["pagecount"] += 1
        self.counter["pagecount_since_last_item"] += 1
        if self.counter["pagecount"] == self.close_on["pagecount"]:
            assert self.crawler.engine
            self.crawler.engine.close_spider(spider, "closespider_pagecount")
            return
        if self.close_on["pagecount_no_item"] and (
            self.counter["pagecount_since_last_item"]
            >= self.close_on["pagecount_no_item"]
        ):
            assert self.crawler.engine
            self.crawler.engine.close_spider(spider, "closespider_pagecount_no_item")
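
    # spider_opened handler for CLOSESPIDER_TIMEOUT: schedules a one-shot
    # DelayedCall; the reactor is imported locally, following the Scrapy
    # convention of not touching the reactor at module import time.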
    def spider_opened(self, spider: Spider) -> None:
        from twisted.internet import reactor

        assert self.crawler.engine
        self.task = reactor.callLater(
            self.close_on["timeout"],
            self.crawler.engine.close_spider,
            spider,
            reason="closespider_timeout",
        )
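
    # item_scraped handler: resets the itemless-page streak and closes the
    # spider once CLOSESPIDER_ITEMCOUNT items have been scraped.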
    def item_scraped(self, item: Any, spider: Spider) -> None:
        self.counter["itemcount"] += 1
        self.counter["pagecount_since_last_item"] = 0
        if self.counter["itemcount"] == self.close_on["itemcount"]:
            assert self.crawler.engine
            self.crawler.engine.close_spider(spider, "closespider_itemcount")
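
    # spider_closed handler: cancel the pending timeout DelayedCall and stop
    # the no-item LoopingCall so neither fires after shutdown.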
    def spider_closed(self, spider: Spider) -> None:
        task = getattr(self, "task", None)
        if task and task.active():
            task.cancel()

        task_no_item = getattr(self, "task_no_item", None)
        if task_no_item and task_no_item.running:
            task_no_item.stop()
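
    # spider_opened handler for CLOSESPIDER_TIMEOUT_NO_ITEM: starts a
    # LoopingCall that checks once per period whether any items arrived.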
    def spider_opened_no_item(self, spider: Spider) -> None:
        from twisted.internet import task

        self.task_no_item = task.LoopingCall(self._count_items_produced, spider)
        self.task_no_item.start(self.timeout_no_item, now=False)

        logger.info(
            f"Spider will stop when no items are produced after "
            f"{self.timeout_no_item} seconds."
        )
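
    # item_scraped handler for CLOSESPIDER_TIMEOUT_NO_ITEM: counts the items
    # produced during the current period.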
    def item_scraped_no_item(self, item: Any, spider: Spider) -> None:
        self.items_in_period += 1
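
    # Periodic check driven by the LoopingCall: reset the per-period counter
    # if items arrived, otherwise close the spider.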
    def _count_items_produced(self, spider: Spider) -> None:
        if self.items_in_period >= 1:
            self.items_in_period = 0
        else:
            logger.info(
                f"Closing spider since no items were produced in the last "
                f"{self.timeout_no_item} seconds."
            )
            assert self.crawler.engine
            self.crawler.engine.close_spider(spider, "closespider_timeout_no_item")