File: nsim.py

package info (click to toggle)
linux 6.12.43-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,676,260 kB
  • sloc: ansic: 25,921,022; asm: 269,579; sh: 136,424; python: 65,440; makefile: 55,721; perl: 37,752; xml: 19,284; cpp: 5,895; yacc: 4,927; lex: 2,939; awk: 1,594; sed: 28; ruby: 25
file content (135 lines) | stat: -rw-r--r-- 4,228 bytes parent folder | download | duplicates (18)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# SPDX-License-Identifier: GPL-2.0

import errno
import json
import os
import random
import re
import time
from .utils import cmd, ip


class NetdevSim:
    """
    Class for netdevsim netdevice and its attributes.
    """

    def __init__(self, nsimdev, port_index, ifname, ns=None):
        # In case udev renamed the netdev to according to new schema,
        # check if the name matches the port_index.
        nsimnamere = re.compile(r"eni\d+np(\d+)")
        match = nsimnamere.match(ifname)
        if match and int(match.groups()[0]) != port_index + 1:
            raise Exception("netdevice name mismatches the expected one")

        self.ifname = ifname
        self.nsimdev = nsimdev
        self.port_index = port_index
        self.ns = ns
        self.dfs_dir = "%s/ports/%u/" % (nsimdev.dfs_dir, port_index)
        ret = ip("-j link show dev %s" % ifname, ns=ns)
        self.dev = json.loads(ret.stdout)[0]
        self.ifindex = self.dev["ifindex"]

    def dfs_write(self, path, val):
        self.nsimdev.dfs_write(f'ports/{self.port_index}/' + path, val)


class NetdevSimDev:
    """
    Class for netdevsim bus device and its attributes.
    """
    @staticmethod
    def ctrl_write(path, val):
        fullpath = os.path.join("/sys/bus/netdevsim/", path)
        with open(fullpath, "w") as f:
            f.write(val)

    def dfs_write(self, path, val):
        fullpath = os.path.join(f"/sys/kernel/debug/netdevsim/netdevsim{self.addr}/", path)
        with open(fullpath, "w") as f:
            f.write(val)

    def __init__(self, port_count=1, queue_count=1, ns=None):
        # nsim will spawn in init_net, we'll set to actual ns once we switch it there
        self.ns = None

        if not os.path.exists("/sys/bus/netdevsim"):
            cmd("modprobe netdevsim")

        addr = random.randrange(1 << 15)
        while True:
            try:
                self.ctrl_write("new_device", "%u %u %u" % (addr, port_count, queue_count))
            except OSError as e:
                if e.errno == errno.ENOSPC:
                    addr = random.randrange(1 << 15)
                    continue
                raise e
            break
        self.addr = addr

        # As probe of netdevsim device might happen from a workqueue,
        # so wait here until all netdevs appear.
        self.wait_for_netdevs(port_count)

        if ns:
            cmd(f"devlink dev reload netdevsim/netdevsim{addr} netns {ns.name}")
            self.ns = ns

        cmd("udevadm settle", ns=self.ns)
        ifnames = self.get_ifnames()

        self.dfs_dir = "/sys/kernel/debug/netdevsim/netdevsim%u/" % addr

        self.nsims = []
        for port_index in range(port_count):
            self.nsims.append(self._make_port(port_index, ifnames[port_index]))

        self.removed = False

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.remove()

    def _make_port(self, port_index, ifname):
        return NetdevSim(self, port_index, ifname, self.ns)

    def get_ifnames(self):
        ifnames = []
        listdir = cmd(f"ls /sys/bus/netdevsim/devices/netdevsim{self.addr}/net/",
                      ns=self.ns).stdout.split()
        for ifname in listdir:
            ifnames.append(ifname)
        ifnames.sort()
        return ifnames

    def wait_for_netdevs(self, port_count):
        timeout = 5
        timeout_start = time.time()

        while True:
            try:
                ifnames = self.get_ifnames()
            except FileNotFoundError as e:
                ifnames = []
            if len(ifnames) == port_count:
                break
            if time.time() < timeout_start + timeout:
                continue
            raise Exception("netdevices did not appear within timeout")

    def remove(self):
        if not self.removed:
            self.ctrl_write("del_device", "%u" % (self.addr, ))
            self.removed = True

    def remove_nsim(self, nsim):
        self.nsims.remove(nsim)
        self.ctrl_write("devices/netdevsim%u/del_port" % (self.addr, ),
                        "%u" % (nsim.port_index, ))