1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
|
from pr2test.context_manager import skip_if_not_supported
from pr2test.marks import require_root
from pyroute2 import MPTCP
pytestmark = [require_root()]
def get_endpoints(mptcp):
return dict(
(
x.get_nested('MPTCP_PM_ATTR_ADDR', 'MPTCP_PM_ADDR_ATTR_ADDR4'),
x.get_nested('MPTCP_PM_ATTR_ADDR', 'MPTCP_PM_ADDR_ATTR_ID'),
)
for x in mptcp.endpoint('show')
)
def get_limits(mptcp):
return [
(
x.get_attr('MPTCP_PM_ATTR_SUBFLOWS'),
x.get_attr('MPTCP_PM_ATTR_RCV_ADD_ADDRS'),
)
for x in mptcp.limits('show')
][0]
@skip_if_not_supported
def test_enpoint_add_addr4(context):
with MPTCP() as mptcp:
ipaddrs = [context.new_ipaddr for _ in range(3)]
for ipaddr in ipaddrs:
mptcp.endpoint('add', addr=ipaddr)
mapping = get_endpoints(mptcp)
assert set(mapping) >= set(ipaddrs)
for ipaddr in ipaddrs:
mptcp.endpoint('del', addr=ipaddr, id=mapping[ipaddr])
assert not set(get_endpoints(mptcp)).intersection(set(ipaddrs))
@skip_if_not_supported
def test_limits(context):
with MPTCP() as mptcp:
save_subflows, save_rcv_add = get_limits(mptcp)
mptcp.limits('set', subflows=2, rcv_add_addrs=3)
assert get_limits(mptcp) == (2, 3)
mptcp.limits('set', subflows=save_subflows, rcv_add_addrs=save_rcv_add)
assert get_limits(mptcp) == (save_subflows, save_rcv_add)
|