"""Helpers for working with spiders."""
from __future__ import annotations
import inspect
import logging
from typing import TYPE_CHECKING, Any, Literal, TypeVar, overload
from scrapy.spiders import Spider
from scrapy.utils.defer import deferred_from_coro
from scrapy.utils.misc import arg_to_iter
if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Iterable
from types import CoroutineType, ModuleType
from twisted.internet.defer import Deferred
from scrapy import Request
from scrapy.spiderloader import SpiderLoaderProtocol
logger = logging.getLogger(__name__)

_T = TypeVar("_T")


# https://stackoverflow.com/questions/60222982
@overload
def iterate_spider_output(result: AsyncGenerator[_T]) -> AsyncGenerator[_T]: ...  # type: ignore[overload-overlap]
@overload
def iterate_spider_output(result: CoroutineType[Any, Any, _T]) -> Deferred[_T]: ...
@overload
def iterate_spider_output(result: _T) -> Iterable[Any]: ...
def iterate_spider_output(
    result: Any,
) -> Iterable[Any] | AsyncGenerator[_T] | Deferred[_T]:
    """Normalize a callback *result* into something Scrapy can consume.

    Async generators are returned untouched; coroutines are turned into a
    Deferred whose eventual value is normalized again through this same
    function; anything else is passed through ``deferred_from_coro`` (which
    also handles non-coroutine awaitables) and coerced into an iterable.
    """
    if inspect.isasyncgen(result):
        # Already asynchronously iterable -- hand it back unchanged.
        return result
    if inspect.iscoroutine(result):
        # Run the coroutine via Twisted, then re-normalize whatever it yields.
        deferred = deferred_from_coro(result)
        deferred.addCallback(iterate_spider_output)
        return deferred
    # deferred_from_coro may still convert other awaitable types here;
    # arg_to_iter then guarantees the caller gets an iterable.
    return arg_to_iter(deferred_from_coro(result))
def iter_spider_classes(module: ModuleType) -> Iterable[type[Spider]]:
    """Yield every instantiable spider class defined in *module*.

    A class qualifies when it is a :class:`~scrapy.spiders.Spider` subclass,
    is defined in the module itself (not merely imported into it), and has a
    truthy ``name`` attribute.
    """
    for candidate in vars(module).values():
        if not inspect.isclass(candidate):
            continue
        if not issubclass(candidate, Spider):
            continue
        if candidate.__module__ != module.__name__:
            continue
        # Only spiders with a (non-empty) name can be instantiated.
        if getattr(candidate, "name", None):
            yield candidate
@overload
def spidercls_for_request(
    spider_loader: SpiderLoaderProtocol,
    request: Request,
    default_spidercls: type[Spider],
    log_none: bool = ...,
    log_multiple: bool = ...,
) -> type[Spider]: ...
@overload
def spidercls_for_request(
    spider_loader: SpiderLoaderProtocol,
    request: Request,
    default_spidercls: Literal[None],
    log_none: bool = ...,
    log_multiple: bool = ...,
) -> type[Spider] | None: ...
@overload
def spidercls_for_request(
    spider_loader: SpiderLoaderProtocol,
    request: Request,
    *,
    log_none: bool = ...,
    log_multiple: bool = ...,
) -> type[Spider] | None: ...
def spidercls_for_request(
    spider_loader: SpiderLoaderProtocol,
    request: Request,
    default_spidercls: type[Spider] | None = None,
    log_none: bool = False,
    log_multiple: bool = False,
) -> type[Spider] | None:
    """Return the spider class that handles the given *request*.

    Asks *spider_loader* which spiders can handle *request* and returns the
    loaded class only when exactly one spider matches. With zero or multiple
    matches, *default_spidercls* is returned instead; the ambiguous/missing
    cases can optionally be logged via *log_multiple* / *log_none*.
    """
    matches = spider_loader.find_by_request(request)
    if len(matches) == 1:
        # Unambiguous: exactly one spider claims this request.
        return spider_loader.load(matches[0])
    if not matches:
        if log_none:
            logger.error(
                "Unable to find spider that handles: %(request)s", {"request": request}
            )
    elif log_multiple:
        logger.error(
            "More than one spider can handle: %(request)s - %(snames)s",
            {"request": request, "snames": ", ".join(matches)},
        )
    return default_spidercls
class DefaultSpider(Spider):
    # Minimal concrete Spider: defining a truthy "name" is what makes a
    # spider class instantiable (cf. the check in iter_spider_classes).
    name = "default"