File: s3.py

package info (click to toggle)
python-scrapy 2.13.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,664 kB
  • sloc: python: 52,028; xml: 199; makefile: 25; sh: 7
file content (101 lines) | stat: -rw-r--r-- 3,762 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from scrapy.core.downloader.handlers.http import HTTPDownloadHandler
from scrapy.exceptions import NotConfigured
from scrapy.utils.boto import is_botocore_available
from scrapy.utils.httpobj import urlparse_cached
from scrapy.utils.misc import build_from_crawler

if TYPE_CHECKING:
    from twisted.internet.defer import Deferred

    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy import Request, Spider
    from scrapy.crawler import Crawler
    from scrapy.http import Response
    from scrapy.settings import BaseSettings


class S3DownloadHandler:
    """Download handler for ``s3://`` URLs.

    Rewrites each S3 request into a plain HTTP(S) request against
    ``s3.amazonaws.com`` — signing it with botocore credentials unless the
    connection is anonymous — and delegates the actual transfer to a regular
    HTTP download handler.
    """

    def __init__(
        self,
        settings: BaseSettings,
        *,
        crawler: Crawler,
        aws_access_key_id: str | None = None,
        aws_secret_access_key: str | None = None,
        aws_session_token: str | None = None,
        httpdownloadhandler: type[HTTPDownloadHandler] = HTTPDownloadHandler,
        **kw: Any,
    ):
        if not is_botocore_available():
            raise NotConfigured("missing botocore library")

        # Any credential not passed explicitly falls back to the project settings.
        aws_access_key_id = aws_access_key_id or settings["AWS_ACCESS_KEY_ID"]
        aws_secret_access_key = aws_secret_access_key or settings["AWS_SECRET_ACCESS_KEY"]
        aws_session_token = aws_session_token or settings["AWS_SESSION_TOKEN"]

        # If no credentials could be found anywhere,
        # consider this an anonymous connection request by default;
        # unless 'anon' was set explicitly (True/False).
        anon = kw.pop("anon", None)
        if anon is None and not aws_access_key_id and not aws_secret_access_key:
            anon = True
        self.anon = anon

        # 'anon' is the only extra keyword we accept; anything left is an error.
        if kw:
            raise TypeError(f"Unexpected keyword arguments: {kw}")

        self._signer = None
        import botocore.auth
        import botocore.credentials

        if not self.anon:
            assert aws_access_key_id is not None
            assert aws_secret_access_key is not None
            creds = botocore.credentials.Credentials(
                aws_access_key_id, aws_secret_access_key, aws_session_token
            )
            SignerCls = botocore.auth.AUTH_TYPE_MAPS["s3"]
            # botocore.auth.BaseSigner doesn't have an __init__() with args, only subclasses do
            self._signer = SignerCls(creds)  # type: ignore[call-arg]

        # All actual downloading is delegated to a plain HTTP handler.
        handler = build_from_crawler(httpdownloadhandler, crawler)
        self._download_http = handler.download_request

    @classmethod
    def from_crawler(cls, crawler: Crawler, **kwargs: Any) -> Self:
        """Build the handler from a crawler, forwarding its settings."""
        return cls(crawler.settings, crawler=crawler, **kwargs)

    def download_request(self, request: Request, spider: Spider) -> Deferred[Response]:
        """Rewrite *request* as an HTTP(S) request to S3 (signing it unless
        anonymous) and hand it to the underlying HTTP handler."""
        parsed = urlparse_cached(request)
        scheme = "https" if request.meta.get("is_secure") else "http"
        bucket = parsed.hostname
        path = parsed.path + "?" + parsed.query if parsed.query else parsed.path
        # Virtual-hosted-style URL used for the actual download.
        url = f"{scheme}://{bucket}.s3.amazonaws.com{path}"
        if not self.anon:
            import botocore.awsrequest

            # Sign against the path-style URL, then attach the resulting
            # auth headers to the virtual-hosted-style request.
            signable = botocore.awsrequest.AWSRequest(
                method=request.method,
                url=f"{scheme}://s3.amazonaws.com/{bucket}{path}",
                headers=request.headers.to_unicode_dict(),
                data=request.body,
            )
            assert self._signer
            self._signer.add_auth(signable)
            request = request.replace(url=url, headers=signable.headers.items())
        else:
            request = request.replace(url=url)
        return self._download_http(request, spider)