File: pml_ucx_request.h

package info (click to toggle)
openmpi 2.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 99,912 kB
  • ctags: 55,589
  • sloc: ansic: 525,999; f90: 18,307; makefile: 12,062; sh: 6,583; java: 6,278; asm: 3,515; cpp: 2,227; perl: 2,136; python: 1,350; lex: 734; fortran: 52; tcl: 12
file content (192 lines) | stat: -rw-r--r-- 6,150 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
/*
 * Copyright (C) Mellanox Technologies Ltd. 2001-2015.  ALL RIGHTS RESERVED.
 * Copyright (c) 2016      The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#ifndef PML_UCX_REQUEST_H_
#define PML_UCX_REQUEST_H_

#include "pml_ucx.h"
#include "pml_ucx_datatype.h"


/*
 * Request flag bits. Every constant occupies a distinct bit, so the values
 * can be combined with bitwise OR.
 */
enum {
    /* Persistent send */
    MCA_PML_UCX_REQUEST_FLAG_SEND        = 0x1,
    /* NOTE(review): presumably records that the request was freed by the
     * user before it finished - confirm against the .c file. */
    MCA_PML_UCX_REQUEST_FLAG_FREE_CALLED = 0x2,
    /* NOTE(review): presumably records that the request has completed -
     * confirm against the .c file. */
    MCA_PML_UCX_REQUEST_FLAG_COMPLETED   = 0x4
};

/*
 * UCX tag structure (64 bits, most significant bit on the left):
 *
 * 01234567 01234567 01234567 01234567 01234567 01234567 01234567 01234567
 *                           |                          |
 *      message tag (24)     |     source rank (24)     |  context id (16)
 *                           |                          |
 *
 * In terms of bit positions: the context id occupies bits 0..15, the source
 * rank bits 16..39 and the message tag bits 40..63. The macros below encode
 * and decode tags by shifting with the widths defined here.
 */
/* Width in bits of the message-tag field (the most significant field). */
#define PML_UCX_TAG_BITS                       24
/* Width in bits of the source-rank field (the middle field). */
#define PML_UCX_RANK_BITS                      24
/* Width in bits of the context-id field (the least significant field). */
#define PML_UCX_CONTEXT_BITS                   16


/*
 * Compose the 64-bit UCX tag for an outgoing message.
 *
 *   _tag   message tag; converted to uint64_t and placed above the rank and
 *          context fields (bits beyond the 64-bit word are shifted out)
 *   _comm  pointer to the communicator; its c_my_rank fills the rank field
 *          and its c_contextid the context field
 *
 * Neither c_my_rank nor c_contextid is masked, so values wider than their
 * field would spill into the neighbouring field. _comm is evaluated twice.
 */
#define PML_UCX_MAKE_SEND_TAG(_tag, _comm) \
    (((uint64_t)(_comm)->c_contextid) | \
     (((uint64_t)(_comm)->c_my_rank) << PML_UCX_CONTEXT_BITS) | \
     (((uint64_t)(_tag)) << (PML_UCX_CONTEXT_BITS + PML_UCX_RANK_BITS)))


/*
 * Tag-matching masks used by PML_UCX_MAKE_RECV_TAG. They are derived from
 * the PML_UCX_*_BITS field widths rather than written as 64-bit literals, so
 * they follow the tag layout automatically. With the 24/24/16 layout they
 * evaluate to:
 *
 *   PML_UCX_TAG_SIGN_BIT          0x8000000000000000
 *   PML_UCX_ANY_SOURCE_MASK       0x800000000000ffff
 *   PML_UCX_SPECIFIC_SOURCE_MASK  0x800000ffffffffff
 *   PML_UCX_TAG_MASK              0x7fffff0000000000
 *
 * The top bit of the tag field is part of both source masks, so it is
 * compared even when the tag is a wildcard: a receive posted with
 * MPI_ANY_TAG only matches sender tags whose top bit is clear.
 * NOTE(review): presumably this keeps wildcard receives from matching
 * negative (internal) tags - confirm.
 */
#define PML_UCX_TAG_SIGN_BIT \
    ((uint64_t)1 << (PML_UCX_TAG_BITS + PML_UCX_RANK_BITS + PML_UCX_CONTEXT_BITS - 1))

/* Compare the context id only (plus the tag's top bit). */
#define PML_UCX_ANY_SOURCE_MASK \
    (PML_UCX_TAG_SIGN_BIT | \
     (((uint64_t)1 << PML_UCX_CONTEXT_BITS) - 1))

/* Compare the context id and the source rank (plus the tag's top bit). */
#define PML_UCX_SPECIFIC_SOURCE_MASK \
    (PML_UCX_TAG_SIGN_BIT | \
     (((uint64_t)1 << (PML_UCX_RANK_BITS + PML_UCX_CONTEXT_BITS)) - 1))

/* The remaining bits of the tag field, below its top bit. */
#define PML_UCX_TAG_MASK \
    ((((uint64_t)1 << (PML_UCX_TAG_BITS - 1)) - 1) << \
     (PML_UCX_RANK_BITS + PML_UCX_CONTEXT_BITS))

/*
 * Compute the UCX tag and tag mask for a posted receive.
 *
 *   _ucp_tag       lvalue receiving the tag to match against
 *   _ucp_tag_mask  lvalue receiving the mask of bits that must match
 *   _tag           requested message tag, or MPI_ANY_TAG
 *   _src           requested source rank, or MPI_ANY_SOURCE
 *   _comm          pointer to the communicator (c_contextid is read)
 *
 * The source bits are always written into _ucp_tag, even for
 * MPI_ANY_SOURCE; in that case the mask simply excludes them. The tag bits
 * are only written, and only added to the mask, for a specific tag.
 *
 * This expands to a bare { } block (not an expression), and _tag, _src and
 * the two output arguments are evaluated more than once.
 */
#define PML_UCX_MAKE_RECV_TAG(_ucp_tag, _ucp_tag_mask, _tag, _src, _comm) \
    { \
        if ((_src) == MPI_ANY_SOURCE) { \
            _ucp_tag_mask = PML_UCX_ANY_SOURCE_MASK; \
        } else { \
            _ucp_tag_mask = PML_UCX_SPECIFIC_SOURCE_MASK; \
        } \
        \
        _ucp_tag = (((uint64_t)(_src) & UCS_MASK(PML_UCX_RANK_BITS)) << PML_UCX_CONTEXT_BITS) | \
                   (_comm)->c_contextid; \
        \
        if ((_tag) != MPI_ANY_TAG) { \
            _ucp_tag_mask |= PML_UCX_TAG_MASK; \
            _ucp_tag      |= ((uint64_t)(_tag)) << (PML_UCX_RANK_BITS + PML_UCX_CONTEXT_BITS); \
        } \
    }

/*
 * Extract the source-rank field from a UCX tag: drop the context-id bits
 * below it and keep only PML_UCX_RANK_BITS bits of what remains.
 */
#define PML_UCX_TAG_GET_SOURCE(_tag) \
    (UCS_MASK(PML_UCX_RANK_BITS) & ((_tag) >> PML_UCX_CONTEXT_BITS))


/*
 * Extract the message-tag field from a UCX tag by shifting out the rank and
 * context-id fields. No mask is applied, so the result has the type and
 * signedness of _tag; mca_pml_ucx_set_recv_status() passes an int64_t.
 */
#define PML_UCX_TAG_GET_MPI_TAG(_tag) \
    ((_tag) >> (PML_UCX_RANK_BITS + PML_UCX_CONTEXT_BITS))


/*
 * Allocate an ompi_message_t and fill it in from a matched UCP message.
 *
 *   _comm     communicator stored in the new message
 *   _ucp_msg  UCP message handle, stored in the message's req_ptr
 *   _info     pointer to the receive info; sender_tag and length are read
 *   _message  pointer to the location that receives the new message
 *
 * The peer is the source rank decoded from the sender tag, and count is the
 * reported length.
 *
 * Caveats:
 *  - This expands to a bare { } block, not an expression.
 *  - If ompi_message_alloc() returns NULL the macro executes
 *    `return OMPI_ERR_OUT_OF_RESOURCE;` in the *calling* function, so it can
 *    only be used in functions whose return type accepts that value. The UCP
 *    message is not released on that path (see the TODO below).
 *  - The block declares a local named `msg`; an argument expression that
 *    mentions a caller variable called `msg` would refer to this local.
 */
#define PML_UCX_MESSAGE_NEW(_comm, _ucp_msg, _info, _message) \
    { \
        struct ompi_message_t *msg = ompi_message_alloc(); \
        if (msg == NULL) { \
            /* TODO release UCP message */ \
            return OMPI_ERR_OUT_OF_RESOURCE; \
        } \
        \
        msg->comm    = (_comm); \
        msg->req_ptr = (_ucp_msg); \
        msg->peer    = PML_UCX_TAG_GET_SOURCE((_info)->sender_tag); \
        msg->count   = (_info)->length; \
        *(_message)  = msg; \
    }


/*
 * Hand the message that _message points to back via ompi_message_return()
 * and then overwrite the caller's pointer with NULL.
 *
 *   _message  pointer to the message pointer; evaluated twice
 *
 * Expands to a bare { } block, not an expression.
 */
#define PML_UCX_MESSAGE_RELEASE(_message) \
    { \
        ompi_message_return(*(_message)); \
        *(_message) = NULL; \
    }


/*
 * State kept for a persistent request.
 *
 * NOTE(review): the per-field notes below are inferred from field names and
 * types only; the code that fills and reads them is not in this header -
 * confirm against the implementation.
 */
struct pml_ucx_persistent_request {
    /* Embedded Open MPI request; it is the first member of the struct. */
    ompi_request_t                    ompi;
    /* NOTE(review): presumably the request of the currently started
     * operation - confirm. */
    ompi_request_t                    *tmp_req;
    /* NOTE(review): presumably a combination of MCA_PML_UCX_REQUEST_FLAG_*
     * bits - confirm. */
    unsigned                          flags;
    /* NOTE(review): presumably the user buffer, element count and UCP
     * datatype of the operation - confirm. */
    void                              *buffer;
    size_t                            count;
    ucp_datatype_t                    datatype;
    /* NOTE(review): presumably the UCX tag built by PML_UCX_MAKE_SEND_TAG or
     * PML_UCX_MAKE_RECV_TAG - confirm. */
    ucp_tag_t                         tag;
    /* Fields grouped under `send`. */
    struct {
        mca_pml_base_send_mode_t      mode;
        ucp_ep_h                      ep;
    } send;
    /* Fields grouped under `recv`. */
    struct {
        /* NOTE(review): presumably the mask produced by
         * PML_UCX_MAKE_RECV_TAG - confirm. */
        ucp_tag_t                     tag_mask;
    } recv;
};


/*
 * Request helpers and completion handlers declared here and defined
 * elsewhere.
 *
 * NOTE(review): the (request, status[, info]) signatures look like UCX
 * send / tag-receive completion callbacks, with the "p" variants serving
 * persistent requests - confirm against the code that registers them.
 */

/* Completion handler for a send: receives the request and its status. */
void mca_pml_ucx_send_completion(void *request, ucs_status_t status);

/* Completion handler for a receive: additionally receives the receive info. */
void mca_pml_ucx_recv_completion(void *request, ucs_status_t status,
                                 ucp_tag_recv_info_t *info);

/* Send completion handler; NOTE(review): presumably for persistent sends. */
void mca_pml_ucx_psend_completion(void *request, ucs_status_t status);

/* Receive completion handler; NOTE(review): presumably for persistent receives. */
void mca_pml_ucx_precv_completion(void *request, ucs_status_t status,
                                  ucp_tag_recv_info_t *info);

/* Takes a persistent request together with an ompi_request_t named tmp_req;
 * NOTE(review): presumably propagates tmp_req's completion to preq - confirm. */
void mca_pml_ucx_persistent_request_complete(mca_pml_ucx_persistent_request_t *preq,
                                             ompi_request_t *tmp_req);

/* NOTE(review): presumably initializes ompi_req as an already-completed
 * request - confirm. */
void mca_pml_ucx_completed_request_init(ompi_request_t *ompi_req);

/* Init / cleanup hooks taking an untyped request pointer.
 * NOTE(review): presumably registered with UCX as per-request hooks - confirm. */
void mca_pml_ucx_request_init(void *request);

void mca_pml_ucx_request_cleanup(void *request);


/*
 * Return the UCX endpoint used to reach rank `dst` of `comm`.
 *
 * The endpoint is read from the peer's PML slot in proc_endpoints. When the
 * slot is still NULL (treated as the unlikely case) the result of
 * mca_pml_ucx_add_proc() is returned instead.
 * NOTE(review): whether that fallback can itself yield NULL is not visible
 * here - confirm before assuming a non-NULL result.
 */
static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int dst)
{
    ucp_ep_h cached_ep;

    cached_ep = ompi_comm_peer_lookup(comm, dst)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
    if (OPAL_UNLIKELY(NULL == cached_ep)) {
        /* No endpoint stored for this peer yet: take the slow path. */
        return mca_pml_ucx_add_proc(comm, dst);
    }

    return cached_ep;
}

/*
 * Put a request back into its not-yet-completed state: mark it
 * REQUEST_PENDING and clear the cancelled flag of its status.
 * The other status fields (MPI_ERROR, source, tag, count) are not touched.
 */
static inline void mca_pml_ucx_request_reset(ompi_request_t *req)
{
    req->req_complete          = REQUEST_PENDING;
    req->req_status._cancelled = false;
}

/*
 * Translate the UCX completion status of a send into an MPI status.
 *
 *   mpi_status  status to update; dereferenced unconditionally
 *   status      completion status reported by UCX
 *
 *   UCS_OK            -> MPI_ERROR = MPI_SUCCESS
 *   UCS_ERR_CANCELED  -> cancelled flag set, MPI_ERROR = MPI_SUCCESS
 *   anything else     -> MPI_ERROR = MPI_ERR_INTERN
 *
 * Declared `static inline` like the other helpers in this header, so that
 * translation units which include the header without calling the function
 * do not get an unused-function warning.
 */
static inline void mca_pml_ucx_set_send_status(ompi_status_public_t* mpi_status,
                                               ucs_status_t status)
{
    if (status == UCS_OK) {
        mpi_status->MPI_ERROR  = MPI_SUCCESS;
    } else if (status == UCS_ERR_CANCELED) {
        /* A cancelled operation is not an error: do not leave whatever
         * error code the status held before. */
        mpi_status->MPI_ERROR  = MPI_SUCCESS;
        mpi_status->_cancelled = true;
    } else {
        mpi_status->MPI_ERROR  = MPI_ERR_INTERN;
    }
}

/*
 * Translate the UCX completion of a receive into an MPI status.
 *
 *   mpi_status  status to update; dereferenced unconditionally, so it must
 *               not be MPI_STATUS_IGNORE (use
 *               mca_pml_ucx_set_recv_status_safe() when it might be)
 *   ucp_status  completion status reported by UCX
 *   info        receive info; only read when ucp_status is UCS_OK
 *
 *   UCS_OK                     -> MPI_SUCCESS; source and tag decoded from
 *                                 the sender tag, _ucount = info->length,
 *                                 cancelled flag cleared
 *   UCS_ERR_MESSAGE_TRUNCATED  -> MPI_ERROR = MPI_ERR_TRUNCATE
 *   UCS_ERR_CANCELED           -> cancelled flag set, MPI_ERROR = MPI_SUCCESS
 *   anything else              -> MPI_ERROR = MPI_ERR_INTERN
 */
static inline void mca_pml_ucx_set_recv_status(ompi_status_public_t* mpi_status,
                                               ucs_status_t ucp_status,
                                               const ucp_tag_recv_info_t *info)
{
    if (ucp_status == UCS_OK) {
        /* Held as a signed value so that PML_UCX_TAG_GET_MPI_TAG shifts a
         * signed quantity. NOTE(review): presumably so that tags with the
         * top bit set decode as negative MPI tags - confirm. */
        const int64_t tag = info->sender_tag;

        mpi_status->MPI_ERROR  = MPI_SUCCESS;
        mpi_status->MPI_SOURCE = PML_UCX_TAG_GET_SOURCE(tag);
        mpi_status->MPI_TAG    = PML_UCX_TAG_GET_MPI_TAG(tag);
        /* The status may be a caller-supplied object (see the _safe
         * wrapper) whose cancelled flag was never initialised. */
        mpi_status->_cancelled = false;
        mpi_status->_ucount    = info->length;
    } else if (ucp_status == UCS_ERR_MESSAGE_TRUNCATED) {
        mpi_status->MPI_ERROR = MPI_ERR_TRUNCATE;
    } else if (ucp_status == UCS_ERR_CANCELED) {
        /* A cancelled operation is not an error: give MPI_ERROR a defined
         * value instead of leaving it stale or uninitialised. */
        mpi_status->MPI_ERROR  = MPI_SUCCESS;
        mpi_status->_cancelled = true;
    } else {
        mpi_status->MPI_ERROR = MPI_ERR_INTERN;
    }
}

/*
 * Variant of mca_pml_ucx_set_recv_status() that tolerates
 * MPI_STATUS_IGNORE: in that case nothing is written, otherwise the
 * arguments are forwarded unchanged.
 */
static inline void mca_pml_ucx_set_recv_status_safe(ompi_status_public_t* mpi_status,
                                                    ucs_status_t ucp_status,
                                                    const ucp_tag_recv_info_t *info)
{
    if (mpi_status == MPI_STATUS_IGNORE) {
        /* Caller does not want a status. */
        return;
    }

    mca_pml_ucx_set_recv_status(mpi_status, ucp_status, info);
}

/*
 * Class declaration for mca_pml_ucx_persistent_request_t.
 * NOTE(review): OBJ_CLASS_DECLARATION is not defined in this header;
 * presumably it comes from the OPAL object system and pairs with an
 * OBJ_CLASS_INSTANCE in a .c file - confirm.
 */
OBJ_CLASS_DECLARATION(mca_pml_ucx_persistent_request_t);


#endif /* PML_UCX_REQUEST_H_ */