#include "MPI_Wrappers.h"
#include "Utility.h"
using ATC_Utility::to_string;
#include "ATC_Error.h"
using ATC::ATC_Error;
using std::cout;
using std::string;
#ifdef ISOLATE_FE
#include "Matrix.h"
#include <algorithm>
using ATC_Matrix::SparseMatrix;
using std::copy;
#endif
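// Thin wrappers around common MPI calls: each checks the MPI return code and
// throws an ATC_Error on failure so callers do not have to test it themselves.
//
// Typical usage (a sketch only; assumes a valid communicator `comm` and one
// local value per rank):
//   double localMax = /* local value */ 0.0, globalMax = 0.0;
//   MPI_Wrappers::allmax(comm, &localMax, &globalMax, 1);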
namespace MPI_Wrappers {
int rank(MPI_Comm comm)
{
int rank;
MPI_Comm_rank(comm, &rank);
return rank;
}
bool rank_zero(MPI_Comm comm) { return rank(comm)==0;}
int size(MPI_Comm comm)
{
int size;
MPI_Comm_size(comm, &size);
return size;
}
bool serial(MPI_Comm comm) { return size(comm) == 1; } // single-rank communicator
void broadcast(MPI_Comm comm, double *buf, int count)
{
int error = MPI_Bcast(buf, count, MPI_DOUBLE, 0, comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in broadcast "+to_string(error));
}
void int_broadcast(MPI_Comm comm, int *buf, int count)
{
int error = MPI_Bcast(buf, count, MPI_INT, 0, comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in broadcast "+to_string(error));
}
void allsum(MPI_Comm comm, void *send_buf, double *rec_buf, int count)
{
int error = MPI_Allreduce(send_buf, rec_buf, count, MPI_DOUBLE, MPI_SUM,
comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in allsum "+to_string(error));
}
void int_allsum(MPI_Comm comm, void *send_buf, int *rec_buf, int count)
{
int error = MPI_Allreduce(send_buf, rec_buf, count, MPI_INT, MPI_SUM,
comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in int_allsum "+to_string(error));
}
void int_scansum(MPI_Comm comm, int *send_buf, int *rec_buf, int count)
{
int error = MPI_Scan(send_buf, rec_buf, count, MPI_INT, MPI_SUM,
comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in int_scansum "+to_string(error));
}
void allmax(MPI_Comm comm, double *send_buf, double *rec_buf, int count)
{
int error = MPI_Allreduce(send_buf, rec_buf, count, MPI_DOUBLE, MPI_MAX,
comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in allmax "+to_string(error));
}
void int_allmax(MPI_Comm comm, int *send_buf, int *rec_buf, int count)
{
int error = MPI_Allreduce(send_buf, rec_buf, count, MPI_INT, MPI_MAX,
comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in int_allmax "+to_string(error));
}
void allmin(MPI_Comm comm, double *send_buf, double *rec_buf, int count)
{
int error = MPI_Allreduce(send_buf, rec_buf, count, MPI_DOUBLE, MPI_MIN,
comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in allmax "+to_string(error));
}
void int_allmin(MPI_Comm comm, int *send_buf, int *rec_buf, int count)
{
int error = MPI_Allreduce(send_buf, rec_buf, count, MPI_INT, MPI_MIN,
comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in int_allmax "+to_string(error));
}
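// rank_min: element-wise MINLOC reduction; fills rec_buf with the minima and
// returns the rank that owns the minimum of the first element.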
int rank_min(MPI_Comm comm, double *send_buf, double *rec_buf, int count)
{
int myRank = rank(comm);
DOUBLE_RANK *in = new DOUBLE_RANK[count];
DOUBLE_RANK *out = new DOUBLE_RANK[count];
for (int i = 0; i < count; i++) {
in[i].val = send_buf[i];
in[i].rank = myRank;
}
int error = MPI_Allreduce(in, out, count, MPI_DOUBLE_INT, MPI_MINLOC,
comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in rank_min "+to_string(error));
for (int i = 0; i < count; i++) {
rec_buf[i] = out[i].val;
}
int minRank = out[0].rank; // read before the buffer is freed
delete[] in;
delete[] out;
return minRank;
}
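// int_recv/recv (called on rank 0) pair with int_send/send below (called on the
// sending rank iproc): rank 0 posts the receive, then sends a zero-length
// "ready" message so the sender's MPI_Rsend is legal (a ready-send requires the
// matching receive to already be posted).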
void int_recv(MPI_Comm comm, int *recv_buf, int max_size, int iproc)
{
MPI_Status status;
MPI_Request request;
int tmp, error, recv_size;
error = MPI_Irecv(recv_buf,max_size,MPI_INT,iproc,0,comm,&request);
error = error && MPI_Send(&tmp,0,MPI_INT,iproc,0,comm);
error = error && MPI_Wait(&request,&status);
error = error && MPI_Get_count(&status,MPI_DOUBLE,&recv_size);
if (error != MPI_SUCCESS) throw ATC_Error("error in int_recv "+to_string(error));
}
void recv(MPI_Comm comm, double *recv_buf, int max_size,int iproc)
{
MPI_Status status;
MPI_Request request;
int tmp, error, recv_size;
error = MPI_Irecv(recv_buf,max_size,MPI_DOUBLE,iproc,0,comm,&request);
error = error && MPI_Send(&tmp,0,MPI_INT,iproc,0,comm);
error = error && MPI_Wait(&request,&status);
error = error && MPI_Get_count(&status,MPI_DOUBLE,&recv_size);
if (error != MPI_SUCCESS) throw ATC_Error("error in recv "+to_string(error));
}
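// int_send/send: wait for the zero-length "ready" message from rank 0, then
// ready-send the data to rank 0 (see int_recv/recv above).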
void int_send(MPI_Comm comm, int *send_buf,int send_size)
{
MPI_Status status;
int tmp, error;
error = MPI_Recv(&tmp,0,MPI_INT,0,0,comm,&status);
error = error && MPI_Rsend(send_buf,send_size,MPI_INT,0,0,comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in int_send "+to_string(error));
}
void send(MPI_Comm comm, double *send_buf,int send_size)
{
MPI_Status status;
int tmp, error;
error = MPI_Recv(&tmp,0,MPI_INT,0,0,comm,&status);
error = error && MPI_Rsend(send_buf,send_size,MPI_DOUBLE,0,0,comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in int_send "+to_string(error));
}
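// int_scatter: rank 0 scatters one int per rank from send_buf into each rank's rec_buf.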
void int_scatter(MPI_Comm comm, int *send_buf, int *rec_buf, int count)
{
int error;
int numprocs = size(comm);
int *sizes = new int[numprocs];
int *displacements = new int[numprocs];
for (int i = 0; i < numprocs; ++i) {
sizes[i] = 1;
displacements[i] = i;
}
error = MPI_Scatterv(send_buf, sizes, displacements, MPI_INT, rec_buf, count, MPI_INT, 0, comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in int_scatter "+to_string(error));
delete[] sizes;
delete[] displacements;
}
void allgatherv(MPI_Comm comm, double *send_buf, int send_count,
double *rec_buf, int *rec_counts, int *displacements)
{
int error = MPI_Allgatherv(send_buf, send_count, MPI_DOUBLE,
rec_buf, rec_counts, displacements, MPI_DOUBLE,
comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in allgatherv "+to_string(error));
}
void gather(MPI_Comm comm, double send, double* recv)
{
int send_count = 1;
int recv_count = 1;
int root = 0;
int error = MPI_Gather(&send, send_count, MPI_DOUBLE,
recv, recv_count, MPI_DOUBLE, root, comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in allgatherv "+to_string(error));
}
void int_allgather(MPI_Comm comm, int send, int* recv)
{
int send_count = 1;
int recv_count = 1;
int error = MPI_Allgather(&send, send_count, MPI_INT,
recv, recv_count, MPI_INT, comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in allgatherv "+to_string(error));
}
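// logical_or: element-wise logical OR reduction over int flags on all ranks.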
void logical_or(MPI_Comm comm, void *send_buf, int *rec_buf, int count)
{
int error = MPI_Allreduce(send_buf, rec_buf, count, MPI_INT, MPI_LOR,
comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in logical_or "+to_string(error));
}
void barrier(MPI_Comm comm)
{
int error = MPI_Barrier(comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in barrier "+to_string(error));
}
void stop(MPI_Comm comm, string msg)
{
int error = MPI_Barrier(comm);
if (error != MPI_SUCCESS) throw ATC_Error("error in barrier "+to_string(error));
throw ATC_Error("...stopping "+msg);
}
#ifdef ISOLATE_FE
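// sparse_allsum: sum a distributed SparseMatrix across all ranks. Each rank
// gathers the CRS row-pointer, column, and value arrays from every other rank
// and accumulates them into its local copy of toShare.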
void sparse_allsum(MPI_Comm comm, SparseMatrix<double> &toShare)
{
toShare.compress();
// initialize MPI information
int nProcs = size(comm);
int myRank = rank(comm);
int error;
// get numbers of rows, columns, rowsCRS, and
// sizes (number of nonzero elements in matrix)
SparseMatInfo *recInfo = new SparseMatInfo[nProcs];
SparseMatInfo myInfo;
myInfo.rows = toShare.nRows();
myInfo.cols = toShare.nCols();
myInfo.rowsCRS = toShare.nRowsCRS();
myInfo.size = toShare.size();
error = MPI_Allgather(&myInfo, 4, MPI_INT,
recInfo, 4, MPI_INT, lammps_->world);
if (error != MPI_SUCCESS) throw ATC_Error("error in sparse_allsum_numrows "+to_string(error));
// row sendcounts are rowsCRS+1 because the CRS row-pointer array has one extra entry
int *rowCounts = new int[nProcs];
int *sizeCounts = new int[nProcs];
// set up total size of receive buffers for Allgatherv calls
int totalRowsCRS = 0;
int totalSize = 0;
// set up array of displacements for Allgatherv calls
int *rowOffsets = new int[nProcs];
rowOffsets[0] = 0;
int *sizeOffsets = new int[nProcs];
sizeOffsets[0] = 0;
for (int i = 0; i < nProcs; i++) {
// find the total number of entries to share in the mpi calls below
rowCounts[i] = recInfo[i].rowsCRS + 1;
sizeCounts[i] = recInfo[i].size;
totalRowsCRS += rowCounts[i];
totalSize += recInfo[i].size;
// these already have their 0th slot filled in
if (i == 0) continue;
rowOffsets[i] = rowOffsets[i-1] + rowCounts[i-1];
sizeOffsets[i] = sizeOffsets[i-1] + sizeCounts[i-1];
}
// get actual rows
INDEX *rec_ia = new INDEX[totalRowsCRS];
if (toShare.size() == 0) {
// nothing local to contribute: participate with a zero-length send
int dummy = 0;
error = MPI_Allgatherv(&dummy, 0, MPI_INT,
rec_ia, rowCounts, rowOffsets, MPI_INT, comm);
}
else
error = MPI_Allgatherv(toShare.rows(), rowCounts[myRank], MPI_INT,
rec_ia, rowCounts, rowOffsets, MPI_INT, comm);
if (error != MPI_SUCCESS)
throw ATC_Error("error in sparse_allsum_rowarray "+to_string(error));
// get actual cols
INDEX *rec_ja = new INDEX[totalSize];
error = MPI_Allgatherv(toShare.cols(), sizeCounts[myRank], MPI_INT,
rec_ja, sizeCounts, sizeOffsets, MPI_INT, comm);
if (error != MPI_SUCCESS)
throw ATC_Error("error in sparse_allsum_colarray "+to_string(error));
// get the array of values
double *rec_vals = new double[totalSize];
error = MPI_Allgatherv(toShare.ptr(), sizeCounts[myRank], MPI_DOUBLE,
rec_vals, sizeCounts, sizeOffsets, MPI_DOUBLE, comm);
if (error != MPI_SUCCESS)
throw ATC_Error("error in sparse_allsum_valarray "+to_string(error));
INDEX *rec_ia_proc;
INDEX *rec_ja_proc;
double *rec_vals_proc;
for (int i = 0; i < nProcs; i++) {
if (myRank != i) {
// deallocated when tempMat is deleted since it wraps them
rec_ia_proc = new INDEX[rowCounts[i]];
rec_ja_proc = new INDEX[sizeCounts[i]];
rec_vals_proc = new double[sizeCounts[i]];
// copy the data passed with MPI into the new spots
copy(rec_ia + rowOffsets[i],
rec_ia + rowOffsets[i] + rowCounts[i],
rec_ia_proc);
copy(rec_ja + sizeOffsets[i],
rec_ja + sizeOffsets[i] + sizeCounts[i],
rec_ja_proc);
copy(rec_vals + sizeOffsets[i],
rec_vals + sizeOffsets[i] + sizeCounts[i],
rec_vals_proc);
// Note: tempMat must be declared as a named local (and copy-initialized) here
// to avoid segfaults; the remaining failures occur much later and in fewer
// benchmarks. The root cause is not yet understood.
SparseMatrix<double> tempMat =
SparseMatrix<double>(rec_ia_proc, rec_ja_proc, rec_vals_proc,
recInfo[i].size, recInfo[i].rows,
recInfo[i].cols, recInfo[i].rowsCRS);
toShare += tempMat;
}
}
delete[] recInfo;
delete[] rec_ia;
delete[] rec_ja;
delete[] rec_vals;
delete[] rowCounts;
delete[] sizeCounts;
delete[] rowOffsets;
delete[] sizeOffsets;
}
#endif
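// print_msg / print_msg_once: rank-prefixed status output; the *_once variant
// prints only on rank 0.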
void print_msg(MPI_Comm comm, string msg)
{
if (serial(comm)) { cout << " ATC: " << msg << "\n"; }
else { cout << " ATC: P" << rank(comm) << ", " << msg << "\n"; }
}
void print_msg_once(MPI_Comm comm, string msg, bool prefix, bool endline)
{
if (rank_zero(comm)) {
if (prefix) cout << " ATC: ";
cout << msg;
if (endline) cout << "\n";
}
}
}