File: rdmacm_utils.py

package info (click to toggle)
rdma-core 61.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 13,124 kB
  • sloc: ansic: 176,798; python: 15,496; sh: 2,742; perl: 1,465; makefile: 73
file content (481 lines) | stat: -rw-r--r-- 21,345 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
# SPDX-License-Identifier: (GPL-2.0 OR Linux-OpenIB)
# Copyright (c) 2019 Mellanox Technologies, Inc. All rights reserved.  See COPYING file
"""
Provide some useful helper functions for pyverbs rdmacm tests.
"""
import sys
from tests.utils import validate, poll_cq, get_send_elements, get_recv_wr
from tests.base_rdmacm import AsyncCMResources, SyncCMResources
from pyverbs.cmid import CMEvent, AddrInfo, JoinMCAttrEx
from pyverbs.pyverbs_error import PyverbsError, PyverbsRDMAError
from pyverbs.librdmacm_enums import rdma_port_space, rdma_cm_event_type, rdma_cm_mc_join_flags, \
    rdma_cm_join_mc_attr_mask, RDMA_OPTION_ID, RDMA_OPTION_ID_ACK_TIMEOUT
from pyverbs.addr import AH
from pyverbs.libibverbs_enums import ibv_send_flags, ibv_qp_type, ibv_qp_attr_mask
import abc
import errno

# Byte offset of the payload in UD / multicast receive buffers: the GRH is
# placed in front of the received data.
GRH_SIZE = 40
# Remote QP number used when posting UD sends to a multicast group.
MULTICAST_QPN = 0xFFFFFF
# Private data sent by the passive side when it rejects a connection request,
# and checked by the active side on RDMA_CM_EVENT_REJECTED.
REJECT_MSG = 'connection rejected'


class CMConnection(abc.ABC):
    """
    RDMA CM base abstract connection class. The class contains the rdmacm
    resources and other methods to easily establish a connection and run
    traffic using the rdmacm resources. Each type of connection or traffic
    should inherit from this class and implement the necessary methods such as
    connection establishment and traffic.

    Both connection sides run the traffic methods concurrently. Every
    self.syncer.wait() on one side pairs with a wait() on the other side, so
    the server and client variants of each traffic method call wait() the
    same number of times, in the same order.
    """
    def __init__(self, syncer=None, notifier=None):
        """
        Initializes a connection object.
        :param syncer: Barrier object to sync between all the test processes.
        :param notifier: Queue object to pass objects between the connection
                         sides.
        """
        self.syncer = syncer
        self.notifier = notifier
        # Filled in by the concrete subclasses once their CM resources exist.
        self.cm_res = None

    def rdmacm_traffic(self, server=None, multicast=False):
        """
        Run rdmacm traffic. This method runs the compatible traffic flow
        depending on the CMResources. If self.cm_res.with_ext_qp is set the
        traffic goes through the external QP (the multicast flag is then
        ignored).
        :param server: Run as server. Defaults to self.cm_res.passive.
        :param multicast: Run multicast traffic.
        """
        server = server if server is not None else self.cm_res.passive
        if self.cm_res.with_ext_qp:
            if server:
                self._ext_qp_server_traffic()
            else:
                self._ext_qp_client_traffic()
        elif multicast:
            if server:
                self._cmid_server_multicast_traffic()
            else:
                self._cmid_client_multicast_traffic()
        else:
            if server:
                self._cmid_server_traffic()
            else:
                self._cmid_client_traffic()

    def remote_traffic(self, passive, remote_op='write'):
        """
        Run rdmacm remote traffic. This method runs RDMA remote traffic from
        the active to the passive.
        :param passive: If True, run as server.
        :param remote_op: 'write'/'read', The type of the RDMA remote operation.
                          Any value other than 'write' posts RDMA reads.
        """
        msg_size = self.cm_res.msg_size
        if passive:
            # Fill the local MR with 's' and publish its rkey and address so
            # the active side can target it.
            self.cm_res.mr.write((msg_size) * 's', msg_size)
            mr_details = (self.cm_res.mr.rkey, self.cm_res.mr.buf)
            self.notifier.put(mr_details)
            # Barrier 1: the MR details have been published.
            self.syncer.wait()
            # Barrier 2: the active side finished all of its RDMA operations.
            self.syncer.wait()
            if remote_op == 'write':
                msg_received = self.cm_res.mr.read(msg_size, 0)
                validate(msg_received, True, msg_size)
        else:
            self.cm_res.mr.write((msg_size) * 'c', msg_size)
            self.syncer.wait()
            rkey, remote_addr = self.notifier.get()
            cmid = self.cm_res.cmid
            post_func = cmid.post_write if remote_op == 'write' else \
                cmid.post_read
            # Post the operations one at a time, waiting for each completion.
            for _ in range(self.cm_res.num_msgs):
                post_func(self.cm_res.mr, msg_size, remote_addr, rkey,
                          flags=ibv_send_flags.IBV_SEND_SIGNALED)
                cmid.get_send_comp()
            self.syncer.wait()
            if remote_op == 'read':
                msg_received = self.cm_res.mr.read(msg_size, 0)
                validate(msg_received, False, msg_size)

    def _ext_qp_server_traffic(self):
        """
        RDMACM server side traffic function which sends and receives a message,
        and then validates the received message. This traffic method uses the CM
        external QP and CQ for send, recv and get_completion.
        :return: None
        """
        recv_wr = get_recv_wr(self.cm_res)
        # Post a receive before the barrier so it is in place before the
        # client's first send.
        self.cm_res.qp.post_recv(recv_wr)
        self.syncer.wait()
        for _ in range(self.cm_res.num_msgs):
            # Completion of the posted receive.
            poll_cq(self.cm_res.cq)
            # Re-arm the receive for the next iteration before replying.
            self.cm_res.qp.post_recv(recv_wr)
            msg_received = self.cm_res.mr.read(self.cm_res.msg_size, 0)
            validate(msg_received, self.cm_res.passive, self.cm_res.msg_size)
            send_wr = get_send_elements(self.cm_res, self.cm_res.passive)[0]
            self.cm_res.qp.post_send(send_wr)
            # Completion of the reply send.
            poll_cq(self.cm_res.cq)

    def _ext_qp_client_traffic(self):
        """
        RDMACM client side traffic function which sends and receives a message,
        and then validates the received message. This traffic method uses the CM
        external QP and CQ for send, recv and get_completion.
        :return: None
        """
        recv_wr = get_recv_wr(self.cm_res)
        self.syncer.wait()
        for _ in range(self.cm_res.num_msgs):
            send_wr = get_send_elements(self.cm_res, self.cm_res.passive)[0]
            self.cm_res.qp.post_send(send_wr)
            # Completion of the send.
            poll_cq(self.cm_res.cq)
            self.cm_res.qp.post_recv(recv_wr)
            # Completion of the receive carrying the server's reply.
            poll_cq(self.cm_res.cq)
            msg_received = self.cm_res.mr.read(self.cm_res.msg_size, 0)
            validate(msg_received, self.cm_res.passive, self.cm_res.msg_size)

    def _cmid_server_traffic(self):
        """
        RDMACM server side traffic function which sends and receives a message,
        and then validates the received message. This traffic method uses the
        RDMACM API for send, recv and get_completion.
        :return: None
        """
        # For UD QPs the received payload is read GRH_SIZE bytes into the MR
        # (past the GRH); other QP types read it from offset 0.
        grh_offset = GRH_SIZE if self.cm_res.qp_type == ibv_qp_type.IBV_QPT_UD else 0
        send_msg = (self.cm_res.msg_size + grh_offset) * 's'
        cmid = self.cm_res.child_id
        for _ in range(self.cm_res.num_msgs):
            cmid.post_recv(self.cm_res.mr)
            # Barrier 1: the receive is posted, the client may send.
            self.syncer.wait()
            # Barrier 2: the client has posted its receive for the reply.
            self.syncer.wait()
            wc = cmid.get_recv_comp()
            msg_received = self.cm_res.mr.read(self.cm_res.msg_size, grh_offset)
            validate(msg_received, True, self.cm_res.msg_size)
            if self.cm_res.port_space == rdma_port_space.RDMA_PS_TCP:
                self.cm_res.mr.write(send_msg, self.cm_res.msg_size)
                cmid.post_send(self.cm_res.mr)
            else:
                # Build the reply AH from the receive completion and the GRH
                # found at the start of the MR.
                ah = AH(cmid.pd, wc=wc, port_num=self.cm_res.ib_port, grh=self.cm_res.mr.buf)
                rqpn = self.cm_res.remote_qpn
                self.cm_res.mr.write(send_msg, self.cm_res.msg_size + GRH_SIZE)
                cmid.post_ud_send(self.cm_res.mr, ah, rqpn=rqpn,
                                  length=self.cm_res.msg_size)
            cmid.get_send_comp()
            # Barrier 3: the reply has been sent.
            self.syncer.wait()

    def _cmid_client_traffic(self):
        """
        RDMACM client side traffic function which sends and receives a message,
        and then validates the received message. This traffic method uses the
        RDMACM API for send, recv and get_completion.
        :return: None
        """
        # For UD QPs the received payload is read GRH_SIZE bytes into the MR.
        grh_offset = GRH_SIZE if self.cm_res.qp_type == ibv_qp_type.IBV_QPT_UD else 0
        send_msg = (self.cm_res.msg_size + grh_offset) * 'c'
        cmid = self.cm_res.cmid
        for _ in range(self.cm_res.num_msgs):
            self.cm_res.mr.write(send_msg, self.cm_res.msg_size + grh_offset)
            # Barrier 1: the server has posted its receive.
            self.syncer.wait()
            if self.cm_res.port_space == rdma_port_space.RDMA_PS_TCP:
                cmid.post_send(self.cm_res.mr)
            else:
                # UD: address the server through the AH attributes and QP
                # number stored in ud_params.
                ah = AH(cmid.pd, attr=self.cm_res.ud_params.ah_attr)
                cmid.post_ud_send(self.cm_res.mr, ah, rqpn=self.cm_res.ud_params.qp_num,
                                  length=self.cm_res.msg_size)
            cmid.get_send_comp()
            cmid.post_recv(self.cm_res.mr)
            # Barrier 2: the receive for the server's reply is posted.
            self.syncer.wait()
            # Barrier 3: the server's reply has been sent.
            self.syncer.wait()
            cmid.get_recv_comp()
            msg_received = self.cm_res.mr.read(self.cm_res.msg_size, grh_offset)
            validate(msg_received, False, self.cm_res.msg_size)

    def _cmid_server_multicast_traffic(self):
        """
        RDMACM server side multicast traffic function which receives a message,
        and then validates its data.
        """
        for _ in range(self.cm_res.num_msgs):
            self.cm_res.cmid.post_recv(self.cm_res.mr)
            # Barrier 1: the receive is posted, the client may send.
            self.syncer.wait()
            # Barrier 2: the client's send has completed.
            self.syncer.wait()
            self.cm_res.cmid.get_recv_comp()
            # Multicast (UD) payloads are read past the GRH.
            msg_received = self.cm_res.mr.read(self.cm_res.msg_size, GRH_SIZE)
            validate(msg_received, True, self.cm_res.msg_size)

    def _cmid_client_multicast_traffic(self):
        """
        RDMACM client side multicast traffic function which sends a message to
        the multicast group.
        """
        send_msg = (self.cm_res.msg_size + GRH_SIZE) * 'c'
        for _ in range(self.cm_res.num_msgs):
            self.cm_res.mr.write(send_msg, self.cm_res.msg_size + GRH_SIZE)
            # Barrier 1: the server has posted its receive.
            self.syncer.wait()
            ah = AH(self.cm_res.cmid.pd, attr=self.cm_res.ud_params.ah_attr)
            self.cm_res.cmid.post_ud_send(self.cm_res.mr, ah, rqpn=MULTICAST_QPN,
                                          length=self.cm_res.msg_size)
            self.cm_res.cmid.get_send_comp()
            # Barrier 2: the send has completed.
            self.syncer.wait()

    def event_handler(self, expected_event=None):
        """
        Handle and execute corresponding API for RDMACM events of asynchronous
        communication.
        Reads the next event (via CMEvent) from the CMID's event channel and
        updates the CM resources for connect-request, established and
        multicast-join events. It then validates the event type and
        acknowledges the event.
        :param expected_event: The user expected event. None disables the
                               event-type check.
        :raises PyverbsError: If expected_event is given and a different event
                              was received.
        :return: None
        """
        cm_event = CMEvent(self.cm_res.cmid.event_channel)
        if cm_event.event_type == rdma_cm_event_type.RDMA_CM_EVENT_CONNECT_REQUEST:
            self.cm_res.create_child_id(cm_event)
        elif cm_event.event_type in [rdma_cm_event_type.RDMA_CM_EVENT_ESTABLISHED,
                                     rdma_cm_event_type.RDMA_CM_EVENT_MULTICAST_JOIN]:
            self.cm_res.set_ud_params(cm_event)
        # Compare against None explicitly. RDMA_CM_EVENT_ADDR_RESOLVED is the
        # first rdma_cm_event_type value (0), which is falsy, so a plain
        # truthiness test would silently skip the check for that event.
        if expected_event is not None and expected_event != cm_event.event_type:
            raise PyverbsError('Expected this event: {}, got this event: {}'.
                                format(expected_event, cm_event.event_str()))
        if expected_event == rdma_cm_event_type.RDMA_CM_EVENT_REJECTED:
            # The passive side rejects with REJECT_MSG as private data.
            assert cm_event.private_data[:len(REJECT_MSG)].decode() == REJECT_MSG, \
                f'CM event data ({cm_event.private_data}) is different than the expected ({REJECT_MSG})'
        cm_event.ack_cm_event()

    @abc.abstractmethod
    def establish_connection(self):
        pass

    @abc.abstractmethod
    def disconnect(self):
        pass


class CMAsyncConnection(CMConnection):
    """
    Implement RDMACM connection management for asynchronous CMIDs. It includes
    connection establishment, disconnection and other methods such as traffic.
    """
    def __init__(self, ip_addr, syncer=None, notifier=None, passive=False,
                 num_conns=1, qp_timeout=-1, reject_conn=False, **kwargs):
        """
        Init the CMConnection and then init the AsyncCMResources.
        :param ip_addr: IP address to use.
        :param syncer: Barrier object to sync between all the test processes.
        :param notifier: Queue object to pass objects between the connection
                         sides.
        :param passive: Indicate if it's a passive side.
        :param num_conns: Number of connections.
        :param qp_timeout: Value of the QP ACK timeout to set on every
                           connection. A negative value (the default) leaves
                           the timeout untouched.
        :param reject_conn: True if the server will reject the connection.
        :param kwargs: Arguments used to initialize the CM resources. For more
                       info please check CMResources.
        """
        super().__init__(syncer=syncer, notifier=notifier)
        # create_cm_res() reads self.num_conns, so it must be set first.
        self.num_conns = num_conns
        self.create_cm_res(ip_addr, passive=passive, **kwargs)
        self.qp_timeout = qp_timeout
        self.reject_conn = reject_conn

    def create_cm_res(self, ip_addr, passive, **kwargs):
        """
        Create the AsyncCMResources and their CMIDs. The passive side creates a
        single CMID; the active side creates one CMID per connection
        (self.num_conns).
        :param ip_addr: IP address to use.
        :param passive: Indicate if it's a passive side.
        :param kwargs: Arguments forwarded to AsyncCMResources.
        """
        self.cm_res = AsyncCMResources(addr=ip_addr, passive=passive, **kwargs)
        if passive:
            self.cm_res.create_cmid()
        else:
            for i in range(self.num_conns):
                self.cm_res.create_cmid(i)

    def join_to_multicast(self, mc_addr=None, src_addr=None, extended=False):
        """
        Join the CMID to multicast group.
        :param mc_addr: The multicast IP address.
        :param src_addr: The CMIDs source address.
        :param extended: Use the join_multicast_ex API.
        """
        self.cm_res.cmid.bind_addr(self.cm_res.ai)
        resolve_addr_info = AddrInfo(src=src_addr, dst=mc_addr)
        self.cm_res.cmid.resolve_addr(resolve_addr_info)
        self.event_handler(expected_event=rdma_cm_event_type.RDMA_CM_EVENT_ADDR_RESOLVED)
        self.cm_res.create_qp()
        mc_addr_info = AddrInfo(src=mc_addr)
        if not extended:
            self.cm_res.cmid.join_multicast(addr=mc_addr_info)
        else:
            # Extended join: explicitly request full membership.
            flags = rdma_cm_mc_join_flags.RDMA_MC_JOIN_FLAG_FULLMEMBER
            comp_mask = rdma_cm_join_mc_attr_mask.RDMA_CM_JOIN_MC_ATTR_ADDRESS | \
                        rdma_cm_join_mc_attr_mask.RDMA_CM_JOIN_MC_ATTR_JOIN_FLAGS
            mcattr = JoinMCAttrEx(addr=mc_addr_info, comp_mask=comp_mask,
                                  join_flags=flags)
            self.cm_res.cmid.join_multicast(mc_attr=mcattr)
        self.event_handler(expected_event=rdma_cm_event_type.RDMA_CM_EVENT_MULTICAST_JOIN)
        self.cm_res.create_mr()

    def leave_multicast(self, mc_addr=None):
        """
        Leave multicast group.
        :param mc_addr: The multicast IP address.
        """
        mc_addr_info = AddrInfo(src=mc_addr)
        self.cm_res.cmid.leave_multicast(mc_addr_info)

    def establish_connection(self):
        """
        Establish RDMACM connection between two Async CMIDs.
        The passive side binds and listens once. For each connection it then
        accepts the connection request, or rejects it and returns when
        reject_conn is set. For each of its CMIDs, the active side resolves the
        address and route and then connects. Once all connections are up, both
        sides create the MR and exchange QP numbers.
        If qp_timeout is non-negative, it is set on each connection's own CMID
        and verified on that connection's QP.
        """
        if self.cm_res.passive:
            self.cm_res.cmid.bind_addr(self.cm_res.ai)
            self.cm_res.cmid.listen()
        for conn_idx in range(self.num_conns):
            if self.cm_res.passive:
                # Pairs with the active side's wait() before resolve_route().
                self.syncer.wait()
                self.event_handler(expected_event=rdma_cm_event_type.RDMA_CM_EVENT_CONNECT_REQUEST)
                self.cm_res.create_qp(conn_idx=conn_idx)
                if self.qp_timeout >= 0:
                    self.set_qp_timeout(self.cm_res.child_ids[conn_idx], self.qp_timeout)
                if self.cm_res.with_ext_qp:
                    self.set_cmids_qp_ece(self.cm_res.passive)
                    self.cm_res.modify_ext_qp_to_rts(conn_idx=conn_idx)
                    self.set_cmid_ece(self.cm_res.passive)
                child_id = self.cm_res.child_ids[conn_idx]
                if self.reject_conn:
                    child_id.reject(REJECT_MSG.encode())
                    return
                child_id.accept(self.cm_res.create_conn_param(conn_idx=conn_idx))
                if self.qp_timeout >= 0:
                    attr, _ = child_id.query_qp(ibv_qp_attr_mask.IBV_QP_TIMEOUT)
                    assert self.qp_timeout == attr.timeout
                if self.cm_res.port_space == rdma_port_space.RDMA_PS_TCP:
                    self.event_handler(expected_event=rdma_cm_event_type.RDMA_CM_EVENT_ESTABLISHED)
            else:
                cmid = self.cm_res.cmids[conn_idx]
                cmid.resolve_addr(self.cm_res.ai)
                self.event_handler(expected_event=rdma_cm_event_type.RDMA_CM_EVENT_ADDR_RESOLVED)
                self.syncer.wait()
                cmid.resolve_route()
                self.event_handler(expected_event=rdma_cm_event_type.RDMA_CM_EVENT_ROUTE_RESOLVED)
                self.cm_res.create_qp(conn_idx=conn_idx)
                if self.qp_timeout >= 0:
                    # Configure this connection's CMID. self.cm_res.cmid does
                    # not depend on conn_idx, so using it would leave the other
                    # connections unconfigured when num_conns > 1.
                    self.set_qp_timeout(cmid, self.qp_timeout)
                if self.cm_res.with_ext_qp:
                    self.set_cmid_ece(self.cm_res.passive)
                cmid.connect(self.cm_res.create_conn_param(conn_idx=conn_idx))
                if self.cm_res.with_ext_qp:
                    # With an external QP the application finishes the
                    # connection itself: move the QP to RTS, then establish().
                    self.event_handler(
                        expected_event=rdma_cm_event_type.RDMA_CM_EVENT_CONNECT_RESPONSE)
                    self.set_cmids_qp_ece(self.cm_res.passive)
                    self.cm_res.modify_ext_qp_to_rts(conn_idx=conn_idx)
                    cmid.establish()
                else:
                    if self.reject_conn:
                        self.event_handler(expected_event=rdma_cm_event_type.RDMA_CM_EVENT_REJECTED)
                        return
                    self.event_handler(expected_event=rdma_cm_event_type.RDMA_CM_EVENT_ESTABLISHED)
                if self.qp_timeout >= 0:
                    # Verify on the same per-connection CMID that was configured.
                    attr, _ = cmid.query_qp(ibv_qp_attr_mask.IBV_QP_TIMEOUT)
                    assert self.qp_timeout == attr.timeout
        self.cm_res.create_mr()
        self.sync_qp_numbers()

    def set_qp_timeout(self, cm_id, ack_timeout):
        """
        Set the RDMA_OPTION_ID_ACK_TIMEOUT option on a CMID.
        :param cm_id: The CMID to configure.
        :param ack_timeout: The ACK timeout value to set.
        """
        # NOTE(review): the trailing 1 is presumably the option length in
        # bytes (rdma_set_option optlen) — confirm against CMID.set_option.
        cm_id.set_option(RDMA_OPTION_ID, RDMA_OPTION_ID_ACK_TIMEOUT, ack_timeout, 1)

    def sync_qp_numbers(self):
        """
        Sync the QP numbers of the connections sides.
        The passive side publishes its QP number after the first barrier. The
        active side reads it and then publishes its own QP number. The passive
        side reads that value after the second barrier.
        """
        if self.cm_res.passive:
            self.syncer.wait()
            self.notifier.put(self.cm_res.my_qp_number())
            self.syncer.wait()
            self.cm_res.remote_qpn = self.notifier.get()
        else:
            self.syncer.wait()
            self.cm_res.remote_qpn = self.notifier.get()
            self.notifier.put(self.cm_res.my_qp_number())
            self.syncer.wait()

    def disconnect(self):
        """
        Disconnect the connection. Only RDMA_PS_TCP connections are
        disconnected. The passive side disconnects every accepted connection.
        The active side waits for a DISCONNECTED event and then disconnects
        all of its CMIDs.
        """
        if self.cm_res.port_space == rdma_port_space.RDMA_PS_TCP:
            if self.cm_res.passive:
                for child_id in self.cm_res.child_ids.values():
                    child_id.disconnect()
            else:
                # NOTE(review): only one DISCONNECTED event is consumed even
                # when num_conns > 1 — confirm this is intended.
                self.event_handler(expected_event=rdma_cm_event_type.RDMA_CM_EVENT_DISCONNECTED)
                for cmid in self.cm_res.cmids.values():
                    cmid.disconnect()

    def set_cmid_ece(self, passive):
        """
        Set the local CMIDs ECE. The ECE is taken from the CMIDs QP ECE.
        EOPNOTSUPP is ignored; any other RDMA error is re-raised.
        :param passive: Indicates whether this CMID participates as the
                        passive side of this connection.
        """
        cmid = self.cm_res.child_id if passive else self.cm_res.cmid
        try:
            ece = self.cm_res.qp.query_ece()
            cmid.set_local_ece(ece)
        except PyverbsRDMAError as ex:
            if ex.error_code != errno.EOPNOTSUPP:
                raise

    def set_cmids_qp_ece(self, passive):
        """
        Set the CMIDs QP ECE from the remote ECE reported by the CMID.
        EOPNOTSUPP is ignored; any other RDMA error is re-raised.
        :param passive: Indicates whether this CMID participates as the
                        passive side of this connection.
        """
        cmid = self.cm_res.child_id if passive else self.cm_res.cmid
        try:
            ece = cmid.get_remote_ece()
            self.cm_res.qp.set_ece(ece)
        except PyverbsRDMAError as ex:
            if ex.error_code != errno.EOPNOTSUPP:
                raise

class CMSyncConnection(CMConnection):
    """
    RDMACM connection management built on synchronous CMIDs: connection
    establishment, disconnection and (through CMConnection) traffic.
    """
    def __init__(self, ip_addr, syncer=None, notifier=None, passive=False, **kwargs):
        """
        Set up the base CMConnection, then build the SyncCMResources.
        :param ip_addr: IP address to use.
        :param syncer: Barrier object used to synchronize the test processes.
        :param notifier: Queue object used to pass objects between the two
                         connection sides.
        :param passive: True for the passive (server) side.
        :param kwargs: Extra arguments forwarded to the CM resources; see
                       CMResources for details.
        """
        super().__init__(syncer=syncer, notifier=notifier)
        self.create_cm_res(ip_addr, passive=passive, **kwargs)

    def create_cm_res(self, ip_addr, passive, **kwargs):
        """
        Build the SyncCMResources object and create its CMID.
        :param ip_addr: IP address to use.
        :param passive: True for the passive (server) side.
        :param kwargs: Extra arguments forwarded to SyncCMResources.
        """
        resources = SyncCMResources(addr=ip_addr, passive=passive, **kwargs)
        self.cm_res = resources
        resources.create_cmid()

    def establish_connection(self):
        """
        Connect two synchronous CMIDs. The passive side starts listening
        before the barrier and accepts the incoming request after it. The
        active side connects only after the barrier. Both sides then create
        their MR.
        """
        res = self.cm_res
        if not res.passive:
            self.syncer.wait()
            res.cmid.connect()
        else:
            res.cmid.listen()
            self.syncer.wait()
            res.create_child_id()
            res.child_id.accept()
        res.create_mr()

    def disconnect(self):
        """
        Tear down the connection. Only RDMA_PS_TCP connections are
        disconnected: the passive side disconnects its child CMID, the active
        side its own CMID.
        """
        if self.cm_res.port_space != rdma_port_space.RDMA_PS_TCP:
            return
        target = self.cm_res.child_id if self.cm_res.passive else self.cm_res.cmid
        target.disconnect()