File: test_mlx5_timestamp.py

package info (click to toggle)
rdma-core 61.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 13,124 kB
  • sloc: ansic: 176,798; python: 15,496; sh: 2,742; perl: 1,465; makefile: 73
file content (219 lines) | stat: -rwxr-xr-x 8,529 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import unittest
import datetime
import errno

from tests.base import RCResources, RDMATestCase, PyverbsAPITestCase
from pyverbs.providers.mlx5.mlx5dv import Mlx5Context
from pyverbs.pyverbs_error import PyverbsRDMAError
from pyverbs.cq import CqInitAttrEx, CQEX
from tests.test_flow import FlowRes
from pyverbs.qp import QPInitAttr
from pyverbs.cq import PollCqAttr
from pyverbs.libibverbs_enums import ibv_qp_type, ibv_wc_status, ibv_wr_opcode, ibv_create_cq_wc_flags, \
    IBV_WC_STANDARD_FLAGS
import tests.utils as u


# Number of nanoseconds in one second; nanosecond timestamps are divided by
# this before being handed to datetime.fromtimestamp().
GIGA = 10 ** 9


def convert_ts_to_ns(ctx, device_ts):
    """
    Translate a device timestamp into nanoseconds using
    Mlx5Context.device_timestamp_to_ns().
    :param ctx: The context the timestamp was obtained from.
    :param device_ts: The device timestamp to translate.
    :return: The value returned by the conversion (timestamp in nanoseconds).
    :raises unittest.SkipTest: If the conversion fails with EOPNOTSUPP.
    :raises PyverbsRDMAError: Any other conversion failure is propagated.
    """
    try:
        return Mlx5Context.device_timestamp_to_ns(ctx, device_ts)
    except PyverbsRDMAError as ex:
        # Only "operation not supported" is turned into a skip; every other
        # failure is a genuine error and goes back to the caller unchanged.
        if ex.error_code != errno.EOPNOTSUPP:
            raise ex
        raise unittest.SkipTest('Converting timestamp to nanoseconds is not supported')


def timestamp_res_cls(base_class):
    """
    Factory that derives a TimeStampRes class from the given resources class.
    The derived class replaces the CQs with extended CQs (one for send, one
    for receive) that may each carry a completion timestamp flag.
    :param base_class: The resources class to inherit from.
    :return: The newly created TimeStampRes class.
    """
    class TimeStampRes(base_class):
        def __init__(self, dev_name, ib_port, gid_index, qp_type, send_ts=None,
                     recv_ts=None):
            """
            Store the timestamp configuration and initialize the base class.
            :param dev_name: Device name, forwarded to the base class.
            :param ib_port: IB port, forwarded to the base class.
            :param gid_index: GID index, forwarded to the base class.
            :param qp_type: QP type used when building the QP init attributes.
            :param send_ts: Optional timestamp flag for the send CQ.
            :param recv_ts: Optional timestamp flag for the receive CQ.
            """
            # All attributes are assigned before the base initializer runs.
            # NOTE(review): presumably the base __init__ invokes create_cq()
            # and create_qp_init_attr(), which read them - confirm in base.
            self.qp_type, self.send_ts, self.recv_ts = qp_type, send_ts, recv_ts
            self.timestamp = None
            self.scq = self.rcq = None
            super().__init__(dev_name=dev_name, ib_port=ib_port, gid_index=gid_index)

        def create_cq(self):
            """
            Create the send CQ and then the receive CQ as extended CQs, each
            with its own timestamp setting.
            """
            self.scq = self._create_ex_cq(timestamp=self.send_ts)
            self.rcq = self._create_ex_cq(timestamp=self.recv_ts)

        def _create_ex_cq(self, timestamp=None):
            """
            Create an Extended CQ.
            :param timestamp: If set, the timestamp flag to add to the
                              standard WC flags.
            :return: The created CQEX.
            :raises unittest.SkipTest: If creation fails with EOPNOTSUPP.
            """
            flags = (IBV_WC_STANDARD_FLAGS | timestamp) if timestamp \
                else IBV_WC_STANDARD_FLAGS
            init_attr = CqInitAttrEx(cqe=self.num_msgs, wc_flags=flags)
            try:
                return CQEX(self.ctx, init_attr)
            except PyverbsRDMAError as ex:
                # Anything other than "not supported" is a real failure.
                if ex.error_code != errno.EOPNOTSUPP:
                    raise ex
                raise unittest.SkipTest('Create Extended CQ is not supported')

        def create_qp_init_attr(self):
            """
            Build the QP init attributes using the extended send/receive CQs.
            :return: A QPInitAttr object.
            """
            attr_kwargs = {
                'qp_type': self.qp_type,
                'scq': self.scq,
                'rcq': self.rcq,
                'srq': self.srq,
                'cap': self.create_qp_cap(),
            }
            return QPInitAttr(**attr_kwargs)

    return TimeStampRes


class TimeStampTest(RDMATestCase):
    """
    Test various types of timestamping formats.

    Each test selects a QP type and the timestamp flag(s) for the send and/or
    receive CQ, creates the players with a resources class produced by
    timestamp_res_cls(), runs a single send with ts_traffic() and finally
    checks that the collected timestamp is a valid time value.
    """
    def setUp(self):
        """
        Reset the per-test configuration; each test sets what it needs.
        """
        super().setUp()
        self.send_ts = None
        self.recv_ts = None
        self.qp_type = None

    @property
    def resource_arg(self):
        """
        Keyword arguments describing the current test configuration, passed
        to create_players() together with the resources class.
        """
        return {'send_ts': self.send_ts, 'recv_ts': self.recv_ts,
                'qp_type': self.qp_type}

    def test_timestamp_free_running_rc_traffic(self):
        """
        Test free running timestamp on RC traffic.
        """
        self.qp_type = ibv_qp_type.IBV_QPT_RC
        self.send_ts = self.recv_ts = ibv_create_cq_wc_flags.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP
        self.create_players(timestamp_res_cls(RCResources), **self.resource_arg)
        self.ts_traffic()
        # Free running timestamps are in device units and must be converted
        # to nanoseconds before they can be validated.
        timestamp = convert_ts_to_ns(self.client.ctx, self.client.timestamp)
        self.verify_ts(timestamp)

    def test_timestamp_real_time_rc_traffic(self):
        """
        Test real time timestamp on RC traffic.
        """
        self.qp_type = ibv_qp_type.IBV_QPT_RC
        self.send_ts = self.recv_ts = ibv_create_cq_wc_flags.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP_WALLCLOCK
        self.create_players(timestamp_res_cls(RCResources), **self.resource_arg)
        self.ts_traffic()
        # The wallclock value is verified directly, without conversion.
        self.verify_ts(self.client.timestamp)

    def test_timestamp_free_running_send_raw_traffic(self):
        """
        Test timestamping on RAW traffic only on the send completions.
        """
        self.qp_type = ibv_qp_type.IBV_QPT_RAW_PACKET
        self.send_ts = ibv_create_cq_wc_flags.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP
        self.create_players(timestamp_res_cls(FlowRes), **self.resource_arg)
        # Keep a reference to the flow on the test object for the duration of
        # the traffic.
        self.flow = self.server.create_flow([self.server.create_eth_spec()])
        self.ts_traffic()
        timestamp = convert_ts_to_ns(self.client.ctx, self.client.timestamp)
        self.verify_ts(timestamp)

    def test_timestamp_free_running_recv_raw_traffic(self):
        """
        Test timestamping on RAW traffic only on the recv completions.
        """
        self.qp_type = ibv_qp_type.IBV_QPT_RAW_PACKET
        self.recv_ts = ibv_create_cq_wc_flags.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP
        self.create_players(timestamp_res_cls(FlowRes), **self.resource_arg)
        self.flow = self.server.create_flow([self.server.create_eth_spec()])
        self.ts_traffic()
        # Only the receive CQ carries a timestamp here, so the server side
        # value is the one that gets verified.
        timestamp = convert_ts_to_ns(self.server.ctx, self.server.timestamp)
        self.verify_ts(timestamp)

    def test_timestamp_real_time_raw_traffic(self):
        """
        Test real time timestamp on RAW traffic.
        """
        self.qp_type = ibv_qp_type.IBV_QPT_RAW_PACKET
        self.send_ts = self.recv_ts = ibv_create_cq_wc_flags.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP_WALLCLOCK
        self.create_players(timestamp_res_cls(FlowRes), **self.resource_arg)
        self.flow = self.server.create_flow([self.server.create_eth_spec()])
        self.ts_traffic()
        self.verify_ts(self.client.timestamp)

    @staticmethod
    def verify_ts(timestamp):
        """
        Verify that the timestamp is a valid value of time.
        The check relies on datetime.fromtimestamp() raising for values it
        cannot represent.
        :param timestamp: Timestamp in nanoseconds.
        """
        datetime.datetime.fromtimestamp(timestamp/GIGA)

    @staticmethod
    def poll_cq_ex_ts(cqex, ts_type=None):
        """
        Poll a single completion from the extended CQ.
        :param cqex: CQEX to poll from
        :param ts_type: If set, read the CQE timestamp in this format
        :return: The CQE timestamp if it was requested, otherwise 0.
        :raises PyverbsRDMAError: On polling timeout, polling failure or a
                                  completion with a non-success status.
        """
        polling_timeout = 10
        start = datetime.datetime.now()
        ts = 0

        poll_attr = PollCqAttr()
        ret = cqex.start_poll(poll_attr)
        # A return value of 2 (ENOENT from ibv_start_poll) is treated as "no
        # completion yet": keep retrying until the timeout expires.
        # total_seconds() is used so the full elapsed time is compared, not
        # just the seconds component of the timedelta.
        while ret == 2 and \
                (datetime.datetime.now() - start).total_seconds() < polling_timeout:
            ret = cqex.start_poll(poll_attr)
        if ret == 2:
            raise PyverbsRDMAError('Failed to poll CQEX - Got timeout')
        if ret != 0:
            raise PyverbsRDMAError('Failed to poll CQEX')
        # start_poll() succeeded, so end_poll() has to be called on every
        # path from here, including the bad completion status one.
        try:
            if cqex.status != ibv_wc_status.IBV_WC_SUCCESS:
                raise PyverbsRDMAError(f'Completion status is {cqex.status}')
            if ts_type == ibv_create_cq_wc_flags.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP:
                ts = cqex.read_timestamp()
            elif ts_type == ibv_create_cq_wc_flags.IBV_WC_EX_WITH_COMPLETION_TIMESTAMP_WALLCLOCK:
                ts = cqex.read_completion_wallclock_ns()
        finally:
            cqex.end_poll()
        return ts

    def ts_traffic(self):
        """
        Run RDMA traffic and read the completions timestamps.
        A single receive WR is posted on the server, a single send is posted
        by the client, and the timestamps read from the client's send CQ and
        the server's receive CQ are stored on the respective player.
        """
        s_recv_wr = u.get_recv_wr(self.server)
        u.post_recv(self.server, s_recv_wr)
        if self.qp_type == ibv_qp_type.IBV_QPT_RAW_PACKET:
            c_send_wr, _, _ = u.get_send_elements_raw_qp(self.client)
        else:
            c_send_wr, _ = u.get_send_elements(self.client, False)
        u.send(self.client, c_send_wr, ibv_wr_opcode.IBV_WR_SEND, False, 0)
        self.client.timestamp = self.poll_cq_ex_ts(self.client.scq, ts_type=self.send_ts)
        self.server.timestamp = self.poll_cq_ex_ts(self.server.rcq, ts_type=self.recv_ts)


class TimeAPITest(PyverbsAPITestCase):
    """
    API test for querying the device real-time values.
    """
    def test_query_rt_values(self):
        """
        Test the ibv_query_rt_values_ex API.
        Query the device real-time values, convert the HW time to nanoseconds
        and verify that the result is a valid value of time.
        """
        try:
            _, device_time = self.ctx.query_rt_values_ex()
            nanoseconds = convert_ts_to_ns(self.ctx, device_time)
            # fromtimestamp() expects seconds and raises for values it cannot
            # represent, which is what makes this a validity check.
            datetime.datetime.fromtimestamp(nanoseconds/GIGA)
        except PyverbsRDMAError as ex:
            # Only "operation not supported" is turned into a skip.
            if ex.error_code != errno.EOPNOTSUPP:
                raise ex
            raise unittest.SkipTest('Query device real time is not supported')