File: test_basic.py

package info (click to toggle)
pyroute2 0.8.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 3,700 kB
  • sloc: python: 50,245; makefile: 280; javascript: 183; ansic: 81; sh: 44; awk: 17
file content (102 lines) | stat: -rw-r--r-- 3,193 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import errno

import pytest
from pr2test.context_manager import make_test_matrix
from pr2test.marks import require_root
from pr2test.tools import qdisc_exists

from pyroute2 import NetlinkError

pytestmark = [require_root()]
test_matrix = make_test_matrix(targets=['local', 'netns'])


@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_pfifo(context):
    index, ifname = context.default_interface
    context.ipr.tc('add', 'pfifo', index=index, limit=700)
    assert qdisc_exists(context.netns, 'pfifo', ifname=ifname, limit=700)


@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_pfifo_fast(context):
    index, ifname = context.default_interface
    context.ipr.tc('add', 'pfifo_fast', index=index, handle=0)
    ret = qdisc_exists(context.netns, 'pfifo_fast', ifname=ifname)[0]
    assert ret.get_attr('TCA_OPTIONS')['priomap']


@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_plug(context):
    index, ifname = context.default_interface
    context.ipr.tc('add', 'plug', index=index, limit=13107)
    assert qdisc_exists(context.netns, 'plug', ifname=ifname)


@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_blackhole(context):
    index, ifname = context.default_interface
    context.ipr.tc('add', 'blackhole', index=index)
    assert qdisc_exists(context.netns, 'blackhole', ifname=ifname)


@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_codel(context):
    index, ifname = context.default_interface
    context.ipr.tc(
        'add',
        'codel',
        index=index,
        handle='1:0',
        cdl_interval='40ms',
        cdl_target='2ms',
        cdl_limit=5000,
        cdl_ecn=1,
    )
    assert qdisc_exists(
        context.netns, 'codel', ifname=ifname, codel_ecn=1, codel_limit=5000
    )


@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_sfq(context):
    index, ifname = context.default_interface
    context.ipr.tc('add', 'sfq', index=index, handle=0, perturb=10)
    assert qdisc_exists(context.netns, 'sfq', ifname=ifname, perturb_period=10)


@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_tbf(context):
    index, ifname = context.default_interface
    context.ipr.tc(
        'add',
        'tbf',
        index=index,
        handle=0,
        rate='220kbit',
        latency='50ms',
        burst=1540,
    )
    opts = qdisc_exists(context.netns, 'tbf', ifname=ifname)[0].get_nested(
        'TCA_OPTIONS', 'TCA_TBF_PARMS'
    )
    assert opts['rate'] == 27500


@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_choke(context):
    index, ifname = context.default_interface
    try:
        context.ipr.tc(
            'add', 'choke', index=index, limit=5500, bandwith=3000, ecn=True
        )
    except NetlinkError as e:
        if e.code == errno.ENOENT:
            pytest.skip('qdisc not supported: choke')
        raise
    opts = qdisc_exists(context.netns, 'choke', ifname=ifname)[0].get_nested(
        'TCA_OPTIONS', 'TCA_CHOKE_PARMS'
    )
    assert opts['limit'] == 5500
    assert opts['qth_max'] == 1375
    assert opts['qth_min'] == 458