1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
|
import pytest
from pr2test.context_manager import make_test_matrix, skip_if_not_supported
from pr2test.marks import require_root
from pr2test.tools import qdisc_exists
from pyroute2 import protocols
# Module-level pytest marks: pytest applies everything listed in
# `pytestmark` to every test collected from this module.
pytestmark = [require_root()]
# Parameter set for the `context` fixture, built for the 'local' and
# 'netns' targets; the tests below feed it to the fixture through
# indirect parametrization.
test_matrix = make_test_matrix(targets=['local', 'netns'])
@pytest.mark.parametrize('context', test_matrix, indirect=True)
@skip_if_not_supported
def test_htb_over_32gbit(context):
    """Create HTB classes with very high rates and check the 64-bit attrs.

    An HTB root qdisc is installed on the default test interface,
    followed by three classes (50gbit, 35gbit and 128kbit, each with
    ceil equal to rate).  The classes are then read back and their
    TCA_HTB_RATE64 / TCA_HTB_CEIL64 attributes are compared; a missing
    attribute is read as 0.
    """
    index, ifname = context.default_interface
    rate64 = ('TCA_OPTIONS', 'TCA_HTB_RATE64')
    ceil64 = ('TCA_OPTIONS', 'TCA_HTB_CEIL64')

    # -- root qdisc, handle given in string notation
    context.ipr.tc('add', 'htb', index=index, handle='1:', default='20:0')
    assert qdisc_exists(context.netns, 'htb', ifname=ifname)

    # -- class tree, handles given in integer notation
    class_specs = (
        {
            'handle': 0x10001,
            'parent': 0x10000,
            'rate': '50gbit',
            'ceil': '50gbit',
        },
        {
            'handle': 0x10010,
            'parent': 0x10001,
            'rate': '35gbit',
            'ceil': '35gbit',
            'prio': 1,
        },
        {
            'handle': 0x10020,
            'parent': 0x10001,
            'rate': '128kbit',
            'ceil': '128kbit',
            'prio': 2,
        },
    )
    for spec in class_specs:
        context.ipr.tc('add-class', 'htb', index=index, **spec)

    # -- read the classes back
    installed = context.ipr.get_classes(index=index)
    assert len(installed) == 3

    # one (rate, ceil) pair per class, in the order get_classes() gave them
    readings = [(c.get(rate64, 0), c.get(ceil64, 0)) for c in installed]

    # every class was created with ceil == rate
    for rate, ceil in readings:
        assert rate == ceil

    # the first two classes must report a larger 64-bit rate than the third
    slow_rate = readings[2][0]
    assert readings[0][0] > slow_rate
    assert readings[1][0] > slow_rate
    # NOTE(review): the third class is expected to carry no 64-bit rate
    # (read as the default 0) -- presumably the attribute is only sent
    # for rates that do not fit 32 bits; confirm against the kernel.
    assert slow_rate == 0
@pytest.mark.parametrize('context', test_matrix, indirect=True)
@skip_if_not_supported
def test_htb(context):
    """Build a complete HTB setup: root qdisc, classes, leaves, filters.

    Handles are deliberately passed both as strings ('1:10') and as
    integers (0x10010) to exercise both notations.  After each stage
    the installed objects are listed and counted.
    """
    index, ifname = context.default_interface

    # -- root qdisc, string handle notation
    context.ipr.tc('add', 'htb', index=index, handle='1:', default='20:0')
    assert qdisc_exists(context.netns, 'htb', ifname=ifname)

    # -- classes, mixing string and int handle notation
    class_specs = (
        {
            'handle': '1:1',
            'parent': '1:0',
            'rate': '256kbit',
            'burst': 1024 * 6,
        },
        {
            'handle': 0x10010,
            'parent': 0x10001,
            'rate': '192kbit',
            'burst': 1024 * 6,
            'prio': 1,
        },
        {
            'handle': '1:20',
            'parent': '1:1',
            'rate': '128kbit',
            'burst': 1024 * 6,
            'prio': 2,
        },
    )
    for spec in class_specs:
        context.ipr.tc('add-class', 'htb', index=index, **spec)
    assert len(context.ipr.get_classes(index=index)) == 3

    # -- sfq leaves, mixing string and int handle notation
    leaf_specs = (
        {'handle': '10:', 'parent': '1:10', 'perturb': 10},
        {'handle': 0x200000, 'parent': 0x10020, 'perturb': 10},
    )
    for spec in leaf_specs:
        context.ipr.tc('add', 'sfq', index=index, **spec)

    # only qdiscs attached to the test interface are of interest
    kinds = {
        qdisc.get_attr('TCA_KIND')
        for qdisc in context.ipr.get_qdiscs()
        if qdisc['index'] == index
    }
    assert kinds == {'htb', 'sfq'}

    # -- u32 filters, mixing string and int handle notation
    #
    # Please note that the u32 filter requires ethernet protocol
    # numbers, as defined in the protocols module. Do not provide
    # socket.AF_INET and so on here.
    #
    filter_specs = (
        {
            'handle': '0:0',
            'parent': '1:0',
            'prio': 10,
            'protocol': protocols.ETH_P_IP,
            'target': '1:10',
            'keys': ['0x0006/0x00ff+8', '0x0000/0xffc0+2'],
        },
        {
            'handle': 0,
            'parent': 0x10000,
            'prio': 10,
            'protocol': protocols.ETH_P_IP,
            'target': 0x10020,
            'keys': ['0x5/0xf+0', '0x10/0xff+33'],
        },
    )
    for spec in filter_specs:
        context.ipr.tc('add-filter', 'u32', index=index, **spec)

    # 2 explicit filters + 2 autogenerated ones
    installed_filters = context.ipr.get_filters(index=index)
    assert len(installed_filters) == 4
@pytest.mark.parametrize('context', test_matrix, indirect=True)
@skip_if_not_supported
def test_replace(context):
    """Replace an existing HTB class and verify the new parameters.

    The full setup from test_htb() is created first; then class
    0x10010 is replaced with a new rate and prio, looked up again in
    the class list, and its TCA_HTB_PARMS are checked.

    Raises:
        Exception: if class 0x10010 is not among the listed classes.
    """
    # reuse the whole HTB setup as the starting point
    test_htb(context)
    index, _ = context.default_interface
    target_handle = 0x10010

    # change the class in place
    context.ipr.tc(
        'replace-class',
        'htb',
        index=index,
        handle=target_handle,
        parent=0x10001,
        rate='102kbit',
        burst=1024 * 6,
        prio=3,
    )

    # pick the first listed class that carries the replaced handle
    listed = context.ipr.get_classes(index=index)
    replaced = next(
        (item for item in listed if item['handle'] == target_handle), None
    )
    if replaced is None:
        raise Exception('target class not found')

    params = replaced.get_attr('TCA_OPTIONS').get_attr('TCA_HTB_PARMS')
    assert params['prio'] == 3
    # NOTE(review): 10200 presumably follows from the 102kbit rate
    # (quantum in bytes = rate in bytes/s divided by the HTB r2q
    # factor) -- confirm against the HTB defaults.
    assert params['quantum'] * 8 == 10200
|