File: wait_sync.h

package info (click to toggle)
openmpi 5.0.8-4
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 201,684 kB
  • sloc: ansic: 613,078; makefile: 42,353; sh: 11,194; javascript: 9,244; f90: 7,052; java: 6,404; perl: 5,179; python: 1,859; lex: 740; fortran: 61; cpp: 20; tcl: 12
file content (158 lines) | stat: -rw-r--r-- 6,506 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2020 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007-2018 Los Alamos National Security, LLC.  All rights
 *                         reserved.
 * Copyright (c) 2016      Mellanox Technologies. All rights reserved.
 * Copyright (c) 2015-2016 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2017-2022 IBM Corporation. All rights reserved.
 * Copyright (c) 2019      Sandia National Laboratories.  All rights reserved.
 * Copyright (c) 2021      Argonne National Laboratory.  All rights reserved.
 *
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#ifndef OPAL_MCA_THREADS_WAIT_SYNC_H
#define OPAL_MCA_THREADS_WAIT_SYNC_H

#include "opal/mca/threads/condition.h"
#include "opal/mca/threads/mutex.h"
#include "opal/mca/threads/threads.h"
#include "opal/runtime/opal_progress.h"
#include "opal/sys/atomic.h"

BEGIN_C_DECLS

/* Defined outside this header and not referenced by anything in it.
 * NOTE(review): presumably an upper bound on the number of threads allowed
 * inside the progress engine at once -- confirm against its definition. */
extern int opal_max_thread_in_progress;

/**
 * Completion-synchronization object: a counter of outstanding updates plus
 * the state needed to wait for that counter to drain and to hand a status
 * back to the waiter.
 */
typedef struct ompi_wait_sync_t {
    /* Outstanding updates: set by WAIT_SYNC_INIT, lowered (or forced to 0 on
     * error) by wait_sync_update, polled by sync_wait_st. */
    opal_atomic_int32_t count;
    /* Result returned to the waiter: 0 after WAIT_SYNC_INIT, overwritten with
     * the error code by wait_sync_update on failure. */
    int32_t status;
    /* Condition variable and mutex; the macros in this header only
     * initialize, use and destroy them when opal_using_threads() is true. */
    opal_thread_internal_cond_t condition;
    opal_thread_internal_mutex_t lock;
    /* List links, reset to NULL by WAIT_SYNC_INIT.
     * NOTE(review): presumably chain the syncs reachable from
     * opal_threads_base_wait_sync_list -- confirm in the implementation. */
    struct ompi_wait_sync_t *next;
    struct ompi_wait_sync_t *prev;
    /* True while a signaling thread may still touch this object: set by
     * WAIT_SYNC_INIT when the initial count is non-zero, cleared by
     * WAIT_SYNC_SIGNAL / WAIT_SYNC_SIGNALLED, and spun on by
     * WAIT_SYNC_RELEASE before the primitives are destroyed. */
    volatile bool signaling;
} ompi_wait_sync_t;

/* Wait on a sync object. Dispatches to the multi-threaded waiter when OPAL
 * is using threads and to the single-threaded polling loop otherwise; the
 * expression evaluates to the int returned by whichever waiter ran. */
#define SYNC_WAIT(sync)                             \
    (opal_using_threads() ? ompi_sync_wait_mt(sync) \
                          : sync_wait_st(sync))

/* Tear down the condition variable and mutex of a sync once no signaling
 * thread can still be touching it. Does nothing when OPAL is not using
 * threads.
 *
 * The spin loop handles a race between the signaling thread and the
 * destruction of the condition variable: the `signaling` member is set to
 * false only after the final signaling thread has finished operating on the
 * sync object. This avoids extra atomics in the signaling function and keeps
 * it as fast as possible. The race window is small, so spinning (yielding
 * the CPU when opal_progress_yield_when_idle is set) is preferable to
 * sleeping, since this macro is called in the critical path.
 *
 * The body is wrapped in do { } while (0) so the macro expands to exactly
 * one statement and composes safely with an unbraced if/else at the call
 * site; invoke it with a trailing semicolon. `sync` is evaluated more than
 * once, so it must not have side effects. */
#define WAIT_SYNC_RELEASE(sync)                                    \
    do {                                                           \
        if (opal_using_threads()) {                                \
            while ((sync)->signaling) {                            \
                if (opal_progress_yield_when_idle) {               \
                    opal_thread_yield();                           \
                }                                                  \
            }                                                      \
            opal_thread_internal_cond_destroy(&(sync)->condition); \
            opal_thread_internal_mutex_destroy(&(sync)->lock);     \
        }                                                          \
    } while (0)

/* Destroy the condition variable and mutex of a sync immediately, without
 * first spinning on the `signaling` flag the way WAIT_SYNC_RELEASE does.
 * Does nothing when OPAL is not using threads.
 *
 * Wrapped in do { } while (0) so the macro is a single statement that is
 * safe inside an unbraced if/else; invoke it with a trailing semicolon.
 * `sync` is evaluated more than once. */
#define WAIT_SYNC_RELEASE_NOWAIT(sync)                             \
    do {                                                           \
        if (opal_using_threads()) {                                \
            opal_thread_internal_cond_destroy(&(sync)->condition); \
            opal_thread_internal_mutex_destroy(&(sync)->lock);     \
        }                                                          \
    } while (0)

/* Signal the condition variable of a sync while holding its lock, then
 * clear `signaling`. The flag is cleared last, after the unlock, because it
 * is what WAIT_SYNC_RELEASE spins on before destroying the condition
 * variable and mutex. Does nothing when OPAL is not using threads.
 *
 * Wrapped in do { } while (0) so the macro is a single statement that is
 * safe inside an unbraced if/else; invoke it with a trailing semicolon.
 * `sync` is evaluated more than once. */
#define WAIT_SYNC_SIGNAL(sync)                                    \
    do {                                                          \
        if (opal_using_threads()) {                               \
            opal_thread_internal_mutex_lock(&(sync)->lock);       \
            opal_thread_internal_cond_signal(&(sync)->condition); \
            opal_thread_internal_mutex_unlock(&(sync)->lock);     \
            (sync)->signaling = false;                            \
        }                                                         \
    } while (0)

/* Mark a sync as no longer being signaled by clearing its `signaling` flag,
 * without touching the condition variable or mutex. The flag is what
 * WAIT_SYNC_RELEASE spins on before tearing the sync down.
 *
 * Wrapped in do { } while (0) so the macro is a single statement that is
 * safe inside an unbraced if/else; invoke it with a trailing semicolon. */
#define WAIT_SYNC_SIGNALLED(sync)  \
    do {                           \
        (sync)->signaling = false; \
    } while (0)

/* Exported rather than static because the inline sync_wait_st() below reads
 * and writes it from every translation unit that includes this header. */
OPAL_DECLSPEC extern ompi_wait_sync_t *opal_threads_base_wait_sync_list;

/* Multi-threaded waiter, selected by SYNC_WAIT when opal_using_threads() is
 * true; implemented outside this header.
 * NOTE(review): presumably returns the sync's status like sync_wait_st --
 * confirm against the implementation. */
OPAL_DECLSPEC int ompi_sync_wait_mt(ompi_wait_sync_t *sync);
/**
 * Single-threaded wait on a sync object.
 *
 * Installs the sync as the head of opal_threads_base_wait_sync_list, calls
 * opal_progress() for as long as the sync's count is positive, then resets
 * the list head to NULL.
 *
 * @param sync  sync to wait on; asserted to be unlinked (next == NULL), and
 *              the global list is asserted to be empty on entry.
 * @return the status stored in the sync when the wait ends.
 */
static inline int sync_wait_st(ompi_wait_sync_t *sync)
{
    assert(NULL == opal_threads_base_wait_sync_list);
    assert(NULL == sync->next);

    /* NOTE(review): presumably published so that a global wakeup can reach
     * this waiter while it is polling -- confirm in the implementation. */
    opal_threads_base_wait_sync_list = sync;
    for (;;) {
        if (sync->count <= 0) {
            break;
        }
        opal_progress();
    }
    opal_threads_base_wait_sync_list = NULL;

    return sync->status;
}

/* Initialize a sync object to wait for `c` updates.
 *
 * Sets count to `c`, clears the list links and the status, and sets
 * `signaling` to true exactly when the stored count is non-zero. The
 * condition variable and mutex are initialized only when OPAL is using
 * threads.
 *
 * `c` is evaluated exactly once: `signaling` is derived from the value of
 * the assignment to count, so an argument with side effects is safe and the
 * flag always agrees with the count that was actually stored. `sync` is
 * still evaluated more than once and must not have side effects. */
#define WAIT_SYNC_INIT(sync, c)                                    \
    do {                                                           \
        (sync)->signaling = (0 != ((sync)->count = (c)));          \
        (sync)->next = NULL;                                       \
        (sync)->prev = NULL;                                       \
        (sync)->status = 0;                                        \
        if (opal_using_threads()) {                                \
            opal_thread_internal_cond_init(&(sync)->condition);    \
            opal_thread_internal_mutex_init(&(sync)->lock, false); \
        }                                                          \
    } while (0)

/**
 * Wake up all syncs with a particular status. If status is OMPI_SUCCESS this
 * operation is a NO-OP. Otherwise it will trigger the "error condition" on
 * all registered syncs.
 *
 * Two variants are declared, a single-threaded (_st) and a multi-threaded
 * (_mt) one; the wait_sync_global_wakeup() macro picks between them based on
 * opal_using_threads(). Both are implemented outside this header.
 * NOTE(review): the rest of this header compares against OPAL_SUCCESS, not
 * OMPI_SUCCESS -- confirm which constant the implementation checks.
 */
OPAL_DECLSPEC void opal_threads_base_wait_sync_global_wakeup_st(int status);
OPAL_DECLSPEC void opal_threads_base_wait_sync_global_wakeup_mt(int status);
/* Forward a global wakeup with the given status to the multi-threaded
 * implementation when OPAL is using threads, and to the single-threaded one
 * otherwise. */
#define wait_sync_global_wakeup(status)                         \
    (opal_using_threads()                                       \
         ? opal_threads_base_wait_sync_global_wakeup_mt(status) \
         : opal_threads_base_wait_sync_global_wakeup_st(status))

/**
 * Apply completions, or an error, to a sync and signal it when it is done.
 *
 * On OPAL_SUCCESS the count is lowered by `updates` through
 * OPAL_THREAD_ADD_FETCH32; the sync is signaled only if the resulting count
 * is zero. On any other status the error is stored in the sync, the count is
 * forced to zero, and the sync is signaled unconditionally so the waiting
 * thread sees the failure.
 *
 * @param sync     sync object to update
 * @param updates  number of completions to subtract from the count
 * @param status   OPAL_SUCCESS, or the error code to report to the waiter
 */
static inline void wait_sync_update(ompi_wait_sync_t *sync, int updates, int status)
{
    if (OPAL_LIKELY(OPAL_SUCCESS == status)) {
        const int32_t remaining = OPAL_THREAD_ADD_FETCH32(&sync->count, -updates);

        if (0 != remaining) {
            /* updates are still outstanding: nothing to signal yet */
            return;
        }
    } else {
        /* error path, not performance critical: always use the atomic.
         * The write barrier sits between the status store and the count
         * swap, so the status is written before the count is zeroed. */
        sync->status = status;
        opal_atomic_wmb();
        opal_atomic_swap_32(&sync->count, 0);
    }

    WAIT_SYNC_SIGNAL(sync);
}

END_C_DECLS

#endif /* OPAL_MCA_THREADS_WAIT_SYNC_H */