File: conftest.py

package info (click to toggle)
python-aiojobs 1.4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 412 kB
  • sloc: python: 1,342; makefile: 40
file content (35 lines) | stat: -rw-r--r-- 766 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import asyncio
from collections.abc import AsyncIterator, Awaitable
from typing import Any, Callable, Dict

import pytest

from aiojobs import Scheduler

PARAMS: Dict[str, Any] = {
    "close_timeout": 1.0,
    "limit": 100,
    "pending_limit": 0,
    "exception_handler": None,
}


@pytest.fixture
async def scheduler() -> AsyncIterator[Scheduler]:
    scheduler = Scheduler(**PARAMS)
    yield scheduler
    await scheduler.close()


@pytest.fixture
async def make_scheduler() -> AsyncIterator[Callable[..., Awaitable[Scheduler]]]:
    schedulers = []

    async def maker(**kwargs: Any) -> Scheduler:
        ret = Scheduler(**kwargs)
        schedulers.append(ret)
        return ret

    yield maker

    await asyncio.gather(*(s.close() for s in schedulers))