File: test_throttler_simultaneous.py

package info (click to toggle)
python-throttler 1.2.2-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 180 kB
  • sloc: python: 473; makefile: 4; sh: 2
file content (21 lines) | stat: -rw-r--r-- 647 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

import pytest

from tests.service import ServiceSimultaneous
from throttler import throttle_simultaneous


class TestThrottlerSimultaneous:
    """Tests for the ``throttle_simultaneous`` decorator."""

    @pytest.mark.parametrize(
        ('max_simultaneous', 'count'), ((1, 10), (3, 10), (100, 500))
    )
    def test_via_service_simultaneous(self, max_simultaneous: int, count: int):
        """Fire ``count`` throttled requests concurrently at the test service.

        Each request is wrapped with ``throttle_simultaneous(max_simultaneous)``
        and awaits ``ServiceSimultaneous.get``. The test passes when every
        request completes without raising.

        NOTE(review): presumably ``ServiceSimultaneous`` raises when more than
        ``max_simultaneous`` calls overlap -- confirm in ``tests/service.py``.

        :param max_simultaneous: limit passed to both the service and the
            throttling decorator.
        :param count: number of requests scheduled at once.
        """

        async def main():
            # Build the service and the throttled coroutine inside the running
            # loop so that any asyncio primitives they create belong to the
            # same loop that executes the requests.
            s = ServiceSimultaneous(max_simultaneous)

            @throttle_simultaneous(max_simultaneous)
            async def request(value: float):
                return await s.get(value)

            await asyncio.gather(*[request(v) for v in range(count)])

        # asyncio.run() creates and closes its own loop. The previous
        # asyncio.get_event_loop().run_until_complete(...) relied on an
        # implicit current loop, which is deprecated and raises RuntimeError
        # once the current loop has been cleared (e.g. by an earlier
        # asyncio.run() call) or on interpreters that no longer create one.
        asyncio.run(main())