File: test_buffer.py

package info (click to toggle)
pyroute2 0.8.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 3,700 kB
  • sloc: python: 50,245; makefile: 280; javascript: 183; ansic: 81; sh: 44; awk: 17
file content (61 lines) | stat: -rw-r--r-- 1,892 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import pytest

from pyroute2.netlink.buffer import Buffer, Page

buffer_settings = (
    'mode,size,page_size',
    (('internal', 10485760, 32768), ('shared', 10485760, 32768)),
)


@pytest.mark.parametrize(*buffer_settings)
def test_create_buffer(mode, size, page_size):
    try:
        buffer = Buffer(mode, size, page_size)
    except ModuleNotFoundError:
        pytest.skip(f'buffer mode "{mode}" not supported')
    assert buffer.mode == mode
    assert buffer.size == size
    assert buffer.page_size == page_size
    minimal_index = 0
    maximal_index = size // page_size
    assert len(buffer.directory) == maximal_index
    for index, page in buffer.directory.items():
        assert minimal_index <= index <= maximal_index
        assert isinstance(page, Page)
        assert page.offset == index * page_size
        assert page.is_free is True
    buffer.close()


@pytest.mark.parametrize(*buffer_settings)
def test_use_all_pages(mode, size, page_size):
    try:
        buffer = Buffer(mode, size, page_size)
    except ModuleNotFoundError:
        pytest.skip(f'buffer mode "{mode}" not supported')
    maximal_index = size // page_size
    marker = 0x05
    for _ in range(maximal_index):
        page = buffer.get_free_page()
        assert not page.is_free
        page.view[0] = marker
        assert page.view[0] == marker
        assert buffer.view[page.offset] == marker
        assert buffer.buf[page.offset] == marker
        marker += 1
        if marker == 0xFF:
            marker = 0x05

    with pytest.raises(MemoryError):
        buffer.get_free_page()
    buffer.close()


@pytest.mark.parametrize(*buffer_settings)
def test_context_manager(mode, size, page_size):
    try:
        with Buffer(mode, size, page_size) as buffer:
            assert buffer.mode == mode
    except ModuleNotFoundError:
        pytest.skip(f'buffer mode "{mode}" not supported')