# Tests for NDB views and reports: membership checks, key iteration,
# indexing/slicing of dumps, and JSON/CSV output formats.
import csv
import json
from socket import AF_INET
import pytest
from pr2test.context_manager import make_test_matrix, skip_if_not_supported
from pr2test.marks import require_root
from pyroute2.common import basestring
from pyroute2.ndb.objects import RTNL_Object
from pyroute2.ndb.report import Record, RecordSet
# Module-wide pytest marks: every test in this file carries whatever
# mark require_root() returns.
pytestmark = [require_root()]

# Shared parametrization source for the ``context`` fixture, built from
# the listed targets and database specs (how they are combined is up to
# make_test_matrix).
test_matrix = make_test_matrix(
    targets=['local', 'netns'],
    dbs=['sqlite3/:memory:', 'postgres/pr2test'],
)
@pytest.mark.parametrize(
    'view,key,item',
    (
        ('interfaces', 'ifname', 'lo'),
        ('routes', 'dst', '127.0.0.0/8'),
        ('addresses', 'address', '127.0.0.1/8'),
    ),
)
@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_contains(context, view, key, item):
    """Check that ``item in view`` holds for loopback-related records.

    :param context: test context fixture exposing ``ndb``
    :param view: name of the NDB view attribute to query
    :param key: field name used to wait for the record
    :param item: value expected to be found in the view
    """
    ndb = context.ndb
    # the loopback records are only expected once ``lo`` is up
    ndb.interfaces['lo'].set('state', 'up').commit()
    # block until the record shows up (timeout=10) before testing membership
    wait_spec = {key: item, 'timeout': 10}
    getattr(ndb, view).wait(**wait_spec)
    assert item in getattr(ndb, view)
@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_types(context):
    """Check the type of the interfaces summary report and of its repr.

    :param context: test context fixture exposing ``ndb``
    """
    interfaces = context.ndb.interfaces
    # the report itself must be a RecordSet
    report = interfaces.summary()
    assert isinstance(report, RecordSet)
    # a fresh summary is requested for repr(), as in the type check above
    rendered = repr(interfaces.summary())
    assert isinstance(rendered, basestring)
@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_iter_keys(context):
    """Iterate several NDB views and check key and object types.

    Every key yielded by a view must be a ``Record``; when ``view.get(key)``
    returns something, it must be an ``RTNL_Object``.

    :param context: test context fixture exposing ``ndb``
    """
    view_names = ('interfaces', 'addresses', 'neighbours', 'routes', 'rules')
    for view_name in view_names:
        view = getattr(context.ndb, view_name)
        for key in view:
            assert isinstance(key, Record)
            obj = view.get(key)
            if obj is None:
                # a missing object is tolerated; only present ones are typed
                continue
            assert isinstance(obj, RTNL_Object)
@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_slices(context):
    """Check that indexing and slicing a rules dump behaves like a list.

    A reference list is built from one dump; every index/slice expression
    is then applied both to that list and to a freshly requested dump, and
    the results are compared.

    :param context: test context fixture exposing ``ndb``
    """
    a = list(context.ndb.rules.dump())
    ln = len(a) - 1
    # simple indices
    assert a[0] == context.ndb.rules.dump()[0]
    assert a[1] == context.ndb.rules.dump()[1]
    assert a[-1] == context.ndb.rules.dump()[-1]
    assert context.ndb.rules.dump()[ln] == a[-1]
    # one past the end must be rejected, the same way a list rejects it;
    # pytest.raises makes the test fail if no IndexError is raised at all
    with pytest.raises(IndexError):
        context.ndb.rules.dump()[len(a)]
    # slices
    assert a[0:] == context.ndb.rules.dump()[0:]
    assert a[:3] == context.ndb.rules.dump()[:3]
    assert a[0:3] == context.ndb.rules.dump()[0:3]
    assert a[1:3] == context.ndb.rules.dump()[1:3]
    # negative slices
    assert a[-3:] == context.ndb.rules.dump()[-3:]
    assert a[-3:-1] == context.ndb.rules.dump()[-3:-1]
    # mixed
    assert a[-ln : ln - 1] == context.ndb.rules.dump()[-ln : ln - 1]
    # step
    assert a[2:ln:2] == context.ndb.rules.dump()[2:ln:2]
@pytest.mark.parametrize('context', test_matrix, indirect=True)
@skip_if_not_supported
def test_report_chains(context):
    """Chain record/field selectors on a routes dump and check the encap.

    A dummy interface with an address and a route with an MPLS encap
    (labels 20 and 30) are created. The routes dump is then narrowed by
    output interface, by presence of an encap, and to the ``encap`` field;
    the first remaining record is decoded from JSON and verified.

    :param context: test context fixture exposing ``ndb``, ``ipnets``,
        ``new_ipaddr`` and ``new_ifname``
    """
    ipnet = str(context.ipnets[1].network)
    ipaddr = context.new_ipaddr
    router = context.new_ipaddr
    ifname = context.new_ifname
    (
        context.ndb.interfaces.create(ifname=ifname, kind='dummy', state='up')
        .add_ip(address=ipaddr, prefixlen=24)
        .commit()
    )
    (
        context.ndb.routes.create(
            dst=ipnet,
            dst_len=24,
            gateway=router,
            encap={'type': 'mpls', 'labels': [20, 30]},
        ).commit()
    )
    with context.ndb.routes.dump() as dump:
        dump.select_records(oif=context.ndb.interfaces[ifname]['index'])
        dump.select_records(lambda x: x.encap is not None)
        dump.select_fields('encap')
        # Only the first matching record is inspected. Fail with a clear
        # message when nothing matched, instead of hitting an unbound name
        # in the assertions below.
        record = next(iter(dump), None)
        assert record is not None, 'no route record with an encap found'
        encap = json.loads(record.encap)
    assert isinstance(encap, list)
    # labels keep their order; the 'bos' flag is 0 on the first entry and
    # 1 on the last one
    assert encap[0]['label'] == 20
    assert encap[0]['bos'] == 0
    assert encap[1]['label'] == 30
    assert encap[1]['bos'] == 1
@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_json(context):
    """Check that the JSON-formatted interfaces summary is a list of dicts.

    :param context: test context fixture exposing ``ndb``
    """
    # format('json') yields string chunks; glue them into one document
    chunks = context.ndb.interfaces.summary().format('json')
    data = json.loads(''.join(chunks))
    assert isinstance(data, list)
    assert all(isinstance(row, dict) for row in data)
class MD(csv.Dialect):
    """CSV dialect for the tests: comma-separated, single-quote quoting."""

    # field and line separators
    delimiter = ","
    lineterminator = "\n"
    # quoting rules: quote only when needed, with a single quote, and do
    # not treat a doubled quote character as an escaped quote
    quoting = csv.QUOTE_MINIMAL
    quotechar = "'"
    doublequote = False
@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_csv(context):
    """Check that CSV output rows have as many fields as the raw records.

    The field count is taken from the first record of a routes dump and
    all other records must agree with it; then every row parsed from the
    CSV-formatted dump must have the same number of fields.

    :param context: test context fixture exposing ``ndb``
    """
    expected_width = 0
    for row in context.ndb.routes.dump():
        width = len(row)
        if not expected_width:
            # first record seen so far defines the reference width
            expected_width = width
            continue
        assert width == expected_width
    # parse a fresh dump rendered as CSV with the module's dialect
    csv_lines = context.ndb.routes.dump().format('csv')
    for fields in csv.reader(csv_lines, dialect=MD()):
        assert len(fields) == expected_width
@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_nested_ipaddr(context):
    """Check the nested ``ipaddr`` dump of an interface with two addresses.

    A dummy interface is created with two addresses; the interface's
    ``ipaddr`` dump, restricted to AF_INET records, must render as exactly
    two lines.

    :param context: test context fixture exposing ``ndb``, ``new_ifname``
        and ``new_ipaddr``
    """
    ifname = context.new_ifname
    first_ip = context.new_ipaddr
    second_ip = context.new_ipaddr
    new_interface = context.ndb.interfaces.create(
        ifname=ifname, kind='dummy', state='up'
    )
    with new_interface as interface:
        for address in (first_ip, second_ip):
            interface.add_ip(address=address, prefixlen=24)
    with context.ndb.interfaces[ifname].ipaddr.dump() as dump:
        # keep only AF_INET records, dropping any other address family
        dump.select_records(lambda x: x.family == AF_INET)
        lines = repr(dump).split('\n')
        assert len(lines) == 2
@pytest.mark.parametrize('context', test_matrix, indirect=True)
def test_nested_ports(context):
    """Check the nested ``ports`` summary of a bridge with two ports.

    Two dummy interfaces are created and attached to a new bridge; the
    bridge's ``ports`` summary must render as exactly two lines.

    :param context: test context fixture exposing ``ndb`` and
        ``new_ifname``
    """
    ifbr0 = context.new_ifname
    ifbr0p0 = context.new_ifname
    ifbr0p1 = context.new_ifname
    port_names = (ifbr0p0, ifbr0p1)
    with context.ndb.interfaces as i:
        for port_name in port_names:
            i.create(ifname=port_name, kind='dummy').commit()
        bridge = i.create(ifname=ifbr0, kind='bridge')
        for port_name in port_names:
            # add_port() is chained, so keep whatever it returns
            bridge = bridge.add_port(port_name)
        bridge.commit()
    summary_text = repr(context.ndb.interfaces[ifbr0].ports.summary())
    records = len(summary_text.split('\n'))
    # two ports were attached, so two lines are expected
    # (presumably one line per record)
    assert records == 2
|