File: test_utils_signal.py

package info (click to toggle)
python-scrapy 2.13.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,664 kB
  • sloc: python: 52,028; xml: 199; makefile: 25; sh: 7
file content (99 lines) | stat: -rw-r--r-- 3,342 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import asyncio

import pytest
from pydispatch import dispatcher
from testfixtures import LogCapture
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from twisted.trial import unittest

from scrapy.utils.signal import send_catch_log, send_catch_log_deferred
from scrapy.utils.test import get_from_asyncio_queue


class TestSendCatchLog(unittest.TestCase):
    @defer.inlineCallbacks
    def test_send_catch_log(self):
        test_signal = object()
        handlers_called = set()

        dispatcher.connect(self.error_handler, signal=test_signal)
        dispatcher.connect(self.ok_handler, signal=test_signal)
        with LogCapture() as log:
            result = yield defer.maybeDeferred(
                self._get_result,
                test_signal,
                arg="test",
                handlers_called=handlers_called,
            )

        assert self.error_handler in handlers_called
        assert self.ok_handler in handlers_called
        assert len(log.records) == 1
        record = log.records[0]
        assert "error_handler" in record.getMessage()
        assert record.levelname == "ERROR"
        assert result[0][0] == self.error_handler  # pylint: disable=comparison-with-callable
        assert isinstance(result[0][1], Failure)
        assert result[1] == (self.ok_handler, "OK")

        dispatcher.disconnect(self.error_handler, signal=test_signal)
        dispatcher.disconnect(self.ok_handler, signal=test_signal)

    def _get_result(self, signal, *a, **kw):
        return send_catch_log(signal, *a, **kw)

    def error_handler(self, arg, handlers_called):
        handlers_called.add(self.error_handler)
        1 / 0

    def ok_handler(self, arg, handlers_called):
        handlers_called.add(self.ok_handler)
        assert arg == "test"
        return "OK"


class SendCatchLogDeferredTest(TestSendCatchLog):
    def _get_result(self, signal, *a, **kw):
        return send_catch_log_deferred(signal, *a, **kw)


class SendCatchLogDeferredTest2(SendCatchLogDeferredTest):
    def ok_handler(self, arg, handlers_called):
        handlers_called.add(self.ok_handler)
        assert arg == "test"
        d = defer.Deferred()
        reactor.callLater(0, d.callback, "OK")
        return d


@pytest.mark.usefixtures("reactor_pytest")
class SendCatchLogDeferredAsyncDefTest(SendCatchLogDeferredTest):
    async def ok_handler(self, arg, handlers_called):
        handlers_called.add(self.ok_handler)
        assert arg == "test"
        await defer.succeed(42)
        return "OK"


@pytest.mark.only_asyncio
class SendCatchLogDeferredAsyncioTest(SendCatchLogDeferredTest):
    async def ok_handler(self, arg, handlers_called):
        handlers_called.add(self.ok_handler)
        assert arg == "test"
        await asyncio.sleep(0.2)
        return await get_from_asyncio_queue("OK")


class TestSendCatchLog2:
    def test_error_logged_if_deferred_not_supported(self):
        def test_handler():
            return defer.Deferred()

        test_signal = object()
        dispatcher.connect(test_handler, test_signal)
        with LogCapture() as log:
            send_catch_log(test_signal)
        assert len(log.records) == 1
        assert "Cannot return deferreds from signal handler" in str(log)
        dispatcher.disconnect(test_handler, test_signal)