File: wsclean-mp.cpp

package info (click to toggle)
wsclean 3.6-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 16,296 kB
  • sloc: cpp: 129,246; python: 22,066; sh: 360; ansic: 230; makefile: 185
file content (112 lines) | stat: -rw-r--r-- 3,479 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#include "worker.h"

#include "taskmessage.h"

#include "../main/commandline.h"
#include "../main/wsclean.h"

#include <aocommon/checkblas.h>
#include <aocommon/logger.h>

#include <exception>
#include <iostream>
#include <sstream>
#include <stdexcept>

#include <mpi.h>

#include <unistd.h>

using aocommon::Logger;

using wsclean::CommandLine;
using wsclean::Settings;
using wsclean::TaskMessage;
using wsclean::Worker;
using wsclean::WSClean;

namespace {
/// Fills the MPI-related fields of @p settings for a run over @p n_nodes
/// MPI processes.
///
/// If the user did not supply an explicit channel-to-node mapping, a default
/// round-robin mapping over the available worker nodes is created. When the
/// master node does not participate in gridding, node 0 is excluded from the
/// mapping and the remaining nodes are used.
///
/// @param settings Settings to update (nMpiNodes and channelToNode).
/// @param n_nodes Total number of MPI processes, including the master.
/// @throws std::runtime_error If no node is available to process channels,
///         which would otherwise cause a modulo-by-zero below.
void SetMpiSettings(Settings& settings, size_t n_nodes) {
  settings.nMpiNodes = n_nodes;

  if (settings.channelToNode.empty()) {
    // Create a default channel-to-node mapping.
    settings.channelToNode.reserve(settings.channelsOut);
    // Node 0 (the master) is skipped when it does not do gridding work.
    if (!settings.masterDoesWork) --n_nodes;
    if (n_nodes == 0) {
      // Without this guard, 'channelIndex % n_nodes' below would divide by
      // zero (undefined behavior), e.g. when running a single MPI process
      // with a configuration where the master does not work.
      throw std::runtime_error(
          "No MPI nodes are available for processing channels: when the "
          "master node does not participate, at least two MPI processes are "
          "required.");
    }
    for (size_t channelIndex = 0; channelIndex < settings.channelsOut;
         ++channelIndex) {
      // Use a round-robin distribution, since later channels have higher
      // frequencies and gridding is therefore more expensive.
      size_t node_index = channelIndex % n_nodes;
      // Shift by one so that channels map onto nodes 1..n when the master
      // (node 0) is excluded.
      if (!settings.masterDoesWork) ++node_index;
      settings.channelToNode.push_back(node_index);
    }
  }
}
}  // namespace

// Entry point of the MPI-distributed wsclean binary. Rank 0 ("main") parses
// and runs the full clean; the other ranks run a Worker that processes tasks
// sent by the master. All ranks parse the command line so each has a valid
// Settings object.
int main(int argc, char* argv[]) {
  int provided;
  // Worker/master communication happens from multiple threads, so full
  // MPI_THREAD_MULTIPLE support is required; abort the whole job otherwise.
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
  if (provided != MPI_THREAD_MULTIPLE) {
    std::cout << "This MPI implementation does not support multiple threads.\n";
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  int result = 0;
  WSClean wsclean;
  int world_size, rank;
  MPI_Comm_size(MPI_COMM_WORLD, &world_size);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  // Rank 0 acts as the master process.
  bool main = (rank == 0);

  // Report rank, PID and host so individual processes can be identified
  // (e.g. for attaching a debugger).
  char hostname[256];
  gethostname(hostname, sizeof(hostname));
  std::cout << "Node " << rank << ", PID " << getpid() << " on " << hostname
            << "\n";

  // During parsing of parameters, we don't want all processes to report
  // bad parameters. This variable is used to keep track if full errors
  // should be reported
  bool shortException = false;
  try {
    bool parseResult = false;
    // While parsing, only the master prints full errors; workers stay short.
    shortException = !main;
    parseResult = CommandLine::ParseWithoutValidation(
        wsclean, argc, const_cast<const char**>(argv), !main);
    // After parsing, verbosity is known: verbose runs report full errors on
    // every rank.
    shortException = !main && !Logger::IsVerbose();
    check_openblas_multithreading();
    if (parseResult) {
      Settings& settings = wsclean.GetSettings();
      SetMpiSettings(settings, world_size);
      CommandLine::Validate(wsclean);
      // From here on, any exception is unexpected: always report in full.
      shortException = false;
      if (main) {
        CommandLine::Run(wsclean);
        // After the run, tell every worker rank to finish by sending a
        // serialized kFinish task message.
        const TaskMessage message(TaskMessage::Type::kFinish, 0);
        aocommon::SerialOStream msgStream;
        message.Serialize(msgStream);
        for (int i = 1; i != world_size; ++i) {
          MPI_Send(msgStream.data(), msgStream.size(), MPI_BYTE, i, 0,
                   MPI_COMM_WORLD);
        }
      } else {
        // Worker ranks loop on incoming tasks until they receive kFinish.
        Worker worker(settings);
        worker.Run();
      }
    }
    // NOTE(review): written to the error stream, presumably so the message
    // appears regardless of the verbosity level — confirm intent.
    Logger::Error << "Process " << rank << " finished.\n";
  } catch (std::exception& e) {
    if (shortException)
      // Worker during parsing / non-verbose run: one-line notice only, the
      // master already reports the full error.
      Logger::Error << "Process " << rank
                    << " stopped because of an exception.\n";
    else {
      // Full report: print the exception message line by line inside a
      // visual banner.
      Logger::Error << "+ + + + + + + + + + + + + + + + + + +\n"
                    << "+ An exception occured in process " << rank << ":\n";
      std::istringstream iss(e.what());
      for (std::string line; std::getline(iss, line);) {
        Logger::Error << "+ >>> " << line << "\n";
      }
      Logger::Error << "+ + + + + + + + + + + + + + + + + + +\n";
    }
    result = -1;
  }
  MPI_Finalize();
  return result;
}