#include "worker.h"
#include <mpi.h>
#include <aocommon/io/serialistream.h>
#include <aocommon/io/serialostream.h>
#include <aocommon/logger.h>
#include "../scheduling/griddingtask.h"
#include "mpibig.h"
#include "taskmessage.h"
namespace wsclean {
namespace {
constexpr int kMainNode = 0;
constexpr int kTag = 0;
} // namespace
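
// Runs the worker's receive loop: handles task messages from the main node
// until a finish message is received.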
void Worker::Run() {
  TaskMessage message;
  const size_t maximum_message_size =
      scheduler_.GetSettings().maxMpiMessageSize;
  do {
    MPI_Status status;
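
    // Every message starts with a fixed-size TaskMessage header.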
    aocommon::UVector<unsigned char> buffer(TaskMessage::kSerializedSize);
    MPI_Recv(buffer.data(), TaskMessage::kSerializedSize, MPI_BYTE, kMainNode,
             kTag, MPI_COMM_WORLD, &status);
    aocommon::SerialIStream stream(std::move(buffer));
    message.Unserialize(stream);

    switch (message.type) {
      case TaskMessage::Type::kStart:
        scheduler_.Start(message.n_writer_groups);
        break;
      case TaskMessage::Type::kFinish:
        // Nothing to do here: the do..while condition below exits the loop.
        break;
      case TaskMessage::Type::kGriddingRequest: {
        // The header only carries the body size; the task body itself arrives
        // in a separate (possibly multi-part) message received via
        // MPI_Recv_Big.
        buffer.resize(message.body_size);
        MPI_Recv_Big(buffer.data(), message.body_size, kMainNode, kTag,
                     MPI_COMM_WORLD, &status, maximum_message_size);
        aocommon::SerialIStream stream(std::move(buffer));
        stream.UInt64();  // Skip the number of packages.
        GriddingTask task;
        task.Unserialize(stream);
        scheduler_.Run(std::move(task), [](GriddingResult&) {});
        break;
      }
      default:
        aocommon::Logger::Warn
            << "wsclean-mp MPI worker received an unknown message type!\n";
        break;
    }
  } while (message.type != TaskMessage::Type::kFinish);

  aocommon::Logger::Info << "Worker node " << scheduler_.Rank()
                         << " received exit message.\n";
}
} // namespace wsclean