1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
|
"""Helper functions for working with signals"""
from __future__ import annotations
import logging
from collections.abc import Sequence
from typing import Any as TypingAny
from pydispatch.dispatcher import (
Anonymous,
Any,
disconnect,
getAllReceivers,
liveReceivers,
)
from pydispatch.robustapply import robustApply
from twisted.internet.defer import Deferred, DeferredList
from twisted.python.failure import Failure
from scrapy.exceptions import StopDownload
from scrapy.utils.defer import maybeDeferred_coro
from scrapy.utils.log import failure_to_exc_info
logger = logging.getLogger(__name__)
def send_catch_log(
    signal: TypingAny = Any,
    sender: TypingAny = Anonymous,
    *arguments: TypingAny,
    **named: TypingAny,
) -> list[tuple[TypingAny, TypingAny]]:
    """Like pydispatcher.robust.sendRobust but it also logs errors and returns
    Failures instead of exceptions.

    Every live receiver registered for ``signal``/``sender`` is called through
    ``robustApply`` with ``arguments`` and ``named``.

    Two keyword arguments are interpreted here:

    * ``dont_log``: an exception class, a sequence of exception classes, or
      ``None``. Exceptions matching it are captured without being logged.
      ``StopDownload`` is always treated this way. This key is removed from
      ``named`` before the receivers are called.
    * ``spider``: attached to log records as ``extra={"spider": ...}``. It is
      left in ``named`` and therefore still forwarded to ``robustApply``.

    Returns a list of ``(receiver, result)`` tuples, where ``result`` is the
    receiver's return value, or a ``Failure`` if the receiver raised.
    """
    dont_log = named.pop("dont_log", ())
    silenced: tuple[TypingAny, ...]
    if dont_log is None:
        # ``None`` means "nothing extra to silence". Putting it in an
        # ``except`` tuple would raise TypeError as soon as a handler raised.
        silenced = ()
    elif isinstance(dont_log, Sequence):
        silenced = tuple(dont_log)
    else:
        silenced = (dont_log,)
    silenced += (StopDownload,)

    spider = named.get("spider")
    responses: list[tuple[TypingAny, TypingAny]] = []
    for receiver in liveReceivers(getAllReceivers(sender, signal)):
        result: TypingAny
        try:
            response = robustApply(
                receiver, *arguments, signal=signal, sender=sender, **named
            )
        except silenced:
            # Failure() with no arguments wraps the exception being handled.
            result = Failure()
        except Exception:
            result = Failure()
            logger.error(
                "Error caught on signal handler: %(receiver)s",
                {"receiver": receiver},
                exc_info=True,
                extra={"spider": spider},
            )
        else:
            if isinstance(response, Deferred):
                # This synchronous sender does not wait on deferreds; the
                # deferred is still returned as the result, but flagged.
                logger.error(
                    "Cannot return deferreds from signal handler: %(receiver)s",
                    {"receiver": receiver},
                    extra={"spider": spider},
                )
            result = response
        responses.append((receiver, result))
    return responses
def send_catch_log_deferred(
    signal: TypingAny = Any,
    sender: TypingAny = Anonymous,
    *arguments: TypingAny,
    **named: TypingAny,
) -> Deferred[list[tuple[TypingAny, TypingAny]]]:
    """Like send_catch_log but supports returning deferreds on signal handlers.

    Returns a deferred that gets fired once all signal handlers deferreds were
    fired. It fires with a list of ``(receiver, result)`` tuples, where
    ``result`` is the handler's result or a ``Failure`` if the handler failed.

    Two keyword arguments are interpreted here:

    * ``dont_log``: an exception class, a sequence of exception classes, or
      ``None`` (the default). Failures whose value matches it are not logged.
      This key is removed from ``named`` before the receivers are called.
    * ``spider``: attached to log records as ``extra={"spider": ...}``. It is
      left in ``named`` and therefore still forwarded to ``robustApply``.
    """
    dont_log = named.pop("dont_log", None)
    if dont_log is not None and isinstance(dont_log, Sequence):
        # isinstance() only accepts a class or a tuple of classes, so any
        # other sequence (e.g. a list) is normalised, as send_catch_log does.
        dont_log = tuple(dont_log)
    spider = named.get("spider")

    def logerror(failure: Failure, recv: TypingAny) -> Failure:
        """Log the failure unless its value matches ``dont_log``; pass it on."""
        if dont_log is None or not isinstance(failure.value, dont_log):
            logger.error(
                "Error caught on signal handler: %(receiver)s",
                {"receiver": recv},
                exc_info=failure_to_exc_info(failure),
                extra={"spider": spider},
            )
        return failure

    def pair_with_receiver(
        result: TypingAny, recv: TypingAny
    ) -> tuple[TypingAny, TypingAny]:
        """Tag a handler result (or Failure) with the receiver that produced it."""
        return recv, result

    dfds: list[Deferred[tuple[TypingAny, TypingAny]]] = []
    for receiver in liveReceivers(getAllReceivers(sender, signal)):
        d: Deferred[TypingAny] = maybeDeferred_coro(
            robustApply, receiver, *arguments, signal=signal, sender=sender, **named
        )
        d.addErrback(logerror, receiver)
        # The receiver is passed as an explicit callback argument rather than
        # captured from the enclosing loop: a closure would look the loop
        # variable up only when the deferred fires, by which time it may
        # already refer to a later receiver.
        d2: Deferred[tuple[TypingAny, TypingAny]] = d.addBoth(
            pair_with_receiver, receiver
        )
        dfds.append(d2)

    dl = DeferredList(dfds)
    # DeferredList fires with (success, value) pairs; keep only the values,
    # which are the (receiver, result) tuples built above.
    d3: Deferred[list[tuple[TypingAny, TypingAny]]] = dl.addCallback(
        lambda out: [pair for _, pair in out]
    )
    return d3
def disconnect_all(signal: TypingAny = Any, sender: TypingAny = Any) -> None:
    """Disconnect all signal handlers. Useful for cleaning up after running
    tests.

    Every live receiver found for ``signal``/``sender`` is passed to
    ``disconnect`` with that same ``signal`` and ``sender``.
    """
    # Take a snapshot before disconnecting anything: the receivers are
    # produced lazily, and disconnect() changes the dispatcher's registrations
    # while they would otherwise still be being iterated, which can cause
    # receivers to be skipped.
    receivers = list(liveReceivers(getAllReceivers(sender, signal)))
    for receiver in receivers:
        disconnect(receiver, signal=signal, sender=sender)
|