/*
* Copyright (C) by Argonne National Laboratory
* See COPYRIGHT in top-level directory
*/
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <strings.h>
#include <mpi.h>
#include "mcs_mutex.h"
/* TODO: Make these mutex operations no-ops for sequential runs */
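/* Each mutex is backed by one MPI window exposing two integers per process
 * (the displacement constants are defined in mcs_mutex.h):
 *   - base[MCS_MTX_ELEM_DISP]: this process's queue element, i.e. the rank of
 *     the next waiter behind it (MPI_PROC_NULL if none).
 *   - base[MCS_MTX_TAIL_DISP] on tail_rank: the global tail pointer, i.e. the
 *     rank of the last process in the queue (MPI_PROC_NULL when unheld).
 */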
/** Create an MCS mutex. Collective on comm.
 *
 * @param[in]  tail_rank rank of the process in comm that holds the tail
 *                       pointer
 * @param[in]  comm      communicator containing all processes that will use
 *                       the mutex
 * @param[out] hdl_out   handle to the new mutex
 * @return MPI status
 */
int MCS_Mutex_create(int tail_rank, MPI_Comm comm, MCS_Mutex * hdl_out)
{
int rank, nproc;
MCS_Mutex hdl;
hdl = malloc(sizeof(struct mcs_mutex_s));
assert(hdl != NULL);
MPI_Comm_dup(comm, &hdl->comm);
MPI_Comm_rank(hdl->comm, &rank);
MPI_Comm_size(hdl->comm, &nproc);
hdl->tail_rank = tail_rank;
#ifdef USE_WIN_SHARED
MPI_Win_allocate_shared(2 * sizeof(int), sizeof(int), MPI_INFO_NULL,
hdl->comm, &hdl->base, &hdl->window);
#else
#ifdef USE_WIN_ALLOC_SHM
MPI_Info_create(&hdl->win_info);
MPI_Info_set(hdl->win_info, "alloc_shm", "true");
#else
MPI_Info_create(&hdl->win_info);
MPI_Info_set(hdl->win_info, "alloc_shm", "false");
#endif
MPI_Win_allocate(2 * sizeof(int), sizeof(int), hdl->win_info, hdl->comm,
&hdl->base, &hdl->window);
#endif
MPI_Win_lock_all(0, hdl->window);
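    /* Initially nobody holds the lock: both the local queue element and the
     * tail pointer start out as MPI_PROC_NULL. */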
hdl->base[0] = MPI_PROC_NULL;
hdl->base[1] = MPI_PROC_NULL;
MPI_Win_sync(hdl->window);
MPI_Barrier(hdl->comm);
*hdl_out = hdl;
return MPI_SUCCESS;
}
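
/* Typical usage (a sketch, not part of this file): the tail pointer lives on
 * rank 0 of MPI_COMM_WORLD, and the hypothetical critical_section() stands in
 * for whatever must run on one process at a time.
 *
 *   MCS_Mutex mtx;
 *   MCS_Mutex_create(0, MPI_COMM_WORLD, &mtx);
 *
 *   MCS_Mutex_lock(mtx);
 *   critical_section();
 *   MCS_Mutex_unlock(mtx);
 *
 *   MCS_Mutex_free(&mtx);
 */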
/** Free an MCS mutex. Collective on the ranks in the communicator used at the
 * time of creation.
 *
 * @param[inout] hdl_ptr handle to the mutex that will be freed; set to NULL
 *                       on return
 * @return MPI status
 */
int MCS_Mutex_free(MCS_Mutex * hdl_ptr)
{
MCS_Mutex hdl = *hdl_ptr;
MPI_Win_unlock_all(hdl->window);
MPI_Win_free(&hdl->window);
MPI_Comm_free(&hdl->comm);
#ifndef USE_WIN_SHARED
MPI_Info_free(&hdl->win_info);
#endif
free(hdl);
    *hdl_ptr = NULL;
return MPI_SUCCESS;
}
/** Lock a mutex.
*
* @param[in] hdl Handle to the mutex
* @return MPI status
*/
int MCS_Mutex_lock(MCS_Mutex hdl)
{
int rank, nproc;
int prev;
MPI_Comm_rank(hdl->comm, &rank);
MPI_Comm_size(hdl->comm, &nproc);
/* This store is safe, since it cannot happen concurrently with a remote
* write */
hdl->base[MCS_MTX_ELEM_DISP] = MPI_PROC_NULL;
MPI_Win_sync(hdl->window);
MPI_Fetch_and_op(&rank, &prev, MPI_INT, hdl->tail_rank, MCS_MTX_TAIL_DISP,
MPI_REPLACE, hdl->window);
MPI_Win_flush(hdl->tail_rank, hdl->window);
    /* If there was a previous tail, write our rank into its next pointer and
     * wait for notification. Otherwise, the mutex was successfully acquired. */
if (prev != MPI_PROC_NULL) {
/* Wait for notification */
MPI_Status status;
MPI_Accumulate(&rank, 1, MPI_INT, prev, MCS_MTX_ELEM_DISP, 1, MPI_INT, MPI_REPLACE,
hdl->window);
MPI_Win_flush(prev, hdl->window);
debug_print("%2d: LOCK - waiting for notification from %d\n", rank, prev);
MPI_Recv(NULL, 0, MPI_BYTE, prev, MCS_MUTEX_TAG, hdl->comm, &status);
}
debug_print("%2d: LOCK - lock acquired\n", rank);
return MPI_SUCCESS;
}
/** Attempt to acquire a mutex.
*
* @param[in] hdl Handle to the mutex
 * @param[out] success set to 1 if the mutex was acquired, 0 otherwise
* @return MPI status
*/
int MCS_Mutex_trylock(MCS_Mutex hdl, int *success)
{
int rank, nproc;
int tail, nil = MPI_PROC_NULL;
MPI_Comm_rank(hdl->comm, &rank);
MPI_Comm_size(hdl->comm, &nproc);
/* This store is safe, since it cannot happen concurrently with a remote
* write */
hdl->base[MCS_MTX_ELEM_DISP] = MPI_PROC_NULL;
MPI_Win_sync(hdl->window);
/* Check if the lock is available and claim it if it is. */
MPI_Compare_and_swap(&rank, &nil, &tail, MPI_INT, hdl->tail_rank,
MCS_MTX_TAIL_DISP, hdl->window);
MPI_Win_flush(hdl->tail_rank, hdl->window);
/* If the old tail was MPI_PROC_NULL, we have claimed the mutex */
*success = (tail == nil);
debug_print("%2d: TRYLOCK - %s\n", rank, (*success) ? "Success" : "Non-success");
return MPI_SUCCESS;
}
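
/* Example trylock loop (a sketch; do_other_work() is hypothetical): instead of
 * blocking in MCS_Mutex_lock, a process can keep doing useful work until the
 * mutex becomes available.
 *
 *   int acquired = 0;
 *   while (!acquired) {
 *       MCS_Mutex_trylock(mtx, &acquired);
 *       if (!acquired)
 *           do_other_work();
 *   }
 *   ... critical section ...
 *   MCS_Mutex_unlock(mtx);
 */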
/** Unlock a mutex.
*
* @param[in] hdl Handle to the mutex
* @return MPI status
*/
int MCS_Mutex_unlock(MCS_Mutex hdl)
{
int rank, nproc, next;
MPI_Comm_rank(hdl->comm, &rank);
MPI_Comm_size(hdl->comm, &nproc);
MPI_Win_sync(hdl->window);
/* Read my next pointer. FOP is used since another process may write to
* this location concurrent with this read. */
MPI_Fetch_and_op(NULL, &next, MPI_INT, rank, MCS_MTX_ELEM_DISP, MPI_NO_OP, hdl->window);
MPI_Win_flush(rank, hdl->window);
if (next == MPI_PROC_NULL) {
int tail;
int nil = MPI_PROC_NULL;
        /* Check if we are at the tail of the lock queue. If so, we're done.
         * If not, we need to send a notification. */
MPI_Compare_and_swap(&nil, &rank, &tail, MPI_INT, hdl->tail_rank,
MCS_MTX_TAIL_DISP, hdl->window);
MPI_Win_flush(hdl->tail_rank, hdl->window);
if (tail != rank) {
debug_print("%2d: UNLOCK - waiting for next pointer (tail = %d)\n", rank, tail);
assert(tail >= 0 && tail < nproc);
for (;;) {
int flag;
MPI_Fetch_and_op(NULL, &next, MPI_INT, rank, MCS_MTX_ELEM_DISP,
MPI_NO_OP, hdl->window);
MPI_Win_flush(rank, hdl->window);
if (next != MPI_PROC_NULL)
break;
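                /* The probe result is unused; calling MPI_Iprobe here simply
                 * drives MPI progress while we spin waiting for our successor
                 * to set our next pointer. */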
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
}
}
}
/* Notify the next waiting process */
if (next != MPI_PROC_NULL) {
debug_print("%2d: UNLOCK - notifying %d\n", rank, next);
MPI_Send(NULL, 0, MPI_BYTE, next, MCS_MUTEX_TAG, hdl->comm);
}
debug_print("%2d: UNLOCK - lock released\n", rank);
return MPI_SUCCESS;
}