1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
|
import unittest
import datetime
import errno
from tests.base import RCResources, RDMATestCase, PyverbsAPITestCase
from pyverbs.providers.mlx5.mlx5dv import Mlx5Context
from pyverbs.pyverbs_error import PyverbsRDMAError
from pyverbs.cq import CqInitAttrEx, CQEX
from tests.test_flow import FlowRes
from pyverbs.qp import QPInitAttr
from pyverbs.cq import PollCqAttr
from pyverbs.libibverbs_enums import ibv_qp_type, ibv_wc_status, ibv_wr_opcode, ibv_create_cq_wc_flags, \
IBV_WC_STANDARD_FLAGS
import tests.utils as u
GIGA = 1000000000
def convert_ts_to_ns(ctx, device_ts):
"""
Convert device timestamp from HCA core clock units to corresponding
nanosecond counts.
:param ctx: The context that gets this timestamp.
:param device_ts: The device timestamp to translate.
:return: Timestamp in nanoseconds
"""
try:
timestamp_in_ns = Mlx5Context.device_timestamp_to_ns(ctx, device_ts)
except PyverbsRDMAError as ex:
if ex.error_code == errno.EOPNOTSUPP:
raise unittest.SkipTest('Converting timestamp to nanoseconds is not supported')
raise ex
return timestamp_in_ns
def timestamp_res_cls(base_class):
"""
This is a factory function which creates a class that inherits base_class of
any BaseResources type.
:param base_class: The base resources class to inherit from.
:return: TimeStampRes class.
"""
class TimeStampRes(base_class):
def __init__(self, dev_name, ib_port, gid_index, qp_type, send_ts=None,
recv_ts=None):
self.qp_type = qp_type
self.send_ts = send_ts
self.recv_ts = recv_ts
self.timestamp = None
self.scq = None
self.rcq = None
super().__init__(dev_name=dev_name, ib_port=ib_port, gid_index=gid_index)
def create_cq(self):
self.scq = self._create_ex_cq(self.send_ts)
self.rcq = self._create_ex_cq(self.recv_ts)
def _create_ex_cq(self, timestamp=None):
"""
Create an Extended CQ.
:param timestamp: If set, the timestamp type to use.
"""
wc_flags = IBV_WC_STANDARD_FLAGS
if timestamp:
wc_flags |= timestamp
cia = CqInitAttrEx(cqe=self.num_msgs, wc_flags=wc_flags)
try:
cq = CQEX(self.ctx, cia)
except PyverbsRDMAError as ex:
if ex.error_code == errno.EOPNOTSUPP:
raise unittest.SkipTest('Create Extended CQ is not supported')
raise ex
return cq
def create_qp_init_attr(self):
return QPInitAttr(qp_type=self.qp_type, scq=self.scq,
rcq=self.rcq, srq=self.srq, cap=self.create_qp_cap())
return TimeStampRes
class TimeStampTest(RDMATestCase):
"""
Test various types of timestamping formats.
"""
def setUp(self):
super().setUp()
self.send_ts = None
self.recv_ts = None
self.qp_type = None
@property
def resource_arg(self):
return {'send_ts': self.send_ts, 'recv_ts': self.recv_ts,
'qp_type': self.qp_type}
def test_timestamp_free_running_rc_traffic(self):
"""
Test free running timestamp on RC traffic.
"""
self.qp_type = ibv_qp_type.IBV_QPT_RC
self.send_ts = self.recv_ts = ibv_create_cq_wc_flags.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP
self.create_players(timestamp_res_cls(RCResources), **self.resource_arg)
self.ts_traffic()
timestamp = convert_ts_to_ns(self.client.ctx, self.client.timestamp)
self.verify_ts(timestamp)
def test_timestamp_real_time_rc_traffic(self):
"""
Test real time timestamp on RC traffic.
"""
self.qp_type = ibv_qp_type.IBV_QPT_RC
self.send_ts = self.recv_ts = ibv_create_cq_wc_flags.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP_WALLCLOCK
self.create_players(timestamp_res_cls(RCResources), **self.resource_arg)
self.ts_traffic()
self.verify_ts(self.client.timestamp)
def test_timestamp_free_running_send_raw_traffic(self):
"""
Test timestamping on RAW traffic only on the send completions.
"""
self.qp_type = ibv_qp_type.IBV_QPT_RAW_PACKET
self.send_ts = ibv_create_cq_wc_flags.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP
self.create_players(timestamp_res_cls(FlowRes), **self.resource_arg)
self.flow = self.server.create_flow([self.server.create_eth_spec()])
self.ts_traffic()
timestamp = convert_ts_to_ns(self.client.ctx, self.client.timestamp)
self.verify_ts(timestamp)
def test_timestamp_free_running_recv_raw_traffic(self):
"""
Test timestamping on RAW traffic only on the recv completions.
"""
self.qp_type = ibv_qp_type.IBV_QPT_RAW_PACKET
self.recv_ts = ibv_create_cq_wc_flags.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP
self.create_players(timestamp_res_cls(FlowRes), **self.resource_arg)
self.flow = self.server.create_flow([self.server.create_eth_spec()])
self.ts_traffic()
timestamp = convert_ts_to_ns(self.server.ctx, self.server.timestamp)
self.verify_ts(timestamp)
def test_timestamp_real_time_raw_traffic(self):
"""
Test real time timestamp on RAW traffic.
"""
self.qp_type = ibv_qp_type.IBV_QPT_RAW_PACKET
self.send_ts = self.recv_ts = ibv_create_cq_wc_flags.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP_WALLCLOCK
self.create_players(timestamp_res_cls(FlowRes), **self.resource_arg)
self.flow = self.server.create_flow([self.server.create_eth_spec()])
self.ts_traffic()
self.verify_ts(self.client.timestamp)
@staticmethod
def verify_ts(timestamp):
"""
Verify that the timestamp is a valid value of time.
"""
datetime.datetime.fromtimestamp(timestamp/GIGA)
@staticmethod
def poll_cq_ex_ts(cqex, ts_type=None):
"""
Poll completion from the extended CQ.
:param cqex: CQEX to poll from
:param ts_type: If set, read the CQE timestamp in this format
:return: The CQE timestamp if it requested.
"""
polling_timeout = 10
start = datetime.datetime.now()
ts = 0
poll_attr = PollCqAttr()
ret = cqex.start_poll(poll_attr)
while ret == 2 and (datetime.datetime.now() - start).seconds < polling_timeout:
ret = cqex.start_poll(poll_attr)
if ret == 2:
raise PyverbsRDMAError('Failed to poll CQEX - Got timeout')
if ret != 0:
raise PyverbsRDMAError('Failed to poll CQEX')
if cqex.status != ibv_wc_status.IBV_WC_SUCCESS:
raise PyverbsRDMAError('Completion status is {cqex.status}')
if ts_type == ibv_create_cq_wc_flags.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP:
ts = cqex.read_timestamp()
if ts_type == ibv_create_cq_wc_flags.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP_WALLCLOCK:
ts = cqex.read_completion_wallclock_ns()
cqex.end_poll()
return ts
def ts_traffic(self):
"""
Run RDMA traffic and read the completions timestamps.
"""
s_recv_wr = u.get_recv_wr(self.server)
u.post_recv(self.server, s_recv_wr)
if self.qp_type == ibv_qp_type.IBV_QPT_RAW_PACKET:
c_send_wr, _, _ = u.get_send_elements_raw_qp(self.client)
else:
c_send_wr, _ = u.get_send_elements(self.client, False)
u.send(self.client, c_send_wr, ibv_wr_opcode.IBV_WR_SEND, False, 0)
self.client.timestamp = self.poll_cq_ex_ts(self.client.scq, ts_type=self.send_ts)
self.server.timestamp = self.poll_cq_ex_ts(self.server.rcq, ts_type=self.recv_ts)
class TimeAPITest(PyverbsAPITestCase):
def test_query_rt_values(self):
"""
Test the ibv_query_rt_values_ex API.
Query the device real-time values, convert them to ns and verify that
the timestamp is a valid value of time..
"""
try:
_, hw_time = self.ctx.query_rt_values_ex()
time_in_ns = convert_ts_to_ns(self.ctx, hw_time)
datetime.datetime.fromtimestamp(time_in_ns/GIGA)
except PyverbsRDMAError as ex:
if ex.error_code == errno.EOPNOTSUPP:
raise unittest.SkipTest('Query device real time is not supported')
raise ex
|