File: worker.cpp

package info (click to toggle)
wsclean 3.6-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 16,296 kB
  • sloc: cpp: 129,246; python: 22,066; sh: 360; ansic: 230; makefile: 185
file content (68 lines) | stat: -rw-r--r-- 1,867 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include "worker.h"

#include <mpi.h>

#include <aocommon/io/serialistream.h>
#include <aocommon/io/serialostream.h>
#include <aocommon/logger.h>

#include "../scheduling/griddingtask.h"

#include "mpibig.h"
#include "taskmessage.h"

namespace wsclean {

namespace {
constexpr int kMainNode = 0;
constexpr int kTag = 0;
}  // namespace

void Worker::Run() {
  TaskMessage message;
  const size_t maximum_message_size =
      scheduler_.GetSettings().maxMpiMessageSize;

  do {
    MPI_Status status;

    aocommon::UVector<unsigned char> buffer(TaskMessage::kSerializedSize);
    MPI_Recv(buffer.data(), TaskMessage::kSerializedSize, MPI_BYTE, kMainNode,
             kTag, MPI_COMM_WORLD, &status);
    aocommon::SerialIStream stream(std::move(buffer));
    message.Unserialize(stream);

    switch (message.type) {
      case TaskMessage::Type::kStart:
        scheduler_.Start(message.n_writer_groups);
        break;

      case TaskMessage::Type::kFinish:
        // The do..while loop will exit, too.
        break;

      case TaskMessage::Type::kGriddingRequest: {
        buffer.resize(message.body_size);
        MPI_Recv_Big(buffer.data(), message.body_size, kMainNode, kTag,
                     MPI_COMM_WORLD, &status, maximum_message_size);
        aocommon::SerialIStream stream(std::move(buffer));
        stream.UInt64();  // skip the nr of packages

        GriddingTask task;
        task.Unserialize(stream);
        scheduler_.Run(std::move(task), [](GriddingResult&) {});
        break;
      }

      default:
        aocommon::Logger::Warn
            << "wsclean-mp MPI worker received an unknown message type!\n";
        break;
    }

  } while (message.type != TaskMessage::Type::kFinish);
  aocommon::Logger::Info << "Worker node " << scheduler_.Rank()
                         << " received exit message.\n";
}

}  // namespace wsclean