File: mcs_mutex.c

package info (click to toggle)
mpich 4.0.2-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 423,384 kB
  • sloc: ansic: 1,088,434; cpp: 71,364; javascript: 40,763; f90: 22,829; sh: 17,463; perl: 14,773; xml: 14,418; python: 10,265; makefile: 9,246; fortran: 8,008; java: 4,355; asm: 324; ruby: 176; lisp: 19; php: 8; sed: 4
file content (225 lines) | stat: -rw-r--r-- 6,317 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
/*
 * Copyright (C) by Argonne National Laboratory
 *     See COPYRIGHT in top-level directory
 */

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <strings.h>

#include <mpi.h>
#include "mcs_mutex.h"

/* TODO: Make these mutex operations no-ops for sequential runs */

/** Create an MCS mutex.  Collective on comm.
  *
  * Every process in comm allocates a window segment of two ints (displacement
  * unit sizeof(int)).  Elsewhere in this file those slots are addressed as
  * MCS_MTX_ELEM_DISP (this process's queue "next" pointer) and
  * MCS_MTX_TAIL_DISP (the queue tail, used only on tail_rank).  Both slots are
  * initialized to MPI_PROC_NULL.  A shared passive-target epoch is opened with
  * MPI_Win_lock_all and stays open until MCS_Mutex_free.
  *
  * @param[in]  tail_rank rank of the process in comm that holds the tail
  *                       pointer
  * @param[in]  comm      communicator containing all processes that will use
  *                       the mutex; it is duplicated, so the caller keeps
  *                       ownership of comm
  * @param[out] hdl_out   handle to the mutex
  * @return               MPI_SUCCESS (allocation failure aborts via MPI_Abort)
  */
int MCS_Mutex_create(int tail_rank, MPI_Comm comm, MCS_Mutex * hdl_out)
{
    MCS_Mutex hdl;

    hdl = malloc(sizeof(*hdl));
    if (hdl == NULL) {
        /* Fail fast regardless of NDEBUG.  Returning an error from only this
         * rank would leave the other ranks blocked in the collective calls
         * below, so abort the whole communicator instead. */
        MPI_Abort(comm, MPI_ERR_NO_MEM);
        return MPI_ERR_NO_MEM;  /* not reached if MPI_Abort terminates */
    }

    MPI_Comm_dup(comm, &hdl->comm);

    hdl->tail_rank = tail_rank;

#ifdef USE_WIN_SHARED
    MPI_Win_allocate_shared(2 * sizeof(int), sizeof(int), MPI_INFO_NULL,
                            hdl->comm, &hdl->base, &hdl->window);
#else
    /* The info object is kept in the handle and released in MCS_Mutex_free. */
    MPI_Info_create(&hdl->win_info);
#ifdef USE_WIN_ALLOC_SHM
    MPI_Info_set(hdl->win_info, "alloc_shm", "true");
#else
    MPI_Info_set(hdl->win_info, "alloc_shm", "false");
#endif
    MPI_Win_allocate(2 * sizeof(int), sizeof(int), hdl->win_info, hdl->comm,
                     &hdl->base, &hdl->window);
#endif

    MPI_Win_lock_all(0, hdl->window);

    /* Empty queue: no "next" element and no tail. */
    hdl->base[0] = MPI_PROC_NULL;
    hdl->base[1] = MPI_PROC_NULL;

    /* Make the local initialization visible in the window and make sure every
     * process has initialized before anyone starts issuing RMA on it. */
    MPI_Win_sync(hdl->window);
    MPI_Barrier(hdl->comm);

    *hdl_out = hdl;
    return MPI_SUCCESS;
}


/** Free an MCS mutex.  Collective on ranks in the communicator used at the
  * time of creation.
  *
  * Closes the passive-target epoch opened at creation (MPI_Win_unlock_all),
  * frees the window and the duplicated communicator, frees the window info
  * object (not created when built with USE_WIN_SHARED), and finally releases
  * the handle itself.
  *
  * @param[in,out] hdl_ptr pointer to the handle to free; *hdl_ptr is set to
  *                        NULL on return
  * @return                MPI status
  */
int MCS_Mutex_free(MCS_Mutex * hdl_ptr)
{
    MCS_Mutex hdl = *hdl_ptr;

    MPI_Win_unlock_all(hdl->window);

    MPI_Win_free(&hdl->window);
    MPI_Comm_free(&hdl->comm);
#ifndef USE_WIN_SHARED
    MPI_Info_free(&hdl->win_info);
#endif

    free(hdl);
    /* Clear the caller's handle so a stale pointer cannot be reused. */
    *hdl_ptr = NULL;

    return MPI_SUCCESS;
}


/** Lock a mutex (blocking).
  *
  * Enqueues the calling process by atomically swapping its rank into the tail
  * pointer stored at MCS_MTX_TAIL_DISP on hdl->tail_rank.  If the previous
  * tail was MPI_PROC_NULL the queue was empty and the lock is acquired
  * immediately.  Otherwise the caller writes its rank into the previous
  * tail's "next" slot (MCS_MTX_ELEM_DISP) and blocks in MPI_Recv until that
  * process sends a zero-byte MCS_MUTEX_TAG message on hdl->comm.
  *
  * All RMA here runs inside the passive-target epoch opened by
  * MPI_Win_lock_all in MCS_Mutex_create; no per-call MPI_Win_lock is taken.
  *
  * @param[in] hdl   Handle to the mutex
  * @return          MPI status
  */
int MCS_Mutex_lock(MCS_Mutex hdl)
{
    /* NOTE(review): nproc is queried but not used in this function. */
    int rank, nproc;
    int prev;

    MPI_Comm_rank(hdl->comm, &rank);
    MPI_Comm_size(hdl->comm, &nproc);

    /* Reset this process's "next" slot before enqueueing; a successor will
     * fill it in later via MPI_Accumulate.  This store is safe, since it
     * cannot happen concurrently with a remote write.  MPI_Win_sync then
     * synchronizes the local store with the window before the tail swap is
     * issued. */
    hdl->base[MCS_MTX_ELEM_DISP] = MPI_PROC_NULL;
    MPI_Win_sync(hdl->window);

    /* Atomic swap on the tail: tail := rank, prev := old tail.  The flush
     * completes the operation so that prev holds the fetched value. */
    MPI_Fetch_and_op(&rank, &prev, MPI_INT, hdl->tail_rank, MCS_MTX_TAIL_DISP,
                     MPI_REPLACE, hdl->window);
    MPI_Win_flush(hdl->tail_rank, hdl->window);

    /* If there was a previous tail, update their next pointer and wait for
     * notification.  Otherwise, the mutex was successfully acquired. */
    if (prev != MPI_PROC_NULL) {
        /* Wait for notification */
        MPI_Status status;

        /* Link behind the predecessor: write our rank into its "next" slot.
         * MPI_REPLACE accumulate is used so the write is atomic with respect
         * to the predecessor's MPI_NO_OP fetch of the same slot. */
        MPI_Accumulate(&rank, 1, MPI_INT, prev, MCS_MTX_ELEM_DISP, 1, MPI_INT, MPI_REPLACE,
                       hdl->window);
        MPI_Win_flush(prev, hdl->window);

        /* Block until the predecessor hands over the lock from its unlock. */
        debug_print("%2d: LOCK   - waiting for notification from %d\n", rank, prev);
        MPI_Recv(NULL, 0, MPI_BYTE, prev, MCS_MUTEX_TAG, hdl->comm, &status);
    }

    debug_print("%2d: LOCK   - lock acquired\n", rank);

    return MPI_SUCCESS;
}


/** Attempt to acquire a mutex without blocking.
  *
  * The calling rank is installed as the queue tail only if the tail is
  * currently empty (MPI_PROC_NULL).  Otherwise the queue is left untouched and
  * the call returns immediately.
  *
  * @param[in]  hdl     Handle to the mutex
  * @param[out] success 1 if the mutex was acquired, 0 otherwise
  * @return             MPI status
  */
int MCS_Mutex_trylock(MCS_Mutex hdl, int *success)
{
    int me;
    int size;
    int old_tail;
    int empty = MPI_PROC_NULL;
    int acquired;

    MPI_Comm_rank(hdl->comm, &me);
    MPI_Comm_size(hdl->comm, &size);

    /* Clear our own queue element first.  Nobody else can be writing it at
     * this point, so a plain local store followed by a window sync is enough. */
    hdl->base[MCS_MTX_ELEM_DISP] = MPI_PROC_NULL;
    MPI_Win_sync(hdl->window);

    /* tail := me, but only when tail == MPI_PROC_NULL; old_tail receives the
     * value that was stored before the operation. */
    MPI_Compare_and_swap(&me, &empty, &old_tail, MPI_INT, hdl->tail_rank,
                         MCS_MTX_TAIL_DISP, hdl->window);
    MPI_Win_flush(hdl->tail_rank, hdl->window);

    /* The swap took effect exactly when the queue was empty. */
    acquired = (old_tail == empty);
    *success = acquired;

    debug_print("%2d: TRYLOCK - %s\n", me, acquired ? "Success" : "Non-success");

    return MPI_SUCCESS;
}


/** Unlock a mutex.
  *
  * Hands the lock to this process's successor in the MCS queue, if there is
  * one.  The successor is read from this process's own "next" slot
  * (MCS_MTX_ELEM_DISP).
  *
  * If no successor has linked in yet, a compare-and-swap on the tail pointer
  * handles two cases:
  *  - we are still the tail: the queue is emptied;
  *  - a successor has already swapped itself in as tail: we spin until it
  *    writes its rank into our "next" slot.
  *
  * The successor is then woken with a zero-byte MCS_MUTEX_TAG message on
  * hdl->comm.  Expected to be called by the current lock holder.
  *
  * @param[in] hdl   Handle to the mutex
  * @return          MPI status
  */
int MCS_Mutex_unlock(MCS_Mutex hdl)
{
    int rank, nproc, next;

    MPI_Comm_rank(hdl->comm, &rank);
    MPI_Comm_size(hdl->comm, &nproc);

    MPI_Win_sync(hdl->window);

    /* Read my next pointer.  FOP is used since another process may write to
     * this location concurrent with this read. */
    MPI_Fetch_and_op(NULL, &next, MPI_INT, rank, MCS_MTX_ELEM_DISP, MPI_NO_OP, hdl->window);
    MPI_Win_flush(rank, hdl->window);

    if (next == MPI_PROC_NULL) {
        int tail;
        int nil = MPI_PROC_NULL;

        /* Check if we are at the tail of the lock queue.  If so, the CAS
         * replaces our rank with MPI_PROC_NULL and we're done.  If not, a
         * successor has enqueued but has not yet linked itself to us, and we
         * must wait for its next pointer before sending notification. */
        MPI_Compare_and_swap(&nil, &rank, &tail, MPI_INT, hdl->tail_rank,
                             MCS_MTX_TAIL_DISP, hdl->window);
        MPI_Win_flush(hdl->tail_rank, hdl->window);

        if (tail != rank) {
            debug_print("%2d: UNLOCK - waiting for next pointer (tail = %d)\n", rank, tail);
            assert(tail >= 0 && tail < nproc);

            /* Spin until the successor's MPI_Accumulate lands in our slot. */
            for (;;) {
                int flag;

                MPI_Fetch_and_op(NULL, &next, MPI_INT, rank, MCS_MTX_ELEM_DISP,
                                 MPI_NO_OP, hdl->window);

                MPI_Win_flush(rank, hdl->window);
                if (next != MPI_PROC_NULL)
                    break;

                /* The probe result is ignored; the call only gives the MPI
                 * library a chance to make progress while we spin.  Probe the
                 * mutex's own communicator so the mutex does not depend on
                 * MPI_COMM_WORLD being usable. */
                MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, hdl->comm, &flag, MPI_STATUS_IGNORE);
            }
        }
    }

    /* Notify the next waiting process (it is blocked in MPI_Recv in lock) */
    if (next != MPI_PROC_NULL) {
        debug_print("%2d: UNLOCK - notifying %d\n", rank, next);
        MPI_Send(NULL, 0, MPI_BYTE, next, MCS_MUTEX_TAG, hdl->comm);
    }

    debug_print("%2d: UNLOCK - lock released\n", rank);

    return MPI_SUCCESS;
}