1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
#include "worker.h"
#include "taskmessage.h"
#include "../main/commandline.h"
#include "../main/wsclean.h"
#include <aocommon/checkblas.h>
#include <aocommon/logger.h>
#include <unistd.h>
#include <cstddef>
#include <exception>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <mpi.h>
using aocommon::Logger;
using wsclean::CommandLine;
using wsclean::Settings;
using wsclean::TaskMessage;
using wsclean::Worker;
using wsclean::WSClean;
namespace {

/**
 * Stores the number of MPI nodes in @p settings and, if no channel-to-node
 * mapping has been set yet, creates a default round-robin mapping.
 *
 * When settings.masterDoesWork is false, node index 0 is excluded from the
 * default mapping and channels are spread over node indices 1..n_nodes-1.
 * An already filled settings.channelToNode is left untouched.
 *
 * @param settings Settings to update (nMpiNodes and possibly channelToNode).
 * @param n_nodes Total number of MPI nodes, including node 0.
 * @throws std::runtime_error If output channels have to be distributed but
 *         no node is left to receive them (e.g. a single process combined
 *         with masterDoesWork == false). Without this check the modulo below
 *         would divide by zero.
 */
void SetMpiSettings(Settings& settings, size_t n_nodes) {
  settings.nMpiNodes = n_nodes;
  if (!settings.channelToNode.empty()) return;

  // Index of the first node that takes part in the distribution: node 0 is
  // skipped when it is configured not to do work.
  const size_t first_node = settings.masterDoesWork ? 0 : 1;
  const size_t n_working_nodes =
      n_nodes > first_node ? n_nodes - first_node : 0;
  if (n_working_nodes == 0 && settings.channelsOut != 0) {
    throw std::runtime_error(
        "No MPI nodes are available to process the output channels: the "
        "master node is configured not to do work and there are no other "
        "nodes");
  }

  // Create a default channel-to-node mapping.
  settings.channelToNode.reserve(settings.channelsOut);
  for (size_t channel_index = 0; channel_index < settings.channelsOut;
       ++channel_index) {
    // Use a round-robin distribution, since later channels have higher
    // frequencies and gridding is therefore more expensive.
    const size_t node_index = first_node + channel_index % n_working_nodes;
    settings.channelToNode.push_back(node_index);
  }
}

}  // namespace
/**
 * Entry point of the MPI executable.
 *
 * Initializes MPI with MPI_THREAD_MULTIPLE, parses the command line on every
 * rank and then branches on the rank:
 *  - rank 0 executes CommandLine::Run() and afterwards sends a kFinish
 *    TaskMessage to every other rank;
 *  - all other ranks construct a Worker and call Worker::Run().
 *
 * @return 0 on success, -1 when a std::exception was caught.
 *
 * NOTE(review): when rank 0 throws before the kFinish loop, no finish
 * message is sent. Whether the other ranks then keep waiting inside
 * Worker::Run() depends on code that is not visible here -- please confirm.
 */
int main(int argc, char* argv[]) {
  int provided = 0;
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
  if (provided != MPI_THREAD_MULTIPLE) {
    std::cout << "This MPI implementation does not support multiple threads.\n";
    // Flush explicitly, so the message is not lost if MPI_Abort terminates
    // the process without flushing the stream buffers.
    std::cout.flush();
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  int result = 0;
  WSClean wsclean;

  int world_size = 0;
  int rank = 0;
  MPI_Comm_size(MPI_COMM_WORLD, &world_size);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  const bool is_main = (rank == 0);

  char hostname[256];
  // On failure the buffer content cannot be relied upon: print an empty name.
  if (gethostname(hostname, sizeof(hostname)) != 0) hostname[0] = '\0';
  // gethostname() does not guarantee null termination when the name is
  // truncated, so terminate the buffer explicitly.
  hostname[sizeof(hostname) - 1] = '\0';
  std::cout << "Node " << rank << ", PID " << getpid() << " on " << hostname
            << "\n";

  // During parsing of parameters, we don't want all processes to report
  // bad parameters. This variable keeps track of whether only a short
  // message (true) or the full exception text (false) should be reported.
  bool shortException = false;
  try {
    // While parsing, only rank 0 reports full errors.
    shortException = !is_main;
    const bool parseResult = CommandLine::ParseWithoutValidation(
        wsclean, argc, const_cast<const char**>(argv), !is_main);
    // After parsing, other ranks report full errors only in verbose mode.
    shortException = !is_main && !Logger::IsVerbose();
    check_openblas_multithreading();
    if (parseResult) {
      Settings& settings = wsclean.GetSettings();
      SetMpiSettings(settings, world_size);
      CommandLine::Validate(wsclean);
      // From here on, every rank reports exceptions in full.
      shortException = false;
      if (is_main) {
        CommandLine::Run(wsclean);
        // Send a finish message to every other rank.
        const TaskMessage message(TaskMessage::Type::kFinish, 0);
        aocommon::SerialOStream msgStream;
        message.Serialize(msgStream);
        for (int i = 1; i != world_size; ++i) {
          MPI_Send(msgStream.data(), msgStream.size(), MPI_BYTE, i, 0,
                   MPI_COMM_WORLD);
        }
      } else {
        Worker worker(settings);
        worker.Run();
      }
    }
    Logger::Error << "Process " << rank << " finished.\n";
  } catch (const std::exception& e) {
    if (shortException) {
      Logger::Error << "Process " << rank
                    << " stopped because of an exception.\n";
    } else {
      Logger::Error << "+ + + + + + + + + + + + + + + + + + +\n"
                    << "+ An exception occured in process " << rank << ":\n";
      // Prefix every line of a (possibly multi-line) exception message.
      std::istringstream iss(e.what());
      for (std::string line; std::getline(iss, line);) {
        Logger::Error << "+ >>> " << line << "\n";
      }
      Logger::Error << "+ + + + + + + + + + + + + + + + + + +\n";
    }
    result = -1;
  }
  MPI_Finalize();
  return result;
}
|