// MPIHelper.cpp — MPI process/rank helpers for IQ-TREE (guarded by _IQTREE_MPI)
//
// Created by tung on 6/18/15.
//
#include "MPIHelper.h"
#include "timeutil.h"
/**
* Initialize the single getInstance of MPIHelper
*/
/**
 * Access the lazily-constructed singleton MPIHelper.
 * In builds without _IQTREE_MPI the instance is re-pinned on every
 * access to a single-process view: rank 0, world size 1.
 */
MPIHelper& MPIHelper::getInstance() {
    static MPIHelper instance;
#ifndef _IQTREE_MPI
    // non-MPI build: behave as the lone master process
    instance.setProcessID(0);
    instance.setNumProcesses(1);
#endif
    return instance;
}
/**
 * Initialize the MPI runtime and record this process's rank and the
 * communicator size; reset the tree/NNI exchange counters.
 * A no-op when compiled without _IQTREE_MPI.
 *
 * @param argc command-line argument count (forwarded to MPI_Init)
 * @param argv command-line argument vector (forwarded to MPI_Init)
 */
void MPIHelper::init(int argc, char *argv[]) {
#ifdef _IQTREE_MPI
    if (MPI_Init(&argc, &argv) != MPI_SUCCESS) {
        outError("MPI initialization failed!");
    }
    int worldSize = 0;
    int rank = 0;
    MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    setNumProcesses(worldSize);
    setProcessID(rank);
    // start all exchange statistics from zero
    setNumTreeReceived(0);
    setNumTreeSent(0);
    setNumNNISearch(0);
#endif
}
/**
 * Shut down the MPI runtime. No-op in non-MPI builds.
 */
void MPIHelper::finalize() {
#ifdef _IQTREE_MPI
    MPI_Finalize();
#endif
}
/**
 * Broadcast the master's random seed to all processes so every rank
 * starts from the same seed. No-op when compiled without _IQTREE_MPI.
 *
 * Fix: the buffer is `unsigned int`, so the broadcast datatype must be
 * MPI_UNSIGNED (the original used MPI_INT, a signed/unsigned datatype
 * mismatch). The seed is also zero-initialized so workers never carry
 * an indeterminate value into the call.
 */
void MPIHelper::syncRandomSeed() {
#ifdef _IQTREE_MPI
    unsigned int rndSeed = 0;
    if (MPIHelper::getInstance().isMaster()) {
        rndSeed = Params::getInstance().ran_seed;
    }
    // Broadcast random seed; datatype matches the unsigned buffer
    MPI_Bcast(&rndSeed, 1, MPI_UNSIGNED, PROC_MASTER, MPI_COMM_WORLD);
    if (MPIHelper::getInstance().isWorker()) {
        Params::getInstance().ran_seed = rndSeed;
    }
#endif
}
/**
 * Count how many MPI processes run on the same host as this process,
 * by all-gathering MPI processor names and comparing them to our own.
 * Prints a note when more than one process shares the host.
 *
 * Fix: removed the unused local `pID`; hoisted the repeated
 * getNumProcesses() calls into one local.
 *
 * @return number of co-located processes (1 in non-MPI builds)
 */
int MPIHelper::countSameHost() {
#ifdef _IQTREE_MPI
    // detect if processes are in the same host
    char host_name[MPI_MAX_PROCESSOR_NAME];
    int resultlen;
    MPI_Get_processor_name(host_name, &resultlen);
    int num_procs = MPIHelper::getInstance().getNumProcesses();
    char *host_names = new char[MPI_MAX_PROCESSOR_NAME * num_procs];
    // send resultlen+1 bytes so the terminating '\0' travels too
    MPI_Allgather(host_name, resultlen+1, MPI_CHAR, host_names, MPI_MAX_PROCESSOR_NAME, MPI_CHAR,
                  MPI_COMM_WORLD);
    int count = 0;
    for (int i = 0; i < num_procs; i++)
        if (strcmp(&host_names[i*MPI_MAX_PROCESSOR_NAME], host_name) == 0)
            count++;
    delete [] host_names;
    if (count > 1)
        cout << "NOTE: " << count << " processes are running on the same host " << host_name << endl;
    return count;
#else
    return 1;
#endif
}
/**
 * Non-blocking probe: is any MPI message waiting for this process?
 * Always false for single-process runs and non-MPI builds.
 */
bool MPIHelper::gotMessage() {
    // a lone process can never receive from a peer
    if (getNumProcesses() == 1)
        return false;
#ifdef _IQTREE_MPI
    MPI_Status status;
    int pending = 0;
    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &pending, &status);
    return pending != 0;
#else
    return false;
#endif
}
#ifdef _IQTREE_MPI
/**
 * Send a string, including its terminating '\0', to process `dest`
 * under message tag `tag`.
 */
void MPIHelper::sendString(string &str, int dest, int tag) {
    // length+1 so the receiver also gets the trailing '\0'
    MPI_Send((char*)str.c_str(), str.length()+1, MPI_CHAR, dest, tag, MPI_COMM_WORLD);
}
/**
 * Serialize a checkpoint and ship it to process `dest` under TREE_TAG.
 */
void MPIHelper::sendCheckpoint(Checkpoint *ckp, int dest) {
    stringstream out;
    ckp->dump(out);
    string payload = out.str();
    sendString(payload, dest, TREE_TAG);
}
/**
 * Receive a NUL-terminated string from process `src` (or MPI_ANY_SOURCE)
 * with tag `tag` (or MPI_ANY_TAG). The message is probed first so the
 * receive buffer can be sized exactly.
 *
 * @param[out] str filled with the received characters
 * @return the rank of the actual sender
 */
int MPIHelper::recvString(string &str, int src, int tag) {
    MPI_Status status;
    // learn the incoming message size before allocating
    MPI_Probe(src, tag, MPI_COMM_WORLD, &status);
    int numBytes;
    MPI_Get_count(&status, MPI_CHAR, &numBytes);
    char *buffer = new char[numBytes];
    MPI_Recv(buffer, numBytes, MPI_CHAR, status.MPI_SOURCE, status.MPI_TAG, MPI_COMM_WORLD, &status);
    str = buffer;   // sender included the trailing '\0' (see sendString)
    delete [] buffer;
    return status.MPI_SOURCE;
}
/**
 * Receive a serialized checkpoint (TREE_TAG) from `src` and load it
 * into `ckp`.
 * @return the rank of the sending process
 */
int MPIHelper::recvCheckpoint(Checkpoint *ckp, int src) {
    string serialized;
    int sender = recvString(serialized, src, TREE_TAG);
    stringstream in(serialized);
    ckp->load(in);
    return sender;
}
/**
 * Broadcast the master's checkpoint to every worker. The master first
 * broadcasts the serialized size (including the trailing '\0'), then
 * the bytes themselves; each worker deserializes into `ckp`.
 */
void MPIHelper::broadcastCheckpoint(Checkpoint *ckp) {
    stringstream ss;
    string serialized;
    int numBytes = 0;
    if (isMaster()) {
        ckp->dump(ss);
        serialized = ss.str();
        numBytes = serialized.length() + 1;   // count the '\0' too
    }
    // step 1: tell all workers how large the payload is
    MPI_Bcast(&numBytes, 1, MPI_INT, PROC_MASTER, MPI_COMM_WORLD);
    char *buffer = new char[numBytes];
    if (isMaster())
        memcpy(buffer, serialized.c_str(), numBytes);
    // step 2: ship the serialized checkpoint itself
    MPI_Bcast(buffer, numBytes, MPI_CHAR, PROC_MASTER, MPI_COMM_WORLD);
    if (isWorker()) {
        ss.clear();
        ss.str(buffer);
        ckp->load(ss);
    }
    delete [] buffer;
}
/**
 * Gather the serialized checkpoints of all processes at the MASTER.
 * Every rank dumps its own checkpoint; the master first collects the
 * per-rank byte counts (MPI_Gather), then the concatenated contents
 * (MPI_Gatherv), and finally loads the combined stream into `ckp`.
 * Only the master's `ckp` is modified; workers just contribute data.
 *
 * NOTE(review): assumes the concatenation of per-rank dumps is itself
 * parseable by Checkpoint::load — confirm against Checkpoint's format.
 */
void MPIHelper::gatherCheckpoint(Checkpoint *ckp) {
    stringstream ss;
    ckp->dump(ss);
    string str = ss.str();
    // counts exclude any '\0'; raw characters only
    int msgCount = str.length();
    // first send the counts to MASTER
    int *msgCounts = NULL, *displ = NULL;
    char *recvBuffer = NULL;
    int totalCount = 0;
    if (isMaster()) {
        // receive-side arrays exist only on the master
        msgCounts = new int[getNumProcesses()];
        displ = new int[getNumProcesses()];
    }
    MPI_Gather(&msgCount, 1, MPI_INT, msgCounts, 1, MPI_INT, PROC_MASTER, MPI_COMM_WORLD);
    // now real contents to MASTER
    if (isMaster()) {
        // displ[i] = byte offset where rank i's chunk starts
        for (int i = 0; i < getNumProcesses(); i++) {
            displ[i] = totalCount;
            totalCount += msgCounts[i];
        }
        // +1 byte and memset guarantee a terminating '\0' so the
        // ss.str(recvBuffer) below stops at the end of the data
        recvBuffer = new char[totalCount+1];
        memset(recvBuffer, 0, totalCount+1);
    }
    char *buf = (char*)str.c_str();
    MPI_Gatherv(buf, msgCount, MPI_CHAR, recvBuffer, msgCounts, displ, MPI_CHAR, PROC_MASTER, MPI_COMM_WORLD);
    if (isMaster()) {
        // now decode the buffer
        ss.clear();
        ss.str(recvBuffer);
        ckp->load(ss);
        delete [] recvBuffer;
        delete [] displ;
        delete [] msgCounts;
    }
}
#endif
// Destructor: nothing to release at present; message cleanup
// (cleanUpMessages) is intentionally left disabled.
MPIHelper::~MPIHelper() {
//    cleanUpMessages();
}