File: test_open.py

package info (click to toggle)
aiofiles 25.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 324 kB
  • sloc: python: 1,791; makefile: 3; sh: 1
file content (69 lines) | stat: -rw-r--r-- 1,646 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import asyncio
from pathlib import Path

import pytest

from aiofiles.threadpool import open as aioopen

RESOURCES_DIR = Path(__file__).parent.parent / "resources"
TEST_FILE = RESOURCES_DIR / "test_file1.txt"
TEST_FILE_CONTENTS = "0123456789"


@pytest.mark.parametrize("mode", ["r", "rb"])
async def test_file_not_found(mode):
    filename = "non_existent"

    try:
        open(filename, mode=mode)
    except Exception as e:
        expected = e

    assert expected

    try:
        await aioopen(filename, mode=mode)
    except Exception as e:
        actual = e

    assert actual

    assert actual.errno == expected.errno
    assert str(actual) == str(expected)


async def test_file_async_context_aexit():
    async with aioopen(TEST_FILE) as fp:
        pass

    with pytest.raises(ValueError):
        line = await fp.read()

    async with aioopen(TEST_FILE) as fp:
        line = await fp.read()
        assert line == TEST_FILE_CONTENTS


async def test_filetask_async_context_aexit():
    async def _process_test_file(file_ctx, sleep_time: float = 1.0):
        nonlocal file_ref
        async with file_ctx as fp:
            file_ref = file_ctx._obj
            await asyncio.sleep(sleep_time)
            await fp.read()

    cancel_time, sleep_time = 0.1, 10
    assert cancel_time <= (sleep_time / 10)

    file_ref = None
    file_ctx = aioopen(TEST_FILE)

    task = asyncio.create_task(
        _process_test_file(file_ctx=file_ctx, sleep_time=sleep_time)
    )
    try:
        await asyncio.wait_for(task, timeout=cancel_time)
    except asyncio.TimeoutError:
        assert task.cancelled

    assert file_ref.closed