File: middleware.py

from __future__ import annotations

import logging
import pprint
import warnings
from collections import defaultdict, deque
from typing import TYPE_CHECKING, Any, TypeVar, cast

from scrapy.exceptions import NotConfigured, ScrapyDeprecationWarning
from scrapy.utils.defer import process_chain, process_parallel
from scrapy.utils.misc import build_from_crawler, load_object

if TYPE_CHECKING:
    from collections.abc import Callable, Iterable

    from twisted.internet.defer import Deferred

    # typing.Concatenate and typing.ParamSpec require Python 3.10
    # typing.Self requires Python 3.11
    from typing_extensions import Concatenate, ParamSpec, Self

    from scrapy import Spider
    from scrapy.crawler import Crawler
    from scrapy.settings import BaseSettings, Settings

    _P = ParamSpec("_P")


logger = logging.getLogger(__name__)

_T = TypeVar("_T")
_T2 = TypeVar("_T2")


class MiddlewareManager:
    """Base class for implementing middleware managers"""

    # Human-readable name used in log messages; subclasses override this
    # (e.g. "spider middleware", "downloader middleware", "item pipeline").
    component_name = "foo middleware"

    def __init__(self, *middlewares: Any) -> None:
        self.middlewares = middlewares
        # Only process_spider_output and process_spider_exception can be None.
        # Only process_spider_output can be a tuple, and only until _async compatibility methods are removed.
        self.methods: dict[str, deque[Callable | tuple[Callable, Callable] | None]] = (
            defaultdict(deque)
        )
        for mw in middlewares:
            self._add_middleware(mw)
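
    # Illustrative note: after construction with two middlewares that both
    # define open_spider and close_spider, self.methods would hold
    #     {"open_spider":  deque([mw1.open_spider, mw2.open_spider]),
    #      "close_spider": deque([mw2.close_spider, mw1.close_spider])}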

    @classmethod
    def _get_mwlist_from_settings(cls, settings: Settings) -> list[Any]:
        # Subclasses return the ordered list of middleware class paths,
        # typically built from a setting such as SPIDER_MIDDLEWARES.
        raise NotImplementedError

    @staticmethod
    def _build_from_settings(objcls: type[_T], settings: BaseSettings) -> _T:
        # Prefer the component's from_settings() factory if it defines one;
        # otherwise instantiate the class directly.
        if hasattr(objcls, "from_settings"):
            instance = objcls.from_settings(settings)  # type: ignore[attr-defined]
            method_name = "from_settings"
        else:
            instance = objcls()
            method_name = "__new__"
        if instance is None:
            raise TypeError(f"{objcls.__qualname__}.{method_name} returned None")
        return cast(_T, instance)
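
    # A hedged sketch (names are assumptions, not part of this module) of a
    # component that _build_from_settings() can construct via from_settings():
    #
    #     class MyComponent:
    #         def __init__(self, timeout: float) -> None:
    #             self.timeout = timeout
    #
    #         @classmethod
    #         def from_settings(cls, settings: BaseSettings) -> "MyComponent":
    #             return cls(settings.getfloat("MYCOMPONENT_TIMEOUT", 30.0))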

    @classmethod
    def from_settings(cls, settings: Settings, crawler: Crawler | None = None) -> Self:
        warnings.warn(
            f"{cls.__name__}.from_settings() is deprecated, use from_crawler() instead.",
            category=ScrapyDeprecationWarning,
            stacklevel=2,
        )
        return cls._from_settings(settings, crawler)

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        return cls._from_settings(crawler.settings, crawler)

    @classmethod
    def _from_settings(cls, settings: Settings, crawler: Crawler | None = None) -> Self:
        mwlist = cls._get_mwlist_from_settings(settings)
        middlewares = []
        enabled = []
        for clspath in mwlist:
            try:
                mwcls = load_object(clspath)
                if crawler is not None:
                    mw = build_from_crawler(mwcls, crawler)
                else:
                    mw = MiddlewareManager._build_from_settings(mwcls, settings)
                middlewares.append(mw)
                enabled.append(clspath)
            except NotConfigured as e:
                if e.args:
                    logger.warning(
                        "Disabled %(clspath)s: %(eargs)s",
                        {"clspath": clspath, "eargs": e.args[0]},
                        extra={"crawler": crawler},
                    )

        logger.info(
            "Enabled %(componentname)ss:\n%(enabledlist)s",
            {
                "componentname": cls.component_name,
                "enabledlist": pprint.pformat(enabled),
            },
            extra={"crawler": crawler},
        )
        return cls(*middlewares)
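
    # NotConfigured is how a component opts out at build time: raised with a
    # message it is logged above, raised bare it disables the component
    # silently. A sketch (MYEXT_ENABLED is an assumed, made-up setting name):
    #
    #     class MyExtension:
    #         def __init__(self, crawler: Crawler) -> None:
    #             if not crawler.settings.getbool("MYEXT_ENABLED"):
    #                 raise NotConfigured("MYEXT_ENABLED is not set")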

    def _add_middleware(self, mw: Any) -> None:
        # open_spider hooks run in declaration order; close_spider hooks are
        # prepended so they run in reverse order on shutdown.
        if hasattr(mw, "open_spider"):
            self.methods["open_spider"].append(mw.open_spider)
        if hasattr(mw, "close_spider"):
            self.methods["close_spider"].appendleft(mw.close_spider)

    def _process_parallel(
        self, methodname: str, obj: _T, *args: Any
    ) -> Deferred[list[_T2]]:
        # Call every registered method with the same object, collecting all
        # results into a single Deferred list.
        methods = cast(
            "Iterable[Callable[Concatenate[_T, _P], _T2]]", self.methods[methodname]
        )
        return process_parallel(methods, obj, *args)

    def _process_chain(self, methodname: str, obj: _T, *args: Any) -> Deferred[_T]:
        # Pipe ``obj`` through the registered methods: each one receives the
        # previous method's return value.
        methods = cast(
            "Iterable[Callable[Concatenate[_T, _P], _T]]", self.methods[methodname]
        )
        return process_chain(methods, obj, *args)
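
    # _process_chain() underpins e.g. ItemPipelineManager.process_item(),
    # where each pipeline receives the item returned by the previous one.
    # Sketch:
    #
    #     def process_item(self, item: Any, spider: Spider) -> Deferred[Any]:
    #         return self._process_chain("process_item", item, spider)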

    def open_spider(self, spider: Spider) -> Deferred[list[None]]:
        return self._process_parallel("open_spider", spider)

    def close_spider(self, spider: Spider) -> Deferred[list[None]]:
        return self._process_parallel("close_spider", spider)