File: xport_adapter_mgr.py

package info (click to toggle)
uhd 4.9.0.0%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 184,180 kB
  • sloc: cpp: 262,887; python: 112,011; ansic: 102,670; vhdl: 57,031; tcl: 19,924; xml: 8,581; makefile: 3,028; sh: 2,812; pascal: 230; javascript: 120; csh: 94; asm: 20; perl: 11
file content (99 lines) | stat: -rw-r--r-- 4,029 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#
# Copyright 2022 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
Transport adapter manager

This allows programming custom routes into the transport adapter and checking
its capabilities.
"""

import netaddr
from usrp_mpm.sys_utils import net
from .xport_adapter_ctrl import XportAdapterCtrl

class XportAdapterMgr:
    """
    Transport adapter manager

    Thin management layer around an XportAdapterCtrl object. It validates
    user-provided route parameters before handing them to the controller and
    exposes the controller's capabilities and instance number.
    """
    def __init__(self, log, iface, uio_label):
        """
        :param log: Parent logger. A child logger named after iface is derived
                    from it.
        :param iface: Name of the interface this manager is associated with.
                      It is stored and used in log messages.
        :param uio_label: Label that is passed on to XportAdapterCtrl.
        """
        self.log = log.getChild(f'XportAdapterMgr@{iface}')
        self.iface = iface
        self._ta_ctrl = XportAdapterCtrl(uio_label)
        # Query the capabilities only once for the log message
        capabilities = self.get_capabilities()
        self.log.debug(
            "Transport adapter compat number: %s Capabilities: %s Node instance: %d",
            str(self._ta_ctrl.get_compat_num()),
            ', '.join(capabilities) if capabilities else 'none',
            self.get_xport_adapter_inst()
        )

    def get_xport_adapter_inst(self):
        """
        Return the instance of the transport adapter that is connected to a
        given interface.
        """
        return self._ta_ctrl.get_xport_adapter_inst()

    def get_capabilities(self):
        """
        Return transport adapter capabilities as a list. For a list of values,
        see XportAdapterCtrl.
        """
        return list(self._ta_ctrl.features)

    def _map_stream_mode(self, mode):
        """
        Map a string-based stream mode value to an Enum-based one.

        The comparison against the names of the controller's StreamModes is
        case-insensitive.

        :param mode: Name of the requested stream mode.
        :returns: The matching entry of the controller's StreamModes.
        :raises ValueError: If mode is not a string or matches no stream mode.
        """
        if isinstance(mode, str):
            wanted = mode.upper()
            for available_mode in self._ta_ctrl.StreamModes:
                if available_mode.name == wanted:
                    return available_mode
        # Raise explicitly instead of using assert: assertions are removed
        # when Python runs with -O, which would let invalid modes through.
        raise ValueError(f"Invalid stream mode provided: `{mode}'")

    def add_remote_ep_route(self, epid, **kwargs):
        """
        Adds a route from this transport adapter to a CHDR streaming endpoint
        with endpoint ID epid.
        When CHDR packets with the given epid reach the transport adapter, they
        will be sent to the dest_addr/dest_port/dest_mac_addr destination.

        :param epid: The 16-bit endpoint ID value.
        :param dest_addr: A string representation of the destination IPv4 address
        :param dest_port: An integer representation of the destination port.
                          It is passed to the controller without validation.
        :param dest_mac_addr: A string representation of the destination MAC address.
                         May be left empty, in which case, we will do an ARP
                         lookup.
        :param stream_mode: The stream mode used for outgoing packets. This is
                            a string representation of one of the streaming
                            modes in the transport adapter controller's
                            StreamModes (may be lower case). If it is omitted,
                            the call is rejected, because the empty default
                            matches no stream mode.
        :returns: An integer value identifying the transport adapter used
        :raises ValueError: If the stream mode, the IPv4 address or the MAC
                            address is invalid.
        :raises RuntimeError: If no MAC address was given and none could be
                              looked up for dest_addr.
        """
        dest_addr = kwargs.get('dest_addr')
        dest_port = kwargs.get('dest_port')
        dest_mac_addr = kwargs.get('dest_mac_addr')
        stream_mode = self._map_stream_mode(kwargs.get('stream_mode', ''))
        # Any failure to parse the address is reported as a ValueError, but
        # unlike a bare except this lets KeyboardInterrupt/SystemExit through
        # and keeps the original exception as the cause.
        try:
            ip_version = netaddr.IPAddress(dest_addr).version
        except Exception as ex:
            raise ValueError(f"Invalid IPv4 destination address: {dest_addr}") from ex
        if ip_version != 4:
            raise ValueError(f"Invalid IPv4 destination address: {dest_addr}")
        if not dest_mac_addr:
            self.log.debug("Looking up MAC address for IP address %s...", dest_addr)
            dest_mac_addr = net.get_mac_addr(dest_addr)
        if not dest_mac_addr:
            raise RuntimeError(f"Could not find MAC address for IP address {dest_addr}!")
        # Constructing the EUI is the validation; it must not be wrapped in an
        # assert, or it would be skipped entirely under -O.
        try:
            netaddr.EUI(dest_mac_addr)
        except Exception as ex:
            raise ValueError(f"Invalid MAC address: {dest_mac_addr}") from ex
        # Inputs are good, poke the regs
        self.log.debug(
            "Adding route for endpoint ID %s from interface %s...", epid, self.iface)
        self._ta_ctrl.add_remote_ep_route(epid, dest_addr, dest_port, dest_mac_addr, stream_mode)
        return self.get_xport_adapter_inst()