1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
|
"""
This module implements the TextResponse class which adds encoding handling and
discovering (through HTTP headers) to base Response class.
See documentation in docs/topics/request-response.rst
"""
from __future__ import annotations
import json
from contextlib import suppress
from typing import TYPE_CHECKING, Any, AnyStr, cast
from urllib.parse import urljoin
import parsel
from w3lib.encoding import (
html_body_declared_encoding,
html_to_unicode,
http_content_type_encoding,
read_bom,
resolve_encoding,
)
from w3lib.html import strip_html5_whitespace
from scrapy.http.response import Response
from scrapy.utils.python import memoizemethod_noargs, to_unicode
from scrapy.utils.response import get_base_url
if TYPE_CHECKING:
from collections.abc import Callable, Iterable, Mapping
from twisted.python.failure import Failure
from scrapy.http.request import CallbackT, CookiesT, Request
from scrapy.link import Link
from scrapy.selector import Selector, SelectorList
_NONE = object()
class TextResponse(Response):
_DEFAULT_ENCODING = "ascii"
_cached_decoded_json = _NONE
attributes: tuple[str, ...] = (*Response.attributes, "encoding")
def __init__(self, *args: Any, **kwargs: Any):
self._encoding: str | None = kwargs.pop("encoding", None)
self._cached_benc: str | None = None
self._cached_ubody: str | None = None
self._cached_selector: Selector | None = None
super().__init__(*args, **kwargs)
def _set_body(self, body: str | bytes | None) -> None:
self._body: bytes = b"" # used by encoding detection
if isinstance(body, str):
if self._encoding is None:
raise TypeError(
"Cannot convert unicode body - "
f"{type(self).__name__} has no encoding"
)
self._body = body.encode(self._encoding)
else:
super()._set_body(body)
@property
def encoding(self) -> str:
return self._declared_encoding() or self._body_inferred_encoding()
def _declared_encoding(self) -> str | None:
return (
self._encoding
or self._bom_encoding()
or self._headers_encoding()
or self._body_declared_encoding()
)
def json(self) -> Any:
"""
.. versionadded:: 2.2
Deserialize a JSON document to a Python object.
"""
if self._cached_decoded_json is _NONE:
self._cached_decoded_json = json.loads(self.body)
return self._cached_decoded_json
@property
def text(self) -> str:
"""Body as unicode"""
# access self.encoding before _cached_ubody to make sure
# _body_inferred_encoding is called
benc = self.encoding
if self._cached_ubody is None:
charset = f"charset={benc}"
self._cached_ubody = html_to_unicode(charset, self.body)[1]
return self._cached_ubody
def urljoin(self, url: str) -> str:
"""Join this Response's url with a possible relative url to form an
absolute interpretation of the latter."""
return urljoin(get_base_url(self), url)
@memoizemethod_noargs
def _headers_encoding(self) -> str | None:
content_type = cast(bytes, self.headers.get(b"Content-Type", b""))
return http_content_type_encoding(to_unicode(content_type, encoding="latin-1"))
def _body_inferred_encoding(self) -> str:
if self._cached_benc is None:
content_type = to_unicode(
cast(bytes, self.headers.get(b"Content-Type", b"")), encoding="latin-1"
)
benc, ubody = html_to_unicode(
content_type,
self.body,
auto_detect_fun=self._auto_detect_fun,
default_encoding=self._DEFAULT_ENCODING,
)
self._cached_benc = benc
self._cached_ubody = ubody
return self._cached_benc
def _auto_detect_fun(self, text: bytes) -> str | None:
for enc in (self._DEFAULT_ENCODING, "utf-8", "cp1252"):
try:
text.decode(enc)
except UnicodeError:
continue
return resolve_encoding(enc)
return None
@memoizemethod_noargs
def _body_declared_encoding(self) -> str | None:
return html_body_declared_encoding(self.body)
@memoizemethod_noargs
def _bom_encoding(self) -> str | None:
return read_bom(self.body)[0]
@property
def selector(self) -> Selector:
from scrapy.selector import Selector
if self._cached_selector is None:
self._cached_selector = Selector(self)
return self._cached_selector
def jmespath(self, query: str, **kwargs: Any) -> SelectorList:
from scrapy.selector import SelectorList
if not hasattr(self.selector, "jmespath"):
raise AttributeError(
"Please install parsel >= 1.8.1 to get jmespath support"
)
return cast(SelectorList, self.selector.jmespath(query, **kwargs))
def xpath(self, query: str, **kwargs: Any) -> SelectorList:
from scrapy.selector import SelectorList
return cast(SelectorList, self.selector.xpath(query, **kwargs))
def css(self, query: str) -> SelectorList:
from scrapy.selector import SelectorList
return cast(SelectorList, self.selector.css(query))
def follow(
self,
url: str | Link | parsel.Selector,
callback: CallbackT | None = None,
method: str = "GET",
headers: Mapping[AnyStr, Any] | Iterable[tuple[AnyStr, Any]] | None = None,
body: bytes | str | None = None,
cookies: CookiesT | None = None,
meta: dict[str, Any] | None = None,
encoding: str | None = None,
priority: int = 0,
dont_filter: bool = False,
errback: Callable[[Failure], Any] | None = None,
cb_kwargs: dict[str, Any] | None = None,
flags: list[str] | None = None,
) -> Request:
"""
Return a :class:`~.Request` instance to follow a link ``url``.
It accepts the same arguments as ``Request.__init__()`` method,
but ``url`` can be not only an absolute URL, but also
* a relative URL
* a :class:`~scrapy.link.Link` object, e.g. the result of
:ref:`topics-link-extractors`
* a :class:`~scrapy.Selector` object for a ``<link>`` or ``<a>`` element, e.g.
``response.css('a.my_link')[0]``
* an attribute :class:`~scrapy.Selector` (not SelectorList), e.g.
``response.css('a::attr(href)')[0]`` or
``response.xpath('//img/@src')[0]``
See :ref:`response-follow-example` for usage examples.
"""
if isinstance(url, parsel.Selector):
url = _url_from_selector(url)
elif isinstance(url, parsel.SelectorList):
raise ValueError("SelectorList is not supported")
encoding = self.encoding if encoding is None else encoding
return super().follow(
url=url,
callback=callback,
method=method,
headers=headers,
body=body,
cookies=cookies,
meta=meta,
encoding=encoding,
priority=priority,
dont_filter=dont_filter,
errback=errback,
cb_kwargs=cb_kwargs,
flags=flags,
)
def follow_all(
self,
urls: Iterable[str | Link] | parsel.SelectorList | None = None,
callback: CallbackT | None = None,
method: str = "GET",
headers: Mapping[AnyStr, Any] | Iterable[tuple[AnyStr, Any]] | None = None,
body: bytes | str | None = None,
cookies: CookiesT | None = None,
meta: dict[str, Any] | None = None,
encoding: str | None = None,
priority: int = 0,
dont_filter: bool = False,
errback: Callable[[Failure], Any] | None = None,
cb_kwargs: dict[str, Any] | None = None,
flags: list[str] | None = None,
css: str | None = None,
xpath: str | None = None,
) -> Iterable[Request]:
"""
A generator that produces :class:`~.Request` instances to follow all
links in ``urls``. It accepts the same arguments as the :class:`~.Request`'s
``__init__()`` method, except that each ``urls`` element does not need to be
an absolute URL, it can be any of the following:
* a relative URL
* a :class:`~scrapy.link.Link` object, e.g. the result of
:ref:`topics-link-extractors`
* a :class:`~scrapy.Selector` object for a ``<link>`` or ``<a>`` element, e.g.
``response.css('a.my_link')[0]``
* an attribute :class:`~scrapy.Selector` (not SelectorList), e.g.
``response.css('a::attr(href)')[0]`` or
``response.xpath('//img/@src')[0]``
In addition, ``css`` and ``xpath`` arguments are accepted to perform the link extraction
within the ``follow_all()`` method (only one of ``urls``, ``css`` and ``xpath`` is accepted).
Note that when passing a ``SelectorList`` as argument for the ``urls`` parameter or
using the ``css`` or ``xpath`` parameters, this method will not produce requests for
selectors from which links cannot be obtained (for instance, anchor tags without an
``href`` attribute)
"""
arguments = [x for x in (urls, css, xpath) if x is not None]
if len(arguments) != 1:
raise ValueError(
"Please supply exactly one of the following arguments: urls, css, xpath"
)
if not urls:
if css:
urls = self.css(css)
if xpath:
urls = self.xpath(xpath)
if isinstance(urls, parsel.SelectorList):
selectors = urls
urls = []
for sel in selectors:
with suppress(_InvalidSelector):
urls.append(_url_from_selector(sel))
return super().follow_all(
urls=cast("Iterable[str | Link]", urls),
callback=callback,
method=method,
headers=headers,
body=body,
cookies=cookies,
meta=meta,
encoding=encoding,
priority=priority,
dont_filter=dont_filter,
errback=errback,
cb_kwargs=cb_kwargs,
flags=flags,
)
class _InvalidSelector(ValueError):
"""
Raised when a URL cannot be obtained from a Selector
"""
def _url_from_selector(sel: parsel.Selector) -> str:
if isinstance(sel.root, str):
# e.g. ::attr(href) result
return strip_html5_whitespace(sel.root)
if not hasattr(sel.root, "tag"):
raise _InvalidSelector(f"Unsupported selector: {sel}")
if sel.root.tag not in ("a", "link"):
raise _InvalidSelector(
f"Only <a> and <link> elements are supported; got <{sel.root.tag}>"
)
href = sel.root.get("href")
if href is None:
raise _InvalidSelector(f"<{sel.root.tag}> element has no href attribute: {sel}")
return strip_html5_whitespace(href)
|