File: griddingtaskmanager.h

package info (click to toggle)
wsclean 3.7-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,968 kB
  • sloc: cpp: 85,742; python: 3,526; sh: 245; makefile: 21
file content (129 lines) | stat: -rw-r--r-- 4,214 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#ifndef GRIDDING_TASK_MANAGER_H_
#define GRIDDING_TASK_MANAGER_H_

#include <mutex>
#include <vector>

#include "griddingtask.h"
#include "griddingresult.h"

#include "../gridding/h5solutiondata.h"
#include "../main/settings.h"
#include "../structures/resources.h"

namespace wsclean {

class MSGridderManagerScheduler;

class GriddingTaskManager {
 public:
  class WriterLock {
   public:
    virtual ~WriterLock() = default;
  };

 public:
  explicit GriddingTaskManager(const Settings& settings);

  virtual ~GriddingTaskManager();

  const Settings& GetSettings() const { return settings_; }

  void SetWriterLockManager(GriddingTaskManager& manager) {
    writer_lock_manager_ = &manager;
  }

  /**
   * Initialize writer groups. Call this function before scheduling Predict
   * tasks in order to initialize the writer locks.
   *
   * @param nWriterGroups The number of writer groups.
   */
  virtual void Start([[maybe_unused]] size_t nWriterGroups) {}

  /**
   * Obtains a lock for the given @p writer_group_index.
   * The default implementation returns a dummy lock: Since it runs all tasks
   * sequentially, locking is not needed.
   * @return A lock object. Destroying the object releases the lock.
   */
  virtual std::unique_ptr<WriterLock> GetLock(
      [[maybe_unused]] size_t writer_group_index) {
    return nullptr;
  }

  /**
   * Add the given task to the queue of tasks to be run. After finishing
   * the task, the callback is called with the results. The callback will
   * always run in the thread of the caller.
   * Depending on the type of gridding task manager, this call might block.
   *
   * This implementation runs the task directly and blocks until done.
   */
  virtual void Run(GriddingTask&& task,
                   std::function<void(GriddingResult&)> finishCallback);

  /**
   * Block until all tasks have finished.
   */
  virtual void Finish(){};

  /**
   * Make the gridding task manager according to the settings.
   */
  static std::unique_ptr<GriddingTaskManager> Make(const Settings& settings);

 protected:
  Resources GetResources() const;

  /**
   * Run the provided task with the specified resources.
   * @param task A possibly compound gridding task.
   *        RunDirect() moves large values for the given facets out of the task.
   * @param facet_indices A list with the indices of the facets which should be
   *        gridded. The sequential GriddingTaskManager supplies all facets.
   *        The parallel ThreadedScheduler supplies a single facet and does
   *        multiple RunDirect calls for different facets in parallel.
   * @param resources Resources for creating the gridder.
   * @param result [out] Storage for gridding result.
   *        result.facets should have an entry for each facet.
   *        RunDirect() only updates the result for the given facet indices.
   *        When the first facet index is 0, it also updates the generic
   *        result part, which is equal for all facets.
   * @param result_mutex Protects concurrent accesses to result fields that
   *        are updated for each facet.
   */
  void RunDirect(
      GriddingTask& task, const std::vector<size_t>& facet_indices,
      const Resources& resources, GriddingResult& result,
      std::mutex& result_mutex,
      std::function<void(std::unique_ptr<MSGridderManagerScheduler>&)>
          signal_last_work_has_started,
      std::unique_ptr<MSGridderManagerScheduler> scheduler = nullptr);

 private:
  const Settings& settings_;

  H5SolutionData solution_data_;

  /**
   * Writer lock manager for the scheduler.
   * By default, it equals the 'this' pointer.
   * When the GriddingTaskManager is used within an MPIScheduler,
   * it may point to the MPIScheduler.
   */
  GriddingTaskManager* writer_lock_manager_;

  /**
   * Instead of deleting schedulers after use keep a reusable cache. That way we
   * can re-use them across the entire program run (if doing e.g. multiple major
   * cycles) Rather then repeatedly creating/destroying threading resources.
   */
  std::mutex scheduler_creation_mutex_;
  std::map<size_t, std::deque<std::unique_ptr<MSGridderManagerScheduler>>>
      scheduler_cache_;
};

}  // namespace wsclean

#endif