File: example_throttlers.py

package info (click to toggle)
python-throttler 1.2.2-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 180 kB
  • sloc: python: 473; makefile: 4; sh: 2
file content (32 lines) | stat: -rw-r--r-- 689 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
import time

from throttler import Throttler, ThrottlerSimultaneous


async def task(throttler):
    async with throttler:
        return await asyncio.sleep(0.1)


async def many_tasks(throttler, count: int):
    coros = [task(throttler) for _ in range(count)]
    for coro in asyncio.as_completed(coros):
        _result = await coro
        print(f'Timestamp: {time.time()}')


def example():
    async def run():
        t = Throttler(rate_limit=10, period=3.0)
        await many_tasks(t, 100)

    asyncio.run(run())


def example_simultaneous():
    async def run():
        t = ThrottlerSimultaneous(count=5)
        await many_tasks(t, 100)

    asyncio.run(run())