from __future__ import annotations
import logging
import re
# Iterable is needed at runtime for the SitemapSpider._parse_sitemap() annotation
from collections.abc import AsyncIterator, Iterable, Sequence # noqa: TC003
from typing import TYPE_CHECKING, Any, cast
from scrapy.http import Request, Response, XmlResponse
from scrapy.spiders import Spider
from scrapy.utils._compression import _DecompressionMaxSizeExceeded
from scrapy.utils.gz import gunzip, gzip_magic_number
from scrapy.utils.sitemap import Sitemap, sitemap_urls_from_robots
if TYPE_CHECKING:
    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler
    from scrapy.http.request import CallbackT

logger = logging.getLogger(__name__)
class SitemapSpider(Spider):
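    """Spider that crawls a website by following the URLs listed in its sitemaps.

    A minimal usage sketch; the domain, rule pattern and callback name below
    are illustrative placeholders, not defaults::

        class MySpider(SitemapSpider):
            sitemap_urls = ["https://example.com/robots.txt"]
            sitemap_rules = [("/products/", "parse_product")]

            def parse_product(self, response):
                ...  # extract fields from the product page
    """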

    sitemap_urls: Sequence[str] = ()
    sitemap_rules: Sequence[tuple[re.Pattern[str] | str, str | CallbackT]] = [
        ("", "parse")
    ]
    sitemap_follow: Sequence[re.Pattern[str] | str] = [""]
    sitemap_alternate_links: bool = False

    _max_size: int
    _warn_size: int

    @classmethod
    def from_crawler(cls, crawler: Crawler, *args: Any, **kwargs: Any) -> Self:
        spider = super().from_crawler(crawler, *args, **kwargs)
        # Prefer a per-spider download_maxsize/download_warnsize attribute,
        # falling back to the project-wide settings.
        spider._max_size = getattr(
            spider, "download_maxsize", spider.settings.getint("DOWNLOAD_MAXSIZE")
        )
        spider._warn_size = getattr(
            spider, "download_warnsize", spider.settings.getint("DOWNLOAD_WARNSIZE")
        )
        return spider

    def __init__(self, *a: Any, **kw: Any):
        super().__init__(*a, **kw)
        # Compile sitemap_rules into (pattern, callback) pairs, resolving
        # string callback names to bound methods.
        self._cbs: list[tuple[re.Pattern[str], CallbackT]] = []
        for r, c in self.sitemap_rules:
            if isinstance(c, str):
                c = cast("CallbackT", getattr(self, c))
            self._cbs.append((regex(r), c))
        self._follow: list[re.Pattern[str]] = [regex(x) for x in self.sitemap_follow]

    async def start(self) -> AsyncIterator[Any]:
        # Delegate to start_requests() so it remains the single place that
        # builds the initial sitemap requests.
        for item_or_request in self.start_requests():
            yield item_or_request

    def start_requests(self) -> Iterable[Request]:
        for url in self.sitemap_urls:
            yield Request(url, self._parse_sitemap)

    def sitemap_filter(
        self, entries: Iterable[dict[str, Any]]
    ) -> Iterable[dict[str, Any]]:
        """Override this method to filter sitemap entries by their
        attributes; for example, you can keep only the locs whose lastmod
        is more recent than a given date (see the documentation).
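
        A sketch of such an override in a subclass; it assumes every entry
        carries a ``lastmod`` value in ``YYYY-MM-DD`` format, and the cutoff
        year is an arbitrary example::

            from datetime import datetime

            def sitemap_filter(self, entries):
                for entry in entries:
                    date_time = datetime.strptime(entry["lastmod"], "%Y-%m-%d")
                    if date_time.year >= 2005:
                        yield entry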
"""
yield from entries

    def _parse_sitemap(self, response: Response) -> Iterable[Request]:
        """Yield follow-up requests for a robots.txt, sitemap index or
        urlset response.
        """
        if response.url.endswith("/robots.txt"):
            for url in sitemap_urls_from_robots(response.text, base_url=response.url):
                yield Request(url, callback=self._parse_sitemap)
        else:
            body = self._get_sitemap_body(response)
            if body is None:
                logger.warning(
                    "Ignoring invalid sitemap: %(response)s",
                    {"response": response},
                    extra={"spider": self},
                )
                return
            s = Sitemap(body)
            it = self.sitemap_filter(s)
            if s.type == "sitemapindex":
                # Follow nested sitemaps whose URL matches a sitemap_follow pattern.
                for loc in iterloc(it, self.sitemap_alternate_links):
                    if any(x.search(loc) for x in self._follow):
                        yield Request(loc, callback=self._parse_sitemap)
            elif s.type == "urlset":
                # Request each URL with the callback of the first matching rule.
                for loc in iterloc(it, self.sitemap_alternate_links):
                    for r, c in self._cbs:
                        if r.search(loc):
                            yield Request(loc, callback=c)
                            break

    def _get_sitemap_body(self, response: Response) -> bytes | None:
        """Return the sitemap body contained in the given response,
        or None if the response is not a sitemap.
        """
        if isinstance(response, XmlResponse):
            return response.body
        if gzip_magic_number(response):
            uncompressed_size = len(response.body)
            max_size = response.meta.get("download_maxsize", self._max_size)
            warn_size = response.meta.get("download_warnsize", self._warn_size)
            try:
                body = gunzip(response.body, max_size=max_size)
            except _DecompressionMaxSizeExceeded:
                return None
            # Warn only when decompression pushes the body over a warning
            # size that the downloaded body itself did not exceed.
            if uncompressed_size < warn_size <= len(body):
                logger.warning(
                    f"{response} body size after decompression ({len(body)} B) "
                    f"is larger than the download warning size ({warn_size} B)."
                )
            return body
        # Actual gzipped sitemap files are decompressed above. If we get here
        # (the response body is not gzipped) with a response for a .xml.gz URL,
        # it usually means the body was already gunzipped by the HttpCompression
        # middleware: the HTTP response was sent with "Content-Encoding: gzip"
        # without actually being a .xml.gz file in the first place, merely XML
        # gzip-compressed on the fly. In other words, at this point we have
        # plain XML.
        if response.url.endswith(".xml") or response.url.endswith(".xml.gz"):
            return response.body
        return None


def regex(x: re.Pattern[str] | str) -> re.Pattern[str]:
    if isinstance(x, str):
        return re.compile(x)
    return x

def iterloc(it: Iterable[dict[str, Any]], alt: bool = False) -> Iterable[str]:
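    """Yield the ``loc`` URL of every entry and, when ``alt`` is true, any
    alternate URLs the entry carries. A sketch of the entry shape expected
    here (the URLs are illustrative)::

        {"loc": "https://example.com/en/", "alternate": ["https://example.com/de/"]}
    """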
    for d in it:
        yield d["loc"]

        # Also consider alternate URLs (xhtml:link rel="alternate")
        if alt and "alternate" in d:
            yield from d["alternate"]