File: test_utils_asyncio.py

package info (click to toggle)
python-scrapy 2.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,308 kB
  • sloc: python: 55,321; xml: 199; makefile: 25; sh: 7
file content (143 lines) | stat: -rw-r--r-- 4,725 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from __future__ import annotations

import asyncio
import random
from typing import TYPE_CHECKING
from unittest import mock

import pytest
from twisted.internet.defer import Deferred

from scrapy.utils.asyncgen import as_async_generator
from scrapy.utils.asyncio import (
    AsyncioLoopingCall,
    _parallel_asyncio,
    is_asyncio_available,
)
from scrapy.utils.defer import deferred_f_from_coro_f

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator


class TestAsyncio:
    """Tests for scrapy.utils.asyncio.is_asyncio_available()."""

    def test_is_asyncio_available(self, reactor_pytest: str) -> None:
        # The reactor chosen through the pytest --reactor option is the only
        # input that should decide the answer.
        expected = reactor_pytest == "asyncio"
        assert is_asyncio_available() == expected


@pytest.mark.only_asyncio
class TestParallelAsyncio:
    """Tests for scrapy.utils.asyncio._parallel_asyncio().

    Based on tests.test_utils_defer.TestParallelAsync.
    """

    CONCURRENT_ITEMS = 50

    @staticmethod
    async def callable(o: int, results: list[int]) -> None:
        """Append *o* to *results*, sometimes after a short random sleep."""
        if random.random() < 0.4:
            # simulate async processing
            await asyncio.sleep(random.random() / 8)
        # simulate trivial sync processing
        results.append(o)

    async def callable_wrapped(
        self,
        o: int,
        results: list[int],
        parallel_count: list[int],
        max_parallel_count: list[int],
    ) -> None:
        """Run :meth:`callable` while tracking how many calls overlap.

        ``parallel_count`` and ``max_parallel_count`` are one-element lists
        used as mutable counters shared by every call made for one run:
        the first holds the number of calls currently in progress, the
        second the highest value the first has reached.
        """
        parallel_count[0] += 1
        max_parallel_count[0] = max(max_parallel_count[0], parallel_count[0])
        await self.callable(o, results)
        # sanity check: this call's own increment must still be counted
        assert parallel_count[0] > 0, parallel_count[0]
        parallel_count[0] -= 1

    @staticmethod
    def get_async_iterable(length: int) -> AsyncGenerator[int, None]:
        # simulate a simple callback without delays between results
        return as_async_generator(range(length))

    @staticmethod
    async def get_async_iterable_with_delays(length: int) -> AsyncGenerator[int, None]:
        # simulate a callback with delays between some of the results
        for i in range(length):
            if random.random() < 0.1:
                await asyncio.sleep(random.random() / 20)
            yield i

    async def _check_parallel(
        self, ait: AsyncGenerator[int, None], length: int
    ) -> None:
        """Feed *ait* through _parallel_asyncio() and verify the outcome.

        Every value in ``range(length)`` must have been processed exactly
        once, and the number of overlapping :meth:`callable_wrapped` calls
        must never have exceeded ``CONCURRENT_ITEMS``.
        """
        parallel_count = [0]
        max_parallel_count = [0]
        results: list[int] = []
        await _parallel_asyncio(
            ait,
            self.CONCURRENT_ITEMS,
            self.callable_wrapped,
            results,
            parallel_count,
            max_parallel_count,
        )
        assert list(range(length)) == sorted(results)
        assert max_parallel_count[0] <= self.CONCURRENT_ITEMS

    @deferred_f_from_coro_f
    async def test_simple(self):
        for length in [20, 50, 100]:
            await self._check_parallel(self.get_async_iterable(length), length)

    @deferred_f_from_coro_f
    async def test_delays(self):
        for length in [20, 50, 100]:
            await self._check_parallel(
                self.get_async_iterable_with_delays(length), length
            )


@pytest.mark.only_asyncio
class TestAsyncioLoopingCall:
    """Tests for scrapy.utils.asyncio.AsyncioLoopingCall.

    Each test stops any looping call it may have started, even when an
    assertion fails, so a failing test does not leave a running looping
    call behind for the tests that follow.
    """

    @staticmethod
    def _stop_if_running(looping_call: AsyncioLoopingCall) -> None:
        """Stop *looping_call* if it is still running."""
        if looping_call.running:
            looping_call.stop()

    def test_looping_call(self):
        func = mock.MagicMock()
        looping_call = AsyncioLoopingCall(func)
        looping_call.start(1, now=False)
        try:
            assert looping_call.running
        finally:
            looping_call.stop()
        assert not looping_call.running
        # with now=False and an immediate stop(), func must never run
        assert not func.called

    def test_looping_call_now(self):
        func = mock.MagicMock()
        looping_call = AsyncioLoopingCall(func)
        looping_call.start(1)
        looping_call.stop()
        # the default start() (now not passed) must have invoked func already
        assert func.called

    def test_looping_call_already_running(self):
        looping_call = AsyncioLoopingCall(lambda: None)
        looping_call.start(1)
        try:
            with pytest.raises(RuntimeError):
                looping_call.start(1)
        finally:
            looping_call.stop()

    def test_looping_call_interval(self):
        looping_call = AsyncioLoopingCall(lambda: None)
        try:
            with pytest.raises(ValueError, match="Interval must be greater than 0"):
                looping_call.start(0)
            with pytest.raises(ValueError, match="Interval must be greater than 0"):
                looping_call.start(-1)
            assert not looping_call.running
        finally:
            # only does something if a start() call unexpectedly succeeded
            self._stop_if_running(looping_call)

    def test_looping_call_bad_function(self):
        # Deferred() returns a Deferred instance; start() is expected to
        # reject such a function with TypeError.
        looping_call = AsyncioLoopingCall(Deferred)
        try:
            with pytest.raises(TypeError):
                looping_call.start(0.1)
            assert not looping_call.running
        finally:
            # only does something if start() unexpectedly left it running
            self._stop_if_running(looping_call)