1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
import asyncio
import pytest
from pydispatch import dispatcher
from testfixtures import LogCapture
from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks
from twisted.python.failure import Failure
from scrapy.utils.asyncio import call_later
from scrapy.utils.defer import deferred_from_coro
from scrapy.utils.signal import (
send_catch_log,
send_catch_log_async,
send_catch_log_deferred,
)
from scrapy.utils.test import get_from_asyncio_queue
class TestSendCatchLog:
# whether the function being tested returns exceptions or failures
returns_exceptions: bool = False
@inlineCallbacks
def test_send_catch_log(self):
test_signal = object()
handlers_called = set()
dispatcher.connect(self.error_handler, signal=test_signal)
dispatcher.connect(self.ok_handler, signal=test_signal)
with LogCapture() as log:
result = yield defer.maybeDeferred(
self._get_result,
test_signal,
arg="test",
handlers_called=handlers_called,
)
assert self.error_handler in handlers_called
assert self.ok_handler in handlers_called
assert len(log.records) == 1
record = log.records[0]
assert "error_handler" in record.getMessage()
assert record.levelname == "ERROR"
assert result[0][0] == self.error_handler # pylint: disable=comparison-with-callable
assert isinstance(
result[0][1], Exception if self.returns_exceptions else Failure
)
assert result[1] == (self.ok_handler, "OK")
dispatcher.disconnect(self.error_handler, signal=test_signal)
dispatcher.disconnect(self.ok_handler, signal=test_signal)
def _get_result(self, signal, *a, **kw):
return send_catch_log(signal, *a, **kw)
def error_handler(self, arg, handlers_called):
handlers_called.add(self.error_handler)
1 / 0
def ok_handler(self, arg, handlers_called):
handlers_called.add(self.ok_handler)
assert arg == "test"
return "OK"
@pytest.mark.filterwarnings("ignore::scrapy.exceptions.ScrapyDeprecationWarning")
class TestSendCatchLogDeferred(TestSendCatchLog):
def _get_result(self, signal, *a, **kw):
return send_catch_log_deferred(signal, *a, **kw)
class TestSendCatchLogDeferred2(TestSendCatchLogDeferred):
def ok_handler(self, arg, handlers_called):
handlers_called.add(self.ok_handler)
assert arg == "test"
d = defer.Deferred()
call_later(0, d.callback, "OK")
return d
class TestSendCatchLogDeferredAsyncDef(TestSendCatchLogDeferred):
async def ok_handler(self, arg, handlers_called):
handlers_called.add(self.ok_handler)
assert arg == "test"
await defer.succeed(42)
return "OK"
@pytest.mark.only_asyncio
class TestSendCatchLogDeferredAsyncio(TestSendCatchLogDeferred):
async def ok_handler(self, arg, handlers_called):
handlers_called.add(self.ok_handler)
assert arg == "test"
await asyncio.sleep(0.2)
return await get_from_asyncio_queue("OK")
class TestSendCatchLogAsync(TestSendCatchLog):
returns_exceptions = True
def _get_result(self, signal, *a, **kw):
return deferred_from_coro(send_catch_log_async(signal, *a, **kw))
@pytest.mark.filterwarnings("ignore::scrapy.exceptions.ScrapyDeprecationWarning")
class TestSendCatchLogAsync2(TestSendCatchLogAsync):
def ok_handler(self, arg, handlers_called):
handlers_called.add(self.ok_handler)
assert arg == "test"
d = defer.Deferred()
call_later(0, d.callback, "OK")
return d
class TestSendCatchLogAsyncAsyncDef(TestSendCatchLogAsync):
async def ok_handler(self, arg, handlers_called):
handlers_called.add(self.ok_handler)
assert arg == "test"
await defer.succeed(42)
return "OK"
@pytest.mark.only_asyncio
class TestSendCatchLogAsyncAsyncio(TestSendCatchLogAsync):
async def ok_handler(self, arg, handlers_called):
handlers_called.add(self.ok_handler)
assert arg == "test"
await asyncio.sleep(0.2)
return await get_from_asyncio_queue("OK")
class TestSendCatchLog2:
def test_error_logged_if_deferred_not_supported(self):
def test_handler():
return defer.Deferred()
test_signal = object()
dispatcher.connect(test_handler, test_signal)
with LogCapture() as log:
send_catch_log(test_signal)
assert len(log.records) == 1
assert "Cannot return deferreds from signal handler" in str(log)
dispatcher.disconnect(test_handler, test_signal)
|