File: griddingtaskmanager.cpp

Package: wsclean 3.6-3

#include "griddingtaskmanager.h"

#include <cassert>
#include <mutex>
#include <numeric>
#include <stdexcept>
#include <vector>

#include "griddingtask.h"
#include "griddingresult.h"
#include "mpischeduler.h"
#include "threadedscheduler.h"

#include "../gridding/h5solutiondata.h"
#include "../gridding/msgriddermanager.h"
#include "../main/settings.h"
#include "../structures/resources.h"

namespace wsclean {

GriddingTaskManager::GriddingTaskManager(const Settings& settings)
    : settings_(settings),
      solution_data_(settings),
      writer_lock_manager_(this) {}

GriddingTaskManager::~GriddingTaskManager() {}

std::unique_ptr<GriddingTaskManager> GriddingTaskManager::Make(
    const Settings& settings) {
  if (settings.UseMpi()) {
#ifdef HAVE_MPI
    return std::make_unique<MPIScheduler>(settings);
#else
    throw std::runtime_error("MPI not available");
#endif
  } else if (settings.parallelGridding > 1) {
    return std::make_unique<ThreadedScheduler>(settings);
  } else {
    return std::make_unique<GriddingTaskManager>(settings);
  }
}
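
// Illustrative usage of the factory above (a sketch, not code from this file;
// the Settings and GriddingTask objects are assumed to be built elsewhere in
// wsclean):
//
//   std::unique_ptr<GriddingTaskManager> scheduler =
//       GriddingTaskManager::Make(settings);
//   scheduler->Run(std::move(task),
//                  [](GriddingResult& result) { /* consume result */ });
//
// Depending on the settings, Make() returns this serial base class, a
// ThreadedScheduler, or (when compiled with MPI support) an MPIScheduler.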

Resources GriddingTaskManager::GetResources() const {
  return Resources(
      settings_.threadCount,
      GetAvailableMemory(settings_.memFraction, settings_.absMemLimit));
}
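
// For example, with settings_.threadCount == 8 and an available-memory
// estimate of 16 GiB, the Resources object returned above describes an
// 8-thread / 16 GiB budget that Run() forwards to RunDirect() and, from
// there, to MSGridderManager. (This assumes Resources is a plain value
// holder; see ../structures/resources.h for its actual definition.)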

void GriddingTaskManager::Run(
    GriddingTask&& task, std::function<void(GriddingResult&)> finishCallback) {
  std::vector<size_t> facet_indices(task.facets.size());
  std::iota(facet_indices.begin(), facet_indices.end(), 0);

  GriddingResult result;
  result.facets.resize(task.facets.size());
  std::mutex result_mutex;

  RunDirect(task, facet_indices, GetResources(), result, result_mutex);

  finishCallback(result);
}
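
// Note on Run() above: facet_indices is filled with 0, 1, ..., N-1 via
// std::iota, so this base implementation grids every facet of the task in a
// single synchronous RunDirect() call and then invokes finishCallback
// directly. Derived schedulers (see Make() above) may instead distribute
// tasks or facet subsets across workers. A minimal standalone illustration
// of the index fill:
//
//   std::vector<size_t> indices(4);
//   std::iota(indices.begin(), indices.end(), 0);  // indices == {0, 1, 2, 3}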

void GriddingTaskManager::RunDirect(GriddingTask& task,
                                    const std::vector<size_t>& facet_indices,
                                    const Resources& resources,
                                    GriddingResult& result,
                                    std::mutex& result_mutex) {
  assert(!facet_indices.empty());
  assert(result.facets.size() == task.facets.size());

  // Wait tasks occupy a thread from the pool: they block on
  // lock_excess_scheduler_tasks_, which freezes the task's thread until
  // lock_excess_scheduler_tasks_->SignalCompletion() is called.
  if (task.operation == GriddingTask::Wait) {
    task.lock_excess_scheduler_tasks_->WaitForCompletion();
    return;
  }

  MSGridderManager manager(settings_, solution_data_);
  manager.InitializeMS(task);
  manager.InitializeGridders(task, facet_indices, resources, result.facets,
                             writer_lock_manager_);
  if (task.operation == GriddingTask::Invert) {
    if (settings_.shared_facet_reads) {
      manager.SortFacetTasks();
      manager.BatchInvert(task.num_parallel_gridders_);
    } else {
      manager.Invert();
    }
  } else {
    manager.Predict();
  }
  const bool store_common_info = (facet_indices.front() == 0);
  if (store_common_info) {
    result.unique_id = task.unique_id;
  }
  manager.ProcessResults(result_mutex, result, store_common_info);

  if (GetSettings().shared_facet_reads) {
    // Allow wait tasks to resume
    if (task.lock_excess_scheduler_tasks_) {
      task.lock_excess_scheduler_tasks_->SignalCompletion();
      task.lock_excess_scheduler_tasks_.reset();
    }
  }
}
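
// The Wait/Signal pairing in RunDirect() blocks a Wait task's thread in
// WaitForCompletion() and releases it from a regular task (with shared facet
// reads enabled) via SignalCompletion() once results have been processed.
// The concrete type of lock_excess_scheduler_tasks_ is declared alongside
// GriddingTask; purely as a sketch of the interface relied on here
// (hypothetical, not wsclean's actual class), such a completion signal could
// be implemented as:
//
//   #include <condition_variable>
//   #include <mutex>
//
//   class CompletionSignal {
//    public:
//     void WaitForCompletion() {
//       std::unique_lock<std::mutex> lock(mutex_);
//       condition_.wait(lock, [this] { return done_; });
//     }
//     void SignalCompletion() {
//       {
//         std::lock_guard<std::mutex> lock(mutex_);
//         done_ = true;
//       }
//       condition_.notify_all();
//     }
//    private:
//     std::mutex mutex_;
//     std::condition_variable condition_;
//     bool done_ = false;
//   };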

}  // namespace wsclean