File: test_addr.py

package info (click to toggle)
rdma-core 61.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 13,124 kB
  • sloc: ansic: 176,798; python: 15,496; sh: 2,742; perl: 1,465; makefile: 73
file content (86 lines) | stat: -rw-r--r-- 3,360 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# SPDX-License-Identifier: (GPL-2.0 OR Linux-OpenIB)
# Copyright (c) 2019 Mellanox Technologies, Inc. All rights reserved.  See COPYING file

import unittest
import errno

from pyverbs.pyverbs_error import PyverbsError, PyverbsRDMAError
from tests.base import PyverbsAPITestCase
from pyverbs.addr import AHAttr, AH
import pyverbs.device as d
from pyverbs.libibverbs_enums import ibv_port_state, IBV_LINK_LAYER_ETHERNET, IBV_LINK_LAYER_INFINIBAND
from pyverbs.pd import PD
import tests.utils as u


class AHTest(PyverbsAPITestCase):
    """
    Test various functionalities of the AH class.

    The tests read ``self.ctx``, ``self.ib_port`` and ``self.gid_index``,
    which are expected to be provided by the base test case.
    """
    def verify_link_layer_ether(self, ctx):
        """
        Aux function to verify link layer.
        :param ctx: Context used to query the attributes of self.ib_port
        :raises unittest.SkipTest: If the port's link layer is not Ethernet
        """
        link_layer = ctx.query_port(self.ib_port).link_layer
        if link_layer != IBV_LINK_LAYER_ETHERNET:
            raise unittest.SkipTest(f'Link layer of port={self.ib_port} is {d.translate_link_layer(link_layer)} , skip RoCE test')

    def verify_state(self, ctx):
        """
        Aux function to verify port state.
        :param ctx: Context used to query the attributes of self.ib_port
        :raises unittest.SkipTest: If the port state is neither ACTIVE nor
                                   INIT
        """
        usable_states = (ibv_port_state.IBV_PORT_ACTIVE,
                         ibv_port_state.IBV_PORT_INIT)
        if ctx.query_port(self.ib_port).state not in usable_states:
            raise unittest.SkipTest(f'Port {self.ib_port} is not up, can not create AH')

    def _global_ah_attr(self):
        """
        Build the AH attributes with a global route for the tested port.
        The destination LID is the port's own LID when the link layer is
        InfiniBand and 0 otherwise.
        :return: An AHAttr object with is_global=1
        """
        gr = u.get_global_route(self.ctx, gid_index=self.gid_index,
                                port_num=self.ib_port)
        port_attrs = self.ctx.query_port(self.ib_port)
        dlid = port_attrs.lid if port_attrs.link_layer == IBV_LINK_LAYER_INFINIBAND else 0
        return AHAttr(dlid=dlid, gr=gr, is_global=1, port_num=self.ib_port)

    @staticmethod
    def _skip_if_unsupported(ex):
        """
        Turn an EOPNOTSUPP failure into a test skip.
        :param ex: The PyverbsRDMAError raised by the AH creation
        :raises unittest.SkipTest: If ex.error_code is errno.EOPNOTSUPP
        """
        if ex.error_code == errno.EOPNOTSUPP:
            raise unittest.SkipTest('Create AH is not supported')

    def test_create_ah(self):
        """
        Test ibv_create_ah.
        """
        self.verify_state(self.ctx)
        ah_attr = self._global_ah_attr()
        pd = PD(self.ctx)
        try:
            AH(pd, attr=ah_attr)
        except PyverbsRDMAError as ex:
            self._skip_if_unsupported(ex)
            # Any other failure is a real error: re-raise it untouched.
            raise

    def test_create_ah_roce(self):
        """
        Verify that AH can't be created without GRH in RoCE
        """
        self.verify_link_layer_ether(self.ctx)
        self.verify_state(self.ctx)
        pd = PD(self.ctx)
        ah_attr = AHAttr(is_global=0, port_num=self.ib_port)
        try:
            AH(pd, attr=ah_attr)
        except PyverbsRDMAError as ex:
            self._skip_if_unsupported(ex)
            # Raise explicitly rather than using an assert statement, so the
            # check is still enforced when Python runs with -O.
            if 'Failed to create AH' not in str(ex):
                raise AssertionError(
                    f'Unexpected error for a non-global AH on RoCE: {ex}') from ex
        else:
            raise PyverbsError(f'Successfully created a non-global AH on RoCE port={self.ib_port}')

    def test_destroy_ah(self):
        """
        Test ibv_destroy_ah.
        """
        self.verify_state(self.ctx)
        ah_attr = self._global_ah_attr()
        pd = PD(self.ctx)
        try:
            with AH(pd, attr=ah_attr) as ah:
                # Close explicitly inside the managed block; leaving the
                # block then reaches the context manager's exit with an AH
                # that was already closed (presumably another close() -
                # confirm against pyverbs).
                ah.close()
        except PyverbsRDMAError as ex:
            self._skip_if_unsupported(ex)
            raise