1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
from pyroute2.iproute.linux import IPRoute
from pyroute2.nslink.nslink import NetNS
def interface_exists(netns=None, *argv, **kwarg):
    """Check whether at least one interface matches the lookup arguments.

    Args:
        netns: Passed to ``NetNS`` to select where the lookup runs;
            ``None`` uses a plain ``IPRoute`` instead.
        *argv: Positional arguments forwarded to ``link_lookup()``.
        **kwarg: Keyword arguments forwarded to ``link_lookup()``.

    Returns:
        ``True`` if ``link_lookup()`` yields at least one result,
        ``False`` otherwise.
    """
    ipr = IPRoute() if netns is None else NetNS(netns)
    try:
        found = list(ipr.link_lookup(*argv, **kwarg))
    finally:
        # Close the handle even when the lookup raises.
        ipr.close()
    return len(found) >= 1
def address_exists(netns=None, **kwarg):
    """Check whether exactly one address record matches the filter.

    Args:
        netns: Passed to ``NetNS`` to select where the query runs;
            ``None`` uses a plain ``IPRoute`` instead.
        **kwarg: Filter description. If a ``match`` key is present, its
            value is used as the filter. Otherwise the keyword arguments
            themselves form the filter, with ``address`` and ``local``
            truncated at the first ``/``. An ``ifname`` key is resolved
            through ``link_lookup()`` and replaced by ``index`` in the
            filter.

    Returns:
        ``True`` if the address dump returns exactly one record.
        ``False`` otherwise, including when ``ifname`` resolves to no
        interface.
    """
    ipr = IPRoute() if netns is None else NetNS(netns)
    try:
        if 'match' in kwarg:
            nkw = kwarg['match']
        else:
            nkw = dict(kwarg)
            for key in ('address', 'local'):
                if key in nkw:
                    nkw[key] = nkw[key].split('/')[0]
        if 'ifname' in kwarg:
            links = list(ipr.link_lookup(ifname=kwarg['ifname']))
            if not links:
                # Unknown interface: nothing can be configured on it.
                return False
            # Work on a copy so a caller-supplied ``match`` dict is not
            # modified behind the caller's back.
            nkw = dict(nkw)
            nkw['index'] = links[0]
            # A ``match`` filter need not carry ``ifname`` itself, so
            # tolerate its absence instead of raising KeyError.
            nkw.pop('ifname', None)
        records = list(ipr.addr('dump', match=nkw))
    finally:
        # Close the handle on every exit path, including exceptions.
        ipr.close()
    return len(records) == 1
def rtnl_object_exists(api, netns, record_filter):
    """Check whether a dump through the named API returns any record.

    Args:
        api: Name of the method to call on the ``IPRoute``/``NetNS``
            object (the wrappers in this file use ``'neigh'``,
            ``'route'``, ``'rule'`` and ``'fdb'``).
        netns: Passed to ``NetNS`` to select where the query runs;
            ``None`` uses a plain ``IPRoute`` instead.
        record_filter: Mapping expanded into keyword arguments of the
            ``'dump'`` call.

    Returns:
        ``True`` if the dump yields at least one record, ``False``
        otherwise.
    """
    ipr = IPRoute() if netns is None else NetNS(netns)
    try:
        records = list(getattr(ipr, api)('dump', **record_filter))
    finally:
        # Close the handle even if the attribute lookup or dump raises.
        ipr.close()
    return len(records) >= 1
def neighbour_exists(netns=None, **kwarg):
    """Report whether a ``neigh`` dump filtered by ``kwarg`` finds a record.

    Delegates to ``rtnl_object_exists`` with the ``'neigh'`` API name.
    """
    return rtnl_object_exists(
        api='neigh',
        netns=netns,
        record_filter=kwarg,
    )
def route_exists(netns=None, **kwarg):
    """Report whether a ``route`` dump filtered by ``kwarg`` finds a record.

    Delegates to ``rtnl_object_exists`` with the ``'route'`` API name.
    """
    return rtnl_object_exists(
        api='route',
        netns=netns,
        record_filter=kwarg,
    )
def rule_exists(netns=None, **kwarg):
    """Report whether a ``rule`` dump filtered by ``kwarg`` finds a record.

    Delegates to ``rtnl_object_exists`` with the ``'rule'`` API name.
    """
    return rtnl_object_exists(
        api='rule',
        netns=netns,
        record_filter=kwarg,
    )
def fdb_record_exists(netns=None, **kwarg):
    """Report whether an ``fdb`` dump filtered by ``kwarg`` finds a record.

    Delegates to ``rtnl_object_exists`` with the ``'fdb'`` API name.
    """
    return rtnl_object_exists(
        api='fdb',
        netns=netns,
        record_filter=kwarg,
    )
def qdisc_exists(netns=None, kind=None, **kwarg):
    """Return the qdisc records that satisfy the given criteria.

    Args:
        netns: Passed to ``NetNS`` to select where the query runs;
            ``None`` uses a plain ``IPRoute`` instead.
        kind: If not ``None``, keep only records whose ``TCA_KIND``
            attribute equals this value.
        **kwarg: An optional ``ifname`` key restricts the query to that
            interface (resolved through ``link_lookup()``). Every
            remaining key is an option whose value must equal the
            record's ``TCA_OPTIONS`` entry, looked up both under the
            plain key and under ``qdisc.name2nla(key)``.

    Returns:
        A list of the matching records. The list is empty (falsy) when
        nothing matches or when ``ifname`` resolves to no interface.
    """
    ipr = IPRoute() if netns is None else NetNS(netns)
    opts = {}
    with ipr:
        if 'ifname' in kwarg:
            links = list(ipr.link_lookup(ifname=kwarg.pop('ifname')))
            if not links:
                # Unknown interface: report "no such qdisc" rather than
                # failing with IndexError, as address_exists() does.
                return []
            opts['index'] = links[0]
        ret = list(ipr.get_qdiscs(**opts))
        if kind is not None:
            ret = [x for x in ret if x.get_attr('TCA_KIND') == kind]
        if kwarg:
            matched = []
            for qdisc in ret:
                options = qdisc.get_attr('TCA_OPTIONS')
                if options is None:
                    # A record without TCA_OPTIONS has nothing to compare
                    # against; skipping it avoids a TypeError on the
                    # membership test below.
                    continue
                if 'attrs' in options:
                    options = dict(options['attrs'])
                # Accept each option under either its plain name or its
                # NLA name; all requested options must match.
                if all(
                    value
                    in (options.get(opt), options.get(qdisc.name2nla(opt)))
                    for opt, value in kwarg.items()
                ):
                    matched.append(qdisc)
            ret = matched
        return ret
|