# File: spider.py
#
# Package: python-scrapy 2.13.3-1
#   - links: PTS, VCS
#   - area: main
#   - in suites: forky, sid
#   - size: 5,664 kB
#   - sloc: python: 52,028; xml: 199; makefile: 25; sh: 7
# File content: 131 lines | stat: -rw-r--r-- 3,599 bytes
from __future__ import annotations

import inspect
import logging
from typing import TYPE_CHECKING, Any, Literal, TypeVar, overload

from scrapy.spiders import Spider
from scrapy.utils.defer import deferred_from_coro
from scrapy.utils.misc import arg_to_iter

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator, Iterable
    from types import CoroutineType, ModuleType

    from twisted.internet.defer import Deferred

    from scrapy import Request
    from scrapy.spiderloader import SpiderLoaderProtocol


logger = logging.getLogger(__name__)

_T = TypeVar("_T")


# https://stackoverflow.com/questions/60222982
@overload
def iterate_spider_output(result: AsyncGenerator[_T]) -> AsyncGenerator[_T]: ...  # type: ignore[overload-overlap]


@overload
def iterate_spider_output(result: CoroutineType[Any, Any, _T]) -> Deferred[_T]: ...


@overload
def iterate_spider_output(result: _T) -> Iterable[Any]: ...


def iterate_spider_output(
    result: Any,
) -> Iterable[Any] | AsyncGenerator[_T] | Deferred[_T]:
    """Adapt *result* according to what kind of object it is.

    * An async generator is handed back untouched.
    * A coroutine is wrapped with ``deferred_from_coro``; a callback that
      re-applies this function to the eventual value is attached, and that
      Deferred is returned.
    * Any other value is passed through ``deferred_from_coro`` and then
      through ``arg_to_iter``.
    """
    if inspect.isasyncgen(result):
        return result

    if not inspect.iscoroutine(result):
        # Neither an async generator nor a coroutine: take the generic path.
        return arg_to_iter(deferred_from_coro(result))

    pending = deferred_from_coro(result)
    # Whatever the coroutine resolves to is adapted by this same function.
    pending.addCallback(iterate_spider_output)
    return pending


def iter_spider_classes(module: ModuleType) -> Iterable[type[Spider]]:
    """Yield every usable spider class that *module* itself defines.

    A value found in the module namespace is yielded only when it is a class,
    is a subclass of ``Spider``, was defined in this very module (its
    ``__module__`` equals the module's ``__name__``, so classes merely
    imported into the module are skipped) and has a truthy ``name`` attribute.
    """
    for candidate in vars(module).values():
        if not inspect.isclass(candidate):
            continue
        if not issubclass(candidate, Spider):
            continue
        if candidate.__module__ != module.__name__:
            # Imported from somewhere else, not defined here.
            continue
        if getattr(candidate, "name", None):
            yield candidate


@overload
def spidercls_for_request(
    spider_loader: SpiderLoaderProtocol,
    request: Request,
    default_spidercls: type[Spider],
    log_none: bool = ...,
    log_multiple: bool = ...,
) -> type[Spider]: ...


@overload
def spidercls_for_request(
    spider_loader: SpiderLoaderProtocol,
    request: Request,
    default_spidercls: Literal[None],
    log_none: bool = ...,
    log_multiple: bool = ...,
) -> type[Spider] | None: ...


@overload
def spidercls_for_request(
    spider_loader: SpiderLoaderProtocol,
    request: Request,
    *,
    log_none: bool = ...,
    log_multiple: bool = ...,
) -> type[Spider] | None: ...


def spidercls_for_request(
    spider_loader: SpiderLoaderProtocol,
    request: Request,
    default_spidercls: type[Spider] | None = None,
    log_none: bool = False,
    log_multiple: bool = False,
) -> type[Spider] | None:
    """Pick the one spider class that can handle *request*.

    The spider loader is asked (via ``find_by_request``) which spider names
    match the request. When exactly one name matches, that spider is loaded
    and returned.

    In every other case (no match, or more than one match) the
    ``default_spidercls`` argument is returned as given. If ``log_multiple``
    is true an error is logged when several spiders match; if ``log_none``
    is true an error is logged when nothing matches.
    """
    candidates = spider_loader.find_by_request(request)
    found = len(candidates)

    if found == 1:
        return spider_loader.load(candidates[0])

    if found > 1:
        if log_multiple:
            logger.error(
                "More than one spider can handle: %(request)s - %(snames)s",
                {"request": request, "snames": ", ".join(candidates)},
            )
    elif found == 0 and log_none:
        logger.error(
            "Unable to find spider that handles: %(request)s", {"request": request}
        )

    # Ambiguous or empty lookup: fall back to the caller-supplied default.
    return default_spidercls


class DefaultSpider(Spider):
    """Bare ``Spider`` subclass whose only customization is ``name = "default"``."""
    # NOTE(review): presumably meant as a generic fallback spider class (e.g.
    # a ``default_spidercls`` value) -- confirm against callers.
    name = "default"