File: test_cache_info.py

package info (click to toggle)
python-async-lru 2.0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 208 kB
  • sloc: python: 904; makefile: 17
file content (36 lines) | stat: -rw-r--r-- 999 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import asyncio
from typing import Callable

from async_lru import alru_cache


async def test_cache_info(check_lru: Callable[..., None]) -> None:
    """Check the cache statistics reported after batches of gathered calls.

    Three batches are awaited through ``asyncio.gather`` against a cached
    coroutine function declared with ``maxsize=4``:

    * three distinct arguments -> expected 0 hits, 3 misses, 3 cached entries;
    * the same argument three times -> expected 2 hits, 1 miss, 1 cached entry;
    * four distinct arguments, each twice -> expected 4 hits, 4 misses,
      4 cached entries.

    Between batches the cache is cleared, and the test asserts that the
    hit, miss and size counters are all back at zero.

    Args:
        check_lru: Fixture supplied by the test suite; it is called with the
            cached function and the expected ``hits``, ``misses``, ``cache``,
            ``tasks`` and ``maxsize`` values.
    """

    @alru_cache(maxsize=4)
    async def coro(val: int) -> int:
        return val

    # Each row: (arguments to gather, expected hits, misses, cache size).
    scenarios = [
        ([1, 2, 3], 0, 3, 3),
        ([1, 1, 1], 2, 1, 1),
        ([1, 2, 3, 4] * 2, 4, 4, 4),
    ]

    for position, (args, hits, misses, size) in enumerate(scenarios):
        if position:
            # Every batch after the first starts from a freshly cleared
            # cache, and the clearing itself is verified before continuing.
            coro.cache_clear()
            check_lru(coro, hits=0, misses=0, cache=0, tasks=0, maxsize=4)

        results = await asyncio.gather(*map(coro, args))
        assert results == args
        check_lru(coro, hits=hits, misses=misses, cache=size, tasks=0, maxsize=4)