from __future__ import annotations
import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Any
from tldextract import TLDExtract
from scrapy.exceptions import NotConfigured
from scrapy.http import Response
from scrapy.http.cookies import CookieJar
from scrapy.utils.httpobj import urlparse_cached
from scrapy.utils.python import to_unicode
if TYPE_CHECKING:
from collections.abc import Iterable, Sequence
from http.cookiejar import Cookie
# typing.Self requires Python 3.11
from typing_extensions import Self
from scrapy import Request, Spider
from scrapy.crawler import Crawler
from scrapy.http.request import VerboseCookie
logger = logging.getLogger(__name__)
_split_domain = TLDExtract(include_psl_private_domains=True)
_UNSET = object()
def _is_public_domain(domain: str) -> bool:
    """Return ``True`` when *domain* is itself a public suffix (e.g. "com", "co.uk").

    tldextract leaves the ``domain`` component empty when the input consists
    solely of a (possibly private) public suffix.
    """
    extracted = _split_domain(domain)
    if extracted.domain:
        return False
    return True
class CookiesMiddleware:
    """This middleware enables working with sites that need cookies"""

    def __init__(self, debug: bool = False):
        # One CookieJar per "cookiejar" meta key; defaultdict creates a fresh
        # jar the first time a key (including None) is seen.
        self.jars: defaultdict[Any, CookieJar] = defaultdict(CookieJar)
        # When True, outgoing Cookie and incoming Set-Cookie headers are logged.
        self.debug: bool = debug

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        """Build the middleware from crawler settings.

        Raises:
            NotConfigured: if the ``COOKIES_ENABLED`` setting is false,
                which removes this middleware from the pipeline.
        """
        if not crawler.settings.getbool("COOKIES_ENABLED"):
            raise NotConfigured
        return cls(crawler.settings.getbool("COOKIES_DEBUG"))

    def _process_cookies(
        self, cookies: Iterable[Cookie], *, jar: CookieJar, request: Request
    ) -> None:
        """Store *cookies* in *jar*, guarding against public-suffix domains.

        A cookie whose domain is a public suffix is only kept when the
        request host is exactly that domain; otherwise it is dropped, so a
        response cannot set cookies for an entire public suffix.
        """
        for cookie in cookies:
            cookie_domain = cookie.domain
            # Domain cookies carry a leading dot; strip it before comparing.
            cookie_domain = cookie_domain.removeprefix(".")
            hostname = urlparse_cached(request).hostname
            assert hostname is not None
            request_domain = hostname.lower()
            if cookie_domain and _is_public_domain(cookie_domain):
                if cookie_domain != request_domain:
                    # Public-suffix cookie for a different host: reject it.
                    continue
                cookie.domain = request_domain
            # Delegate the remaining acceptance policy to the jar itself.
            jar.set_cookie_if_ok(cookie, request)

    def process_request(
        self, request: Request, spider: Spider
    ) -> Request | Response | None:
        """Populate the outgoing request's Cookie header from the jar."""
        if request.meta.get("dont_merge_cookies", False):
            return None

        # "cookiejar" meta selects which jar this request uses (None = default).
        cookiejarkey = request.meta.get("cookiejar")
        jar = self.jars[cookiejarkey]
        cookies = self._get_request_cookies(jar, request)
        self._process_cookies(cookies, jar=jar, request=request)

        # set Cookie header
        # Drop any pre-existing header so the jar is the single source of truth.
        request.headers.pop("Cookie", None)
        jar.add_cookie_header(request)
        self._debug_cookie(request, spider)
        return None

    def process_response(
        self, request: Request, response: Response, spider: Spider
    ) -> Request | Response:
        """Store cookies from the response's Set-Cookie headers in the jar."""
        if request.meta.get("dont_merge_cookies", False):
            return response

        # extract cookies from Set-Cookie and drop invalid/expired cookies
        cookiejarkey = request.meta.get("cookiejar")
        jar = self.jars[cookiejarkey]
        cookies = jar.make_cookies(response, request)
        self._process_cookies(cookies, jar=jar, request=request)

        self._debug_set_cookie(response, spider)

        return response

    def _debug_cookie(self, request: Request, spider: Spider) -> None:
        """Log the request's outgoing Cookie headers when debug mode is on."""
        if self.debug:
            cl = [
                to_unicode(c, errors="replace")
                for c in request.headers.getlist("Cookie")
            ]
            if cl:
                cookies = "\n".join(f"Cookie: {c}\n" for c in cl)
                msg = f"Sending cookies to: {request}\n{cookies}"
                logger.debug(msg, extra={"spider": spider})

    def _debug_set_cookie(self, response: Response, spider: Spider) -> None:
        """Log the response's Set-Cookie headers when debug mode is on."""
        if self.debug:
            cl = [
                to_unicode(c, errors="replace")
                for c in response.headers.getlist("Set-Cookie")
            ]
            if cl:
                cookies = "\n".join(f"Set-Cookie: {c}\n" for c in cl)
                msg = f"Received cookies from: {response}\n{cookies}"
                logger.debug(msg, extra={"spider": spider})

    def _format_cookie(self, cookie: VerboseCookie, request: Request) -> str | None:
        """
        Given a dict consisting of cookie components, return its string representation.
        Decode from bytes if necessary.

        Returns ``None`` (after logging a warning) when the mandatory
        "name" or "value" component is missing.
        """
        decoded = {}
        flags = set()
        for key in ("name", "value", "path", "domain"):
            value = cookie.get(key)
            if value is None:
                # "path"/"domain" are optional; "name"/"value" are not.
                if key in ("name", "value"):
                    msg = f"Invalid cookie found in request {request}: {cookie} ('{key}' is missing)"
                    logger.warning(msg)
                    return None
                continue
            if isinstance(value, (bool, float, int, str)):
                decoded[key] = str(value)
            else:
                assert isinstance(value, bytes)
                try:
                    decoded[key] = value.decode("utf8")
                except UnicodeDecodeError:
                    logger.warning(
                        "Non UTF-8 encoded cookie found in request %s: %s",
                        request,
                        cookie,
                    )
                    # latin1 maps every byte to a code point, so this cannot fail.
                    decoded[key] = value.decode("latin1", errors="replace")
        for flag in ("secure",):
            # _UNSET distinguishes "key absent" from falsy values like False.
            value = cookie.get(flag, _UNSET)
            if value is _UNSET or not value:
                continue
            flags.add(flag)
        cookie_str = f"{decoded.pop('name')}={decoded.pop('value')}"
        for key, value in decoded.items():  # path, domain
            cookie_str += f"; {key.capitalize()}={value}"
        for flag in flags:  # secure
            cookie_str += f"; {flag.capitalize()}"
        return cookie_str

    def _get_request_cookies(
        self, jar: CookieJar, request: Request
    ) -> Sequence[Cookie]:
        """
        Extract cookies from the Request.cookies attribute

        Accepts either a plain ``{name: value}`` dict or an iterable of
        verbose cookie dicts, and returns fully-parsed ``Cookie`` objects.
        """
        if not request.cookies:
            return []
        cookies: Iterable[VerboseCookie]
        if isinstance(request.cookies, dict):
            # Normalize the short dict form into verbose cookie dicts.
            cookies = tuple({"name": k, "value": v} for k, v in request.cookies.items())
        else:
            cookies = request.cookies
        for cookie in cookies:
            # Default "secure" to whether the request itself uses https.
            # NOTE(review): in the list form this mutates the caller's dicts.
            cookie.setdefault("secure", urlparse_cached(request).scheme == "https")
        formatted = filter(None, (self._format_cookie(c, request) for c in cookies))
        # Reuse the jar's parsing logic by wrapping the formatted cookies in a
        # synthetic response carrying them as Set-Cookie headers.
        response = Response(request.url, headers={"Set-Cookie": formatted})
        return jar.make_cookies(response, request)