#include "griddingtaskmanager.h"
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <numeric>
#include <stdexcept>
#include <vector>
#include "griddingtask.h"
#include "griddingresult.h"
#include "mpischeduler.h"
#include "threadedscheduler.h"
#include "../gridding/h5solutiondata.h"
#include "../gridding/msgriddermanager.h"
#include "../main/settings.h"
#include "../structures/resources.h"

namespace wsclean {

GriddingTaskManager::GriddingTaskManager(const Settings& settings)
    : settings_(settings),
      solution_data_(settings),
      writer_lock_manager_(this) {}

GriddingTaskManager::~GriddingTaskManager() = default;

// Creates the task manager implementation that matches the settings:
// MPI, multi-threaded, or this plain synchronous manager.
std::unique_ptr<GriddingTaskManager> GriddingTaskManager::Make(
    const Settings& settings) {
  if (settings.UseMpi()) {
#ifdef HAVE_MPI
    return std::make_unique<MPIScheduler>(settings);
#else
    throw std::runtime_error("MPI not available");
#endif
  } else if (settings.parallelGridding > 1) {
    return std::make_unique<ThreadedScheduler>(settings);
  } else {
    return std::make_unique<GriddingTaskManager>(settings);
  }
}

Resources GriddingTaskManager::GetResources() const {
  return Resources(
      settings_.threadCount,
      GetAvailableMemory(settings_.memFraction, settings_.absMemLimit));
}

// Runs the task for all of its facets in a single RunDirect() call and
// invokes the callback once that call has returned.
void GriddingTaskManager::Run(
    GriddingTask&& task, std::function<void(GriddingResult&)> finishCallback) {
  // Process every facet of the task: indices 0 .. n_facets - 1.
  std::vector<size_t> facet_indices(task.facets.size());
  std::iota(facet_indices.begin(), facet_indices.end(), 0);

  GriddingResult result;
  result.facets.resize(task.facets.size());
  std::mutex result_mutex;

  RunDirect(task, facet_indices, GetResources(), result, result_mutex, {});

  finishCallback(result);
}

void GriddingTaskManager::RunDirect(
    GriddingTask& task, const std::vector<size_t>& facet_indices,
    const Resources& resources, GriddingResult& result,
    std::mutex& result_mutex,
    std::function<void(std::unique_ptr<MSGridderManagerScheduler>&)>
        signal_last_work_has_started,
    std::unique_ptr<MSGridderManagerScheduler> scheduler) {
  assert(!facet_indices.empty());
  assert(result.facets.size() == task.facets.size());
  assert(!task.msList.empty());

  // A task is batched when its operation uses shared facet reads (invert)
  // or shared facet writes (predict).
  bool batch_task =
      task.operation == GriddingTask::Invert && settings_.shared_facet_reads;
  batch_task |=
      task.operation == GriddingTask::Predict && settings_.shared_facet_writes;

  const size_t n_threads = resources.NCpus();

  // Select which scheduler to use.
  // If we have been explicitly passed one, use that.
  // Otherwise, for a batch task, try to re-use one from the cache.
  // As a last resort, allocate a new one.
  std::unique_ptr<MSGridderManagerScheduler> selected_scheduler = nullptr;
  if (scheduler) {
    selected_scheduler = std::move(scheduler);
  } else if (batch_task) {
    {
      std::lock_guard<std::mutex> lock(scheduler_creation_mutex_);
      if (!scheduler_cache_[n_threads].empty()) {
        selected_scheduler = std::move(scheduler_cache_[n_threads].front());
        scheduler_cache_[n_threads].pop_front();
      }
    }
    if (!selected_scheduler) {
      selected_scheduler =
          std::make_unique<MSGridderManagerScheduler>(n_threads);
    }
  }

  MSGridderManager manager(settings_, solution_data_, selected_scheduler.get());
  manager.InitializeMS(task);
  manager.InitializeGridders(task, facet_indices, resources, result.facets,
                             writer_lock_manager_);

  if (task.operation == GriddingTask::Invert) {
    if (settings_.shared_facet_reads) {
      manager.BatchInvert([&]() {
        // NB! The signal can take ownership of the scheduler.
        // The callback may be empty: Run() passes a default-constructed
        // std::function, and calling that would throw.
        std::lock_guard<std::mutex> lock(scheduler_creation_mutex_);
        if (signal_last_work_has_started) {
          signal_last_work_has_started(selected_scheduler);
        }
      });
    } else {
      manager.Invert();
    }
  } else {
    if (settings_.shared_facet_writes) {
      manager.BatchPredict([&]() {
        // NB! The signal can take ownership of the scheduler.
        // The callback may be empty: Run() passes a default-constructed
        // std::function, and calling that would throw.
        std::lock_guard<std::mutex> lock(scheduler_creation_mutex_);
        if (signal_last_work_has_started) {
          signal_last_work_has_started(selected_scheduler);
        }
      });
    } else {
      manager.Predict();
    }
  }

  // Only the call that handles facet 0 stores the task-wide information.
  const bool store_common_info = (facet_indices.front() == 0);
  if (store_common_info) {
    result.unique_id = task.unique_id;
  }
  manager.ProcessResults(result_mutex, result, store_common_info);

  // We are done with this scheduler.
  // Place it in the cache for later re-use, unless the signal callback
  // has taken ownership of it.
  if (selected_scheduler) {
    std::lock_guard<std::mutex> lock(scheduler_creation_mutex_);
    scheduler_cache_[n_threads].push_back(std::move(selected_scheduler));
  }
}

} // namespace wsclean