File: spml_ucx.h

package info (click to toggle)
openmpi 5.0.8-3
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 201,692 kB
  • sloc: ansic: 613,078; makefile: 42,353; sh: 11,194; javascript: 9,244; f90: 7,052; java: 6,404; perl: 5,179; python: 1,859; lex: 740; fortran: 61; cpp: 20; tcl: 12
file content (409 lines) | stat: -rw-r--r-- 16,124 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
/*
 * Copyright (c) 2013      Mellanox Technologies, Inc.
 *                         All rights reserved.
 * Copyright (c) 2016-2019 Research Organization for Information Science
 *                         and Technology (RIST).  All rights reserved.
 * Copyright (c) 2016      ARM, Inc. All rights reserved.
 * $COPYRIGHT$
 * 
 * Additional copyrights may follow
 * 
 * $HEADER$
 */
/**
 *  @file 
 */

#ifndef MCA_SPML_UCX_H
#define MCA_SPML_UCX_H

#include "oshmem_config.h"
#include "oshmem/request/request.h"
#include "oshmem/mca/spml/base/base.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/util/oshmem_util.h"
#include "oshmem/mca/spml/base/spml_base_putreq.h"
#include "oshmem/proc/proc.h"
#include "oshmem/mca/spml/base/spml_base_request.h"
#include "oshmem/mca/spml/base/spml_base_getreq.h"
#include "oshmem/runtime/runtime.h"

#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"

#include "opal/class/opal_free_list.h"
#include "opal/class/opal_list.h"
#include "opal/class/opal_bitmap.h"

#include "opal/mca/common/ucx/common_ucx.h"

#include <ucp/api/ucp.h>

BEGIN_C_DECLS

/* Assert/logging helpers: plain aliases for the MCA common UCX macros. */
#define SPML_UCX_ASSERT  MCA_COMMON_UCX_ASSERT
#define SPML_UCX_ERROR   MCA_COMMON_UCX_ERROR
#define SPML_UCX_WARN    MCA_COMMON_UCX_WARN
#define SPML_UCX_VERBOSE MCA_COMMON_UCX_VERBOSE
/* NOTE(review): the three constants below are not used inside this header;
 * judging by the names they are a transport index, a transport count and the
 * index of a "service" segment -- confirm against their users. */
#define SPML_UCX_TRANSP_IDX 0
#define SPML_UCX_TRANSP_CNT 1
#define SPML_UCX_SERVICE_SEG 0

/*
 * Strong-ordering modes. mca_spml_ucx_is_strong_ordering() compares
 * mca_spml_ucx_ctx_t::strong_sync against SPML_UCX_STRONG_ORDERING_NONE.
 */
enum {
    SPML_UCX_STRONG_ORDERING_NONE  = 0, /* don't use strong ordering */
    SPML_UCX_STRONG_ORDERING_GETNB = 1, /* use non-blocking read to provide ordering */
    SPML_UCX_STRONG_ORDERING_GET   = 2, /* use blocking read to provide ordering */
    SPML_UCX_STRONG_ORDERING_FLUSH = 3  /* flush EP to provide ordering */
};

/**
 * UCX key pair; embedded as the `key` member of spml_ucx_cached_mkey_t.
 */
struct spml_ucx_mkey {
    ucp_rkey_h rkey;   /* UCP remote key handle */
    ucp_mem_h  mem_h;  /* UCP memory handle */
}; 
typedef struct spml_ucx_mkey spml_ucx_mkey_t;

/* Cache entry: a segment descriptor together with the UCX keys for it. */
struct spml_ucx_cached_mkey {
    mkey_segment_t   super;  /* segment descriptor; handed to map_segment_va2rva() */
    spml_ucx_mkey_t  key;    /* UCX keys returned by the mkey lookup helpers below */
};
typedef struct spml_ucx_cached_mkey spml_ucx_cached_mkey_t;

/* Per-peer state; mca_spml_ucx_ctx_t::ucp_peers is an array of these indexed by PE. */
struct ucp_peer {
    ucp_ep_h                 ucp_conn;   /* UCP endpoint handle for this peer */
    spml_ucx_cached_mkey_t **mkeys;      /* table of cached mkeys; a slot may be NULL */
    size_t                   mkeys_cnt;  /* number of slots in mkeys (lookup bound) */
};
typedef struct ucp_peer ucp_peer_t;

/* An rkey_store entry */
typedef struct mca_spml_ucx_rkey {
    ucp_rkey_h rkey;    /* UCP remote key handle */
    int        refcnt;  /* presumably the number of users of rkey -- TODO confirm */
} mca_spml_ucx_rkey_t;

/*
 * Array-backed collection of mca_spml_ucx_rkey_t entries. Prepared and torn
 * down through mca_spml_ucx_rkey_store_init() / mca_spml_ucx_rkey_store_cleanup().
 */
typedef struct mca_spml_ucx_rkey_store {
    mca_spml_ucx_rkey_t *array;  /* entry storage */
    int                  size;   /* NOTE(review): presumably the capacity of array -- confirm */
    int                  count;  /* NOTE(review): presumably the entries in use -- confirm */
} mca_spml_ucx_rkey_store_t;

/*
 * Per-context state. A shmem_ctx_t handed to the helpers in this header is
 * cast to a pointer to this structure (see mca_spml_ucx_ctx_mkey_by_va()).
 */
struct mca_spml_ucx_ctx {
    ucp_worker_h             *ucp_worker;       /* UCP worker handle(s); presumably ucp_workers entries -- TODO confirm */
    ucp_peer_t               *ucp_peers;        /* per-peer state, indexed by PE */
    long                      options;          /* NOTE(review): presumably the options given at context creation -- confirm */
    opal_bitmap_t             put_op_bitmap;    /* bit set for each PE already listed in put_proc_indexes */
    unsigned long             nb_progress_cnt;
    unsigned int              ucp_workers;
    int                      *put_proc_indexes; /* PEs recorded by mca_spml_ucx_remote_op_posted() */
    unsigned                  put_proc_count;   /* number of valid entries in put_proc_indexes */
    bool                      synchronized_quiet; /* when true, remote ops are tracked as for strong ordering */
    int                       strong_sync;      /* compared against SPML_UCX_STRONG_ORDERING_NONE */
    mca_spml_ucx_rkey_store_t rkey_store;
};
typedef struct mca_spml_ucx_ctx mca_spml_ucx_ctx_t;

/* Default context object; defined outside this header. */
extern mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default;

/*
 * Type of the lookup callback stored in mca_spml_ucx_t::get_mkey_slow. Its
 * parameters match the leading parameters of mca_spml_ucx_ctx_mkey_by_va().
 */
typedef spml_ucx_mkey_t * (*mca_spml_ucx_get_mkey_slow_fn_t)(shmem_ctx_t ctx, int pe, void *va, void **rva);

/*
 * Array of context pointers; mca_spml_ucx_t keeps two of these
 * (active_array and idle_array).
 */
typedef struct mca_spml_ucx_ctx_array {
    int                      ctxs_count;  /* NOTE(review): presumably the contexts currently stored -- confirm */
    int                      ctxs_num;    /* NOTE(review): presumably the allocated slots in ctxs -- confirm */
    mca_spml_ucx_ctx_t       **ctxs;      /* context pointer storage */
} mca_spml_ucx_ctx_array_t;

/* UCX SPML module state; the instance is declared below as mca_spml_ucx. */
struct mca_spml_ucx {
    mca_spml_base_module_t   super;
    ucp_context_h            ucp_context;   /* UCP application context handle */
    int                      num_disconnect;
    int                      heap_reg_nb;
    bool                     enabled;
    mca_spml_ucx_get_mkey_slow_fn_t get_mkey_slow; /* mkey lookup callback */
    char                     ***remote_addrs_tbl;
    mca_spml_ucx_ctx_array_t active_array;
    mca_spml_ucx_ctx_array_t idle_array;
    int                      priority; /* component priority */
    shmem_internal_mutex_t   internal_mutex;
    pthread_mutex_t          ctx_create_mutex;
    /* Fields controlling aux context for put_all_nb SPML routine */
    bool                     async_progress; /* when true, aux lock/unlock take async_lock */
    int                      async_tick;
    opal_event_base_t        *async_event_base;
    opal_event_t             *tick_event;
    mca_spml_ucx_ctx_t       *aux_ctx;
    pthread_spinlock_t       async_lock;     /* taken by mca_spml_ucx_aux_lock() */
    int                      aux_refcnt;
    unsigned long            nb_progress_thresh_global;
    unsigned long            nb_put_progress_thresh;
    unsigned long            nb_get_progress_thresh;
    unsigned long            nb_ucp_worker_progress;
    unsigned int             ucp_workers;
    unsigned int             ucp_worker_cnt;
    int                      symmetric_rkey_max_count;
};
typedef struct mca_spml_ucx mca_spml_ucx_t;

/* Module instance read by the inline helpers in this header; defined elsewhere. */
extern mca_spml_ucx_t mca_spml_ucx;

/*
 * Module enable and context create/destroy entry points (implemented outside
 * this header). mca_spml_ucx_ctx_create() returns the new context through
 * its `ctx` output parameter.
 */
extern int mca_spml_ucx_enable(bool enable);
extern int mca_spml_ucx_ctx_create(long options,
                                   shmem_ctx_t *ctx);
extern void mca_spml_ucx_ctx_destroy(shmem_ctx_t ctx);
/*
 * Get/put entry points (implemented outside this header). Each takes a
 * context, destination and source addresses, a byte count and the remote
 * side as an int (`src` for get, `dst` for put).
 * NOTE(review): the `_nb` variants are presumably non-blocking and the
 * `_wprogress` variants presumably also drive progress; the role of `handle`
 * is not visible here -- confirm against the implementation.
 */
extern int mca_spml_ucx_get(shmem_ctx_t ctx,
                              void* dst_addr,
                              size_t size,
                              void* src_addr,
                              int src);

extern int mca_spml_ucx_get_nb(shmem_ctx_t ctx,
                              void* dst_addr,
                              size_t size,
                              void* src_addr,
                              int src,
                              void **handle);

extern int mca_spml_ucx_get_nb_wprogress(shmem_ctx_t ctx,
                              void* dst_addr,
                              size_t size,
                              void* src_addr,
                              int src,
                              void **handle);

extern int mca_spml_ucx_put(shmem_ctx_t ctx,
                            void* dst_addr,
                            size_t size,
                            void* src_addr,
                            int dst);

extern int mca_spml_ucx_put_nb(shmem_ctx_t ctx,
                               void* dst_addr,
                               size_t size,
                               void* src_addr,
                               int dst,
                               void **handle);

extern int mca_spml_ucx_put_nb_wprogress(shmem_ctx_t ctx,
                               void* dst_addr,
                               size_t size,
                               void* src_addr,
                               int dst,
                               void **handle);

/* Buffer send/receive entry points; implemented outside this header. */
extern int mca_spml_ucx_recv(void* buf, size_t size, int src);
extern int mca_spml_ucx_send(void* buf,
                             size_t size,
                             int dst,
                             mca_spml_base_put_mode_t mode);

/* put_all_nb routine; mca_spml_ucx_t keeps an auxiliary context for it. */
extern int mca_spml_ucx_put_all_nb(void *target,
                                   const void *source,
                                   size_t size,
                                   long *counter);

/*
 * Memory registration. mca_spml_ucx_register() returns an sshmem_mkey_t
 * pointer and reports a count through `count`; mca_spml_ucx_deregister()
 * takes such a pointer back.
 * NOTE(review): ownership of the returned keys is not visible here -- confirm.
 */
extern sshmem_mkey_t *mca_spml_ucx_register(void* addr,
                                                size_t size,
                                                uint64_t shmid,
                                                int *count);
extern int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys);

extern void mca_spml_ucx_memuse_hook(void *addr, size_t length);

/* Remote mkey handling (unpack / free / pointer lookup) for a given PE. */
extern void mca_spml_ucx_rmkey_unpack(shmem_ctx_t ctx, sshmem_mkey_t *mkey, uint32_t segno, int pe, int tr_id);
extern void mca_spml_ucx_rmkey_free(sshmem_mkey_t *mkey, int pe);
extern void *mca_spml_ucx_rmkey_ptr(const void *dst_addr, sshmem_mkey_t *, int pe);

/* Process-group add/remove, fence/quiet and progress entry points. */
extern int mca_spml_ucx_add_procs(oshmem_group_t* group, size_t nprocs);
extern int mca_spml_ucx_del_procs(oshmem_group_t* group, size_t nprocs);
extern int mca_spml_ucx_fence(shmem_ctx_t ctx);
extern int mca_spml_ucx_quiet(shmem_ctx_t ctx);
extern int spml_ucx_default_progress(void);
extern int spml_ucx_ctx_progress(void);
extern int spml_ucx_progress_aux_ctx(void);
/* Callback with an (fd, event, cbdata) signature.
 * NOTE(review): presumably registered for mca_spml_ucx_t::tick_event -- confirm. */
void mca_spml_ucx_async_cb(int fd, short event, void *cbdata);

/* Management of a context's put-operation mask.
 * NOTE(review): presumably these set up and reset put_op_bitmap /
 * put_proc_indexes in mca_spml_ucx_ctx_t -- confirm. */
int mca_spml_ucx_init_put_op_mask(mca_spml_ucx_ctx_t *ctx, size_t nprocs);
int mca_spml_ucx_clear_put_op_mask(mca_spml_ucx_ctx_t *ctx);
/* Maintenance of a peer's mkey cache (ucp_peer_t::mkeys). */
int mca_spml_ucx_peer_mkey_cache_add(ucp_peer_t *ucp_peer, int index);
int mca_spml_ucx_peer_mkey_cache_del(ucp_peer_t *ucp_peer, int segno);
void mca_spml_ucx_peer_mkey_cache_release(ucp_peer_t *ucp_peer);
void mca_spml_ucx_peer_mkey_cache_init(mca_spml_ucx_ctx_t *ucx_ctx, int pe);

/*
 * Put-with-signal entry points: the put parameters plus a signal address,
 * a signal value and a signal operation code.
 * NOTE(review): the `_nb` variant is presumably non-blocking -- confirm.
 */
extern int mca_spml_ucx_put_signal(shmem_ctx_t ctx, void* dst_addr, size_t size, void*
        src_addr, uint64_t *sig_addr, uint64_t signal, int sig_op, int dst);

extern int mca_spml_ucx_put_signal_nb(shmem_ctx_t ctx, void* dst_addr, size_t size,
        void* src_addr, uint64_t *sig_addr, uint64_t signal, int sig_op, int
        dst);
/*
 * wait_until / test entry points (implemented outside this header).
 *
 * Common parameters: `ivars` is the variable array, `cmp` the comparison
 * code, `nelems` the element count, `status` a per-element int array,
 * `datatype` a type code, and (for the `_some` forms) `indices` an output
 * array. The non-vector forms take one comparison value (`cmp_value`); the
 * `_vector` forms take `cmp_values`. All `_vector` prototypes now use the
 * same parameter name.
 * NOTE(review): exact wait/test semantics are not visible in this header;
 * see the OpenSHMEM specification and the implementation file.
 */
extern void mca_spml_ucx_wait_until_all(void *ivars, int cmp, void
        *cmp_value, size_t nelems, const int *status, int datatype);
extern size_t mca_spml_ucx_wait_until_any(void *ivars, int cmp, void
        *cmp_value, size_t nelems, const int *status, int datatype);
extern size_t mca_spml_ucx_wait_until_some(void *ivars, int cmp, void
        *cmp_value, size_t nelems, size_t *indices, const int *status, int
        datatype);
extern void mca_spml_ucx_wait_until_all_vector(void *ivars, int cmp, void
        *cmp_values, size_t nelems, const int *status, int datatype);
extern size_t mca_spml_ucx_wait_until_any_vector(void *ivars, int cmp, void
        *cmp_values, size_t nelems, const int *status, int datatype);
extern size_t mca_spml_ucx_wait_until_some_vector(void *ivars, int cmp, void
        *cmp_values, size_t nelems, size_t *indices, const int *status, int
        datatype);
extern int mca_spml_ucx_test_all(void *ivars, int cmp, void *cmp_value,
        size_t nelems, const int *status, int datatype);
extern size_t mca_spml_ucx_test_any(void *ivars, int cmp, void *cmp_value,
        size_t nelems, const int *status, int datatype);
extern size_t mca_spml_ucx_test_some(void *ivars, int cmp, void *cmp_value,
        size_t nelems, size_t *indices, const int *status, int datatype);
extern int mca_spml_ucx_test_all_vector(void *ivars, int cmp, void
        *cmp_values, size_t nelems, const int *status, int datatype);
extern size_t mca_spml_ucx_test_any_vector(void *ivars, int cmp, void *cmp_values, size_t nelems,
                                           const int *status, int datatype);
extern size_t mca_spml_ucx_test_some_vector(void *ivars, int cmp, void *cmp_values, size_t nelems,
                                            size_t *indices, const int *status, int datatype);
/*
 * Team entry points (implemented outside this header): queries, PE
 * translation, splitting, destruction and team-based context creation.
 */
extern int mca_spml_ucx_team_sync(shmem_team_t team);
extern int mca_spml_ucx_team_my_pe(shmem_team_t team);
extern int mca_spml_ucx_team_n_pes(shmem_team_t team);
extern int mca_spml_ucx_team_get_config(shmem_team_t team, long config_mask,
        shmem_team_config_t *config);
extern int mca_spml_ucx_team_translate_pe(shmem_team_t src_team, int src_pe,
        shmem_team_t dest_team);
extern int mca_spml_ucx_team_split_strided(shmem_team_t parent_team, int start, int
        stride, int size, const shmem_team_config_t *config, long config_mask,
        shmem_team_t *new_team);
extern int mca_spml_ucx_team_split_2d(shmem_team_t parent_team, int xrange, const
        shmem_team_config_t *xaxis_config, long xaxis_mask, shmem_team_t
        *xaxis_team, const shmem_team_config_t *yaxis_config, long yaxis_mask,
        shmem_team_t *yaxis_team);
extern int mca_spml_ucx_team_destroy(shmem_team_t team);
extern int mca_spml_ucx_team_get(shmem_ctx_t ctx, shmem_team_t *team);
extern int mca_spml_ucx_team_create_ctx(shmem_team_t team, long options, shmem_ctx_t *ctx);
/*
 * Team collectives. Each takes the team, destination and source buffers, an
 * element count and an int type code; alltoalls adds destination/source
 * strides, broadcast a root PE and reduce an operation code.
 */
extern int mca_spml_ucx_team_alltoall(shmem_team_t team, void
        *dest, const void *source, size_t nelems, int datatype);
extern int mca_spml_ucx_team_alltoalls(shmem_team_t team, void
        *dest, const void *source, ptrdiff_t dst, ptrdiff_t sst, size_t nelems,
        int datatype);
extern int mca_spml_ucx_team_broadcast(shmem_team_t team, void
        *dest, const void *source, size_t nelems, int PE_root, int datatype);
extern int mca_spml_ucx_team_collect(shmem_team_t team, void
        *dest, const void *source, size_t nelems, int datatype);
extern int mca_spml_ucx_team_fcollect(shmem_team_t team, void
        *dest, const void *source, size_t nelems, int datatype);
extern int mca_spml_ucx_team_reduce(shmem_team_t team, void
        *dest, const void *source, size_t nreduce, int operation, int datatype);

/* Returns an unsigned flag value derived from the given module.
 * NOTE(review): presumably UCP memory-map flags for symmetric rkeys -- confirm. */
extern unsigned
mca_spml_ucx_mem_map_flags_symmetric_rkey(struct mca_spml_ucx *spml_ucx);

/* Set-up and tear-down of an rkey store (see mca_spml_ucx_rkey_store_t). */
extern void mca_spml_ucx_rkey_store_init(mca_spml_ucx_rkey_store_t *store);
extern void mca_spml_ucx_rkey_store_cleanup(mca_spml_ucx_rkey_store_t *store);

/**
 * Look up the cached mkey stored at slot @index of a peer's mkey table.
 *
 * @param ucp_peer   peer whose table is consulted
 * @param index      slot to read; must satisfy 0 <= index < mkeys_cnt
 * @param out_rmkey  always reset to NULL first; on success receives the slot
 *                   content as-is (no NULL check is made on the stored pointer)
 *
 * @return OSHMEM_SUCCESS, or OSHMEM_ERR_BAD_PARAM (after logging an error)
 *         when @index is outside the table.
 */
static inline int
mca_spml_ucx_peer_mkey_get(ucp_peer_t *ucp_peer, int index, spml_ucx_cached_mkey_t **out_rmkey)
{
    int rc = OSHMEM_ERR_BAD_PARAM;

    *out_rmkey = NULL;

    if (OPAL_UNLIKELY((0 > index) || (index >= (int)ucp_peer->mkeys_cnt))) {
        SPML_UCX_ERROR("Failed to get mkey for segment: bad index = %d, cached mkeys count: %zu",
                       index, ucp_peer->mkeys_cnt);
    } else {
        *out_rmkey = ucp_peer->mkeys[index];
        rc         = OSHMEM_SUCCESS;
    }

    return rc;
}

/* Take mca_spml_ucx.async_lock; does nothing unless async progress is on. */
static inline void mca_spml_ucx_aux_lock(void)
{
    if (!mca_spml_ucx.async_progress) {
        return;
    }

    pthread_spin_lock(&mca_spml_ucx.async_lock);
}

/* Release mca_spml_ucx.async_lock; does nothing unless async progress is on. */
static inline void mca_spml_ucx_aux_unlock(void)
{
    if (!mca_spml_ucx.async_progress) {
        return;
    }

    pthread_spin_unlock(&mca_spml_ucx.async_lock);
}

/*
 * Per-context mkey management for a (pe, segno) pair; implemented outside
 * this header. The `_new` and `_add` variants return a spml_ucx_mkey_t
 * through their last parameter.
 * NOTE(review): the precise division of work between new/cache/add/del is
 * not visible here -- confirm against the implementation.
 */
int mca_spml_ucx_ctx_mkey_new(mca_spml_ucx_ctx_t *ucx_ctx, int pe, uint32_t segno, spml_ucx_mkey_t **mkey);
int mca_spml_ucx_ctx_mkey_cache(mca_spml_ucx_ctx_t *ucx_ctx, sshmem_mkey_t *mkey, uint32_t segno, int dst_pe);
int mca_spml_ucx_ctx_mkey_add(mca_spml_ucx_ctx_t *ucx_ctx, int pe, uint32_t segno, sshmem_mkey_t *mkey, spml_ucx_mkey_t **ucx_mkey);
int mca_spml_ucx_ctx_mkey_del(mca_spml_ucx_ctx_t *ucx_ctx, int pe, uint32_t segno, spml_ucx_mkey_t *ucx_mkey);

/**
 * Find the UCX mkey cached for segment @segno of peer @pe in a context.
 *
 * @param ucx_ctx  context whose peer table is searched
 * @param pe       index into ucx_ctx->ucp_peers
 * @param segno    segment number, used as the index into the peer's mkey table
 * @param mkey     on success, set to the address of the cached entry's key;
 *                 left untouched on failure
 *
 * @return OSHMEM_SUCCESS, or the error returned by mca_spml_ucx_peer_mkey_get().
 */
static inline int
mca_spml_ucx_ctx_mkey_by_seg(mca_spml_ucx_ctx_t *ucx_ctx, int pe, uint32_t segno, spml_ucx_mkey_t **mkey)
{
    spml_ucx_cached_mkey_t *cached;
    int status;

    status = mca_spml_ucx_peer_mkey_get(&ucx_ctx->ucp_peers[pe], segno, &cached);
    if (OSHMEM_SUCCESS == status) {
        *mkey = &cached->key;
    }

    return status;
}

/**
 * Find the cached mkey of peer @pe whose segment contains address @va.
 *
 * Scans the peer's mkey table in order, skipping empty (NULL) slots, and
 * stops at the first segment for which map_segment_is_va_in() succeeds.
 *
 * @param ctx     context handle; treated as a mca_spml_ucx_ctx_t pointer
 * @param pe      index into the context's ucp_peers array
 * @param va      address to look up
 * @param rva     on a match, receives map_segment_va2rva() of @va
 * @param module  not used by this function
 *
 * @return pointer to the matching entry's key, or NULL when nothing matches.
 */
static inline spml_ucx_mkey_t *
mca_spml_ucx_ctx_mkey_by_va(shmem_ctx_t ctx, int pe, void *va, void **rva, mca_spml_ucx_t* module)
{
    mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
    ucp_peer_t *peer = &ucx_ctx->ucp_peers[pe];
    spml_ucx_cached_mkey_t **table = peer->mkeys;
    size_t slot;

    for (slot = 0; slot < peer->mkeys_cnt; slot++) {
        spml_ucx_cached_mkey_t *entry = table[slot];

        if ((NULL != entry) &&
            OPAL_LIKELY(map_segment_is_va_in(&entry->super.super, va))) {
            *rva = map_segment_va2rva(&entry->super, va);
            return &entry->key;
        }
    }

    return NULL;
}

/**
 * Translate a UCX status into an OSHMEM return code.
 *
 * When OSHMEM_PARAM_CHECK is 1, only UCS_OK maps to OSHMEM_SUCCESS and every
 * other status maps to OSHMEM_ERROR. Otherwise the status is ignored and
 * OSHMEM_SUCCESS is always returned.
 */
static inline int ucx_status_to_oshmem(ucs_status_t status)
{
#if OSHMEM_PARAM_CHECK == 1
    if (OPAL_LIKELY(UCS_OK == status)) {
        return OSHMEM_SUCCESS;
    }
    return OSHMEM_ERROR;
#else
    return OSHMEM_SUCCESS;
#endif
}

/**
 * Translate the UCX status of a non-blocking call into an OSHMEM return code.
 *
 * When OSHMEM_PARAM_CHECK is 1, any non-negative status maps to
 * OSHMEM_SUCCESS and negative ones map to OSHMEM_ERROR. Otherwise the status
 * is ignored and OSHMEM_SUCCESS is always returned.
 */
static inline int ucx_status_to_oshmem_nb(ucs_status_t status)
{
#if OSHMEM_PARAM_CHECK == 1
    if (OPAL_LIKELY(status >= 0)) {
        return OSHMEM_SUCCESS;
    }
    return OSHMEM_ERROR;
#else
    return OSHMEM_SUCCESS;
#endif
}

/**
 * Report whether remote operations on @ctx must be tracked.
 *
 * @return 1 when strong_sync is anything other than
 *         SPML_UCX_STRONG_ORDERING_NONE or synchronized_quiet is set,
 *         0 otherwise.
 */
static inline int mca_spml_ucx_is_strong_ordering(mca_spml_ucx_ctx_t *ctx)
{
    if (SPML_UCX_STRONG_ORDERING_NONE != ctx->strong_sync) {
        return 1;
    }

    return ctx->synchronized_quiet ? 1 : 0;
}

/**
 * Record that a remote operation was posted to PE @dst on @ctx.
 *
 * Only acts when mca_spml_ucx_is_strong_ordering() is true for the context.
 * The first time a given @dst is seen it is appended to put_proc_indexes and
 * its bit is set in put_op_bitmap; later calls for the same @dst do nothing
 * while that bit stays set.
 */
static inline void mca_spml_ucx_remote_op_posted(mca_spml_ucx_ctx_t *ctx, int dst)
{
    if (OPAL_UNLIKELY(mca_spml_ucx_is_strong_ordering(ctx)) &&
        !opal_bitmap_is_set_bit(&ctx->put_op_bitmap, dst)) {
        unsigned slot = ctx->put_proc_count;

        ctx->put_proc_indexes[slot] = dst;
        ctx->put_proc_count         = slot + 1;
        opal_bitmap_set_bit(&ctx->put_op_bitmap, dst);
    }
}

/* NOTE(review): not used inside this header; presumably the initial size and
 * growth step of mca_spml_ucx_ctx_array_t::ctxs -- confirm against the users. */
#define MCA_SPML_UCX_CTXS_ARRAY_SIZE 64
#define MCA_SPML_UCX_CTXS_ARRAY_INC 64

END_C_DECLS

#endif