1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
# SPDX-License-Identifier: (GPL-2.0 OR Linux-OpenIB)
# Copyright (c) 2019 Mellanox Technologies, Inc. All rights reserved. See COPYING file
import unittest
import os
from tests.rdmacm_utils import CMSyncConnection, CMAsyncConnection
from tests.base import RDMATestCase, RDMACMBaseTest
from tests.utils import requires_mcast_support
import tests.irdma_base as irdma
from pyverbs.librdmacm_enums import rdma_port_space
import pyverbs.device as d
class CMTestCase(RDMACMBaseTest):
"""
RDMACM Test class. Include all the native RDMACM functionalities.
"""
@staticmethod
def get_port_space():
# IPoIB currently is not supported
return rdma_port_space.RDMA_PS_UDP
def test_rdmacm_sync_traffic(self):
self.two_nodes_rdmacm_traffic(CMSyncConnection, self.rdmacm_traffic)
def test_rdmacm_async_traffic(self):
# QP ack timeout formula: 4.096 * 2^(ack_timeout) [usec]
irdma.skip_if_irdma_dev(d.Context(name=self.dev_name))
self.two_nodes_rdmacm_traffic(CMAsyncConnection, self.rdmacm_traffic,
qp_timeout=21)
def test_rdmacm_async_reject_traffic(self):
self.two_nodes_rdmacm_traffic(CMAsyncConnection, self.rdmacm_traffic,
reject_conn=True)
@requires_mcast_support()
def test_rdmacm_async_multicast_traffic(self):
self.two_nodes_rdmacm_traffic(CMAsyncConnection,
self.rdmacm_multicast_traffic,
port_space=self.get_port_space())
@requires_mcast_support()
def test_rdmacm_async_ex_multicast_traffic(self):
self.two_nodes_rdmacm_traffic(CMAsyncConnection,
self.rdmacm_multicast_traffic,
port_space=self.get_port_space(), extended=True)
@requires_mcast_support()
def test_rdmacm_async_ex_leave_multicast_traffic(self):
self.two_nodes_rdmacm_traffic(CMAsyncConnection,
self.rdmacm_multicast_traffic,
port_space=self.get_port_space(), extended=True,
leave_test=True, bad_flow=True)
def test_rdmacm_async_traffic_external_qp(self):
self.two_nodes_rdmacm_traffic(CMAsyncConnection, self.rdmacm_traffic,
with_ext_qp=True)
def test_rdmacm_async_udp_traffic(self):
self.two_nodes_rdmacm_traffic(CMAsyncConnection, self.rdmacm_traffic,
port_space=self.get_port_space(), ib_port=self.ib_port)
def test_rdmacm_async_read(self):
self.two_nodes_rdmacm_traffic(CMAsyncConnection,
self.rdmacm_remote_traffic,
remote_op='read')
def test_rdmacm_async_write(self):
self.two_nodes_rdmacm_traffic(CMAsyncConnection,
self.rdmacm_remote_traffic,
remote_op='write')
|