1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
from __future__ import annotations
import asyncio
import random
from typing import TYPE_CHECKING
from unittest import mock
import pytest
from twisted.internet.defer import Deferred
from scrapy.utils.asyncgen import as_async_generator
from scrapy.utils.asyncio import (
AsyncioLoopingCall,
_parallel_asyncio,
is_asyncio_available,
)
from scrapy.utils.defer import deferred_f_from_coro_f
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
class TestAsyncio:
    """Checks for the asyncio availability helper."""

    def test_is_asyncio_available(self, reactor_pytest: str) -> None:
        """``is_asyncio_available()`` must agree with the selected reactor."""
        # Only the pytest --reactor argument should influence the outcome.
        expected = reactor_pytest == "asyncio"
        assert is_asyncio_available() == expected
@pytest.mark.only_asyncio
class TestParallelAsyncio:
    """Test for scrapy.utils.asyncio._parallel_asyncio(), based on tests.test_utils_defer.TestParallelAsync."""

    # Passed as the second argument to _parallel_asyncio(); the tests assert
    # that no more than this many callables are ever in flight at once.
    CONCURRENT_ITEMS = 50

    @staticmethod
    async def callable(o: int, results: list[int]) -> None:
        """Record *o* in *results*, sometimes after a short random sleep."""
        if random.random() < 0.4:
            # simulate async processing
            await asyncio.sleep(random.random() / 8)
        # simulate trivial sync processing
        results.append(o)

    async def callable_wrapped(
        self,
        o: int,
        results: list[int],
        parallel_count: list[int],
        max_parallel_count: list[int],
    ) -> None:
        """Run :meth:`callable` while tracking how many calls overlap.

        *parallel_count* and *max_parallel_count* are one-element lists used
        as mutable counters shared between all concurrent invocations: the
        former holds the number of calls currently in flight, the latter the
        highest value the former has reached.
        """
        parallel_count[0] += 1
        max_parallel_count[0] = max(max_parallel_count[0], parallel_count[0])
        await self.callable(o, results)
        assert parallel_count[0] > 0, parallel_count[0]
        parallel_count[0] -= 1

    @staticmethod
    def get_async_iterable(length: int) -> AsyncGenerator[int, None]:
        """Return an async generator over ``range(length)``."""
        # simulate a simple callback without delays between results
        return as_async_generator(range(length))

    @staticmethod
    async def get_async_iterable_with_delays(length: int) -> AsyncGenerator[int, None]:
        """Yield ``0 .. length - 1``, occasionally sleeping before an item."""
        # simulate a callback with delays between some of the results
        for i in range(length):
            if random.random() < 0.1:
                await asyncio.sleep(random.random() / 20)
            yield i

    async def _check_parallel_run(self, make_iterable) -> None:
        """Run ``_parallel_asyncio()`` over several input sizes and verify it.

        *make_iterable* is called with a length and must return the async
        iterable to consume. For every length this checks that each item was
        processed exactly once and that the observed concurrency never
        exceeded :attr:`CONCURRENT_ITEMS`.
        """
        for length in (20, 50, 100):
            # Fresh counters per run so that one length cannot mask another.
            parallel_count = [0]
            max_parallel_count = [0]
            results: list[int] = []
            await _parallel_asyncio(
                make_iterable(length),
                self.CONCURRENT_ITEMS,
                self.callable_wrapped,
                results,
                parallel_count,
                max_parallel_count,
            )
            # Completion order is not deterministic, hence the sort.
            assert list(range(length)) == sorted(results)
            assert max_parallel_count[0] <= self.CONCURRENT_ITEMS

    @deferred_f_from_coro_f
    async def test_simple(self):
        """Items produced without delays are all processed within the limit."""
        await self._check_parallel_run(self.get_async_iterable)

    @deferred_f_from_coro_f
    async def test_delays(self):
        """Items produced with random delays are all processed within the limit."""
        await self._check_parallel_run(self.get_async_iterable_with_delays)
@pytest.mark.only_asyncio
class TestAsyncioLoopingCall:
    """Tests for the start/stop lifecycle of ``AsyncioLoopingCall``."""

    def test_looping_call(self):
        """With ``now=False`` the call is running until stopped and the
        function is not called before ``stop()``."""
        func = mock.MagicMock()
        looping_call = AsyncioLoopingCall(func)
        looping_call.start(1, now=False)
        try:
            assert looping_call.running
        finally:
            # Stop even when the assertion above fails, so that a started
            # looping call is not left behind for later tests.
            looping_call.stop()
        assert not looping_call.running
        assert not func.called

    def test_looping_call_now(self):
        """With the default arguments the function has been called by the
        time the looping call is stopped."""
        func = mock.MagicMock()
        looping_call = AsyncioLoopingCall(func)
        looping_call.start(1)
        looping_call.stop()
        assert func.called

    def test_looping_call_already_running(self):
        """Starting an already started looping call raises ``RuntimeError``."""
        looping_call = AsyncioLoopingCall(lambda: None)
        looping_call.start(1)
        try:
            with pytest.raises(RuntimeError):
                looping_call.start(1)
        finally:
            # The first start() succeeded, so always undo it, including when
            # the expected RuntimeError is not raised.
            looping_call.stop()

    def test_looping_call_interval(self):
        """Zero and negative intervals are rejected with ``ValueError`` and
        leave the looping call not running."""
        looping_call = AsyncioLoopingCall(lambda: None)
        with pytest.raises(ValueError, match="Interval must be greater than 0"):
            looping_call.start(0)
        with pytest.raises(ValueError, match="Interval must be greater than 0"):
            looping_call.start(-1)
        assert not looping_call.running

    def test_looping_call_bad_function(self):
        """Starting with ``Deferred`` as the function raises ``TypeError``
        and leaves the looping call not running."""
        # NOTE(review): presumably rejected because calling Deferred() returns
        # a Deferred instance -- confirm against AsyncioLoopingCall.
        looping_call = AsyncioLoopingCall(Deferred)
        with pytest.raises(TypeError):
            looping_call.start(0.1)
        assert not looping_call.running
|