# File: test_utils_reactor.py
#
# Package info: python-scrapy 2.14.0-1
#   - links: PTS, VCS
#   - area: main
#   - in suites: forky, sid
#   - size: 6,308 kB
#   - sloc: python: 55,321; xml: 199; makefile: 25; sh: 7
# File content: 36 lines | stat: -rw-r--r-- 1,223 bytes
import asyncio
import sys
import warnings

import pytest

from scrapy.utils.defer import deferred_f_from_coro_f
from scrapy.utils.reactor import (
    _asyncio_reactor_path,
    install_reactor,
    is_asyncio_reactor_installed,
    set_asyncio_event_loop,
)


class TestAsyncio:
    """Tests for the asyncio-related helpers imported from ``scrapy.utils.reactor``."""

    def test_is_asyncio_reactor_installed(self, reactor_pytest: str) -> None:
        """Compare the helper's answer with the ``reactor_pytest`` fixture value."""
        # the result should depend only on the pytest --reactor argument
        asyncio_requested = reactor_pytest == "asyncio"
        assert is_asyncio_reactor_installed() == asyncio_requested

    # Skipped on Python 3.14+; the stated reason is "Additional warnings",
    # which would break the zero-warnings assertion below.
    @pytest.mark.skipif(sys.version_info >= (3, 14), reason="Additional warnings")
    def test_install_asyncio_reactor(self):
        """Install the asyncio reactor and check that nothing was disturbed.

        The call must record no warnings, and the reactor importable from
        ``twisted.internet`` afterwards must compare equal to the one that
        was importable beforehand.
        """
        from twisted.internet import reactor as reactor_before

        with warnings.catch_warnings(record=True) as recorded:
            install_reactor(_asyncio_reactor_path)
            # On failure, show the text of every recorded warning.
            assert not recorded, [str(entry) for entry in recorded]

        # Re-import after the install call to see what the name resolves to now.
        from twisted.internet import (  # pylint: disable=reimported
            reactor as reactor_after,
        )

        assert reactor_before == reactor_after

    # NOTE(review): ``only_asyncio`` presumably limits this test to runs that
    # use the asyncio reactor -- confirm against the project's pytest config.
    @pytest.mark.only_asyncio
    @deferred_f_from_coro_f
    async def test_set_asyncio_event_loop(self):
        """``set_asyncio_event_loop(None)`` must return the currently running loop."""
        install_reactor(_asyncio_reactor_path)
        returned_loop = set_asyncio_event_loop(None)
        assert returned_loop is asyncio.get_running_loop()