from __future__ import annotations
import logging
import warnings
from pathlib import Path
from typing import TYPE_CHECKING
from scrapy.exceptions import ScrapyDeprecationWarning
from scrapy.utils.job import job_dir
from scrapy.utils.request import (
RequestFingerprinter,
RequestFingerprinterProtocol,
referer_str,
)
if TYPE_CHECKING:
from twisted.internet.defer import Deferred
# typing.Self requires Python 3.11
from typing_extensions import Self
from scrapy.crawler import Crawler
from scrapy.http.request import Request
from scrapy.settings import BaseSettings
from scrapy.spiders import Spider
class BaseDupeFilter:
"""Dummy duplicate request filtering class (:setting:`DUPEFILTER_CLASS`)
that does not filter out any request."""
@classmethod
def from_settings(cls, settings: BaseSettings) -> Self:
warnings.warn(
f"{cls.__name__}.from_settings() is deprecated, use from_crawler() instead.",
category=ScrapyDeprecationWarning,
stacklevel=2,
)
return cls()
@classmethod
def from_crawler(cls, crawler: Crawler) -> Self:
return cls()
def request_seen(self, request: Request) -> bool:
return False
def open(self) -> Deferred[None] | None:
pass
def close(self, reason: str) -> Deferred[None] | None:
pass
def log(self, request: Request, spider: Spider) -> None:
"""Log that a request has been filtered"""
        warnings.warn(
"Calling BaseDupeFilter.log() is deprecated.",
ScrapyDeprecationWarning,
stacklevel=2,
)
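
# A minimal sketch (not part of Scrapy) of a custom dupefilter built on
# BaseDupeFilter: it deduplicates on the exact request URL only, ignoring
# method and body. The class name is illustrative; a real project would
# point the DUPEFILTER_CLASS setting at its own import path.
class _URLOnlyDupeFilterSketch(BaseDupeFilter):
    def __init__(self) -> None:
        self._seen_urls: set[str] = set()

    def request_seen(self, request: Request) -> bool:
        # Report a duplicate when this exact URL string was seen before.
        if request.url in self._seen_urls:
            return True
        self._seen_urls.add(request.url)
        return False
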
class RFPDupeFilter(BaseDupeFilter):
"""Duplicate request filtering class (:setting:`DUPEFILTER_CLASS`) that
filters out requests with the canonical
(:func:`w3lib.url.canonicalize_url`) :attr:`~scrapy.http.Request.url`,
:attr:`~scrapy.http.Request.method` and :attr:`~scrapy.http.Request.body`.
"""
def __init__(
self,
path: str | None = None,
debug: bool = False,
*,
fingerprinter: RequestFingerprinterProtocol | None = None,
) -> None:
self.file = None
self.fingerprinter: RequestFingerprinterProtocol = (
fingerprinter or RequestFingerprinter()
)
self.fingerprints: set[str] = set()
self.logdupes = True
self.debug = debug
self.logger = logging.getLogger(__name__)
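        # When a path is given (typically via JOBDIR, see job_dir()), seen
        # fingerprints persist in requests.seen: "a+" reloads entries from
        # earlier runs and appends new ones, so a resumed crawl skips
        # already-visited requests.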
if path:
self.file = Path(path, "requests.seen").open("a+", encoding="utf-8")
self.file.seek(0)
self.fingerprints.update(x.rstrip() for x in self.file)
@classmethod
def from_settings(
cls,
settings: BaseSettings,
*,
fingerprinter: RequestFingerprinterProtocol | None = None,
) -> Self:
warnings.warn(
f"{cls.__name__}.from_settings() is deprecated, use from_crawler() instead.",
category=ScrapyDeprecationWarning,
stacklevel=2,
)
return cls._from_settings(settings, fingerprinter=fingerprinter)
@classmethod
def from_crawler(cls, crawler: Crawler) -> Self:
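        # The crawler is expected to have built its request fingerprinter
        # (REQUEST_FINGERPRINTER_CLASS) before this component is created.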
assert crawler.request_fingerprinter
return cls._from_settings(
crawler.settings,
fingerprinter=crawler.request_fingerprinter,
)
@classmethod
def _from_settings(
cls,
settings: BaseSettings,
*,
fingerprinter: RequestFingerprinterProtocol | None = None,
) -> Self:
debug = settings.getbool("DUPEFILTER_DEBUG")
return cls(job_dir(settings), debug, fingerprinter=fingerprinter)
def request_seen(self, request: Request) -> bool:
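        # Fingerprint once, then check-and-record: only the first sighting
        # returns False, and its fingerprint is also persisted when a
        # requests.seen file is open.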
fp = self.request_fingerprint(request)
if fp in self.fingerprints:
return True
self.fingerprints.add(fp)
if self.file:
self.file.write(fp + "\n")
return False
def request_fingerprint(self, request: Request) -> str:
"""Returns a string that uniquely identifies the specified request."""
return self.fingerprinter.fingerprint(request).hex()
def close(self, reason: str) -> None:
if self.file:
self.file.close()
def log(self, request: Request, spider: Spider) -> None:
if self.debug:
msg = "Filtered duplicate request: %(request)s (referer: %(referer)s)"
args = {"request": request, "referer": referer_str(request)}
self.logger.debug(msg, args, extra={"spider": spider})
elif self.logdupes:
msg = (
"Filtered duplicate request: %(request)s"
" - no more duplicates will be shown"
" (see DUPEFILTER_DEBUG to show all duplicates)"
)
self.logger.debug(msg, {"request": request}, extra={"spider": spider})
self.logdupes = False
assert spider.crawler.stats
spider.crawler.stats.inc_value("dupefilter/filtered", spider=spider)
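
# Usage sketch (illustrative only, not part of the module's API): exercising
# RFPDupeFilter directly, outside a running crawl. Assumes scrapy.http.Request
# is importable at runtime; the example URL is arbitrary.
if __name__ == "__main__":
    from scrapy.http import Request

    dupefilter = RFPDupeFilter()
    first = Request("https://example.com")
    repeat = Request("https://example.com")
    print(dupefilter.request_seen(first))   # False: fingerprint recorded
    print(dupefilter.request_seen(repeat))  # True: same URL, method and body
    dupefilter.close(reason="finished")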