1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
|
#include "griddingtaskmanager.h"
#include <numeric>
#include <mutex>
#include <vector>
#include "griddingtask.h"
#include "griddingresult.h"
#include "mpischeduler.h"
#include "threadedscheduler.h"
#include "../gridding/h5solutiondata.h"
#include "../gridding/msgriddermanager.h"
#include "../main/settings.h"
#include "../structures/resources.h"
namespace wsclean {
// Constructs a task manager from the given settings. The solution data is
// built from the same settings object, and the writer lock manager receives a
// pointer to this task manager.
GriddingTaskManager::GriddingTaskManager(const Settings& settings)
: settings_(settings),
solution_data_(settings),
// NOTE(review): `this` is handed out before construction has finished;
// presumably the lock manager only stores the pointer here — confirm.
writer_lock_manager_(this) {}
// Nothing to release by hand: the destructor performs member destruction only.
// It stays defined in this translation unit.
GriddingTaskManager::~GriddingTaskManager() = default;
// Factory that selects the task manager implementation from the settings:
//  - settings.UseMpi() is true: an MPIScheduler when compiled with HAVE_MPI,
//    otherwise a std::runtime_error is thrown;
//  - settings.parallelGridding > 1: a ThreadedScheduler;
//  - anything else: a plain GriddingTaskManager.
std::unique_ptr<GriddingTaskManager> GriddingTaskManager::Make(
    const Settings& settings) {
  if (settings.UseMpi()) {
#ifdef HAVE_MPI
    return std::make_unique<MPIScheduler>(settings);
#else
    throw std::runtime_error("MPI not available");
#endif
  }

  if (settings.parallelGridding > 1) {
    return std::make_unique<ThreadedScheduler>(settings);
  }

  return std::make_unique<GriddingTaskManager>(settings);
}
// Builds a Resources object from the thread count in the settings and the
// value that GetAvailableMemory() returns for the configured memory fraction
// and absolute memory limit.
Resources GriddingTaskManager::GetResources() const {
  const auto available_memory =
      GetAvailableMemory(settings_.memFraction, settings_.absMemLimit);
  return Resources(settings_.threadCount, available_memory);
}
// Processes `task` for all of its facets by calling RunDirect() directly with
// the resources from GetResources(), then passes the collected result to
// `finishCallback`.
void GriddingTaskManager::Run(
    GriddingTask&& task, std::function<void(GriddingResult&)> finishCallback) {
  const size_t facet_count = task.facets.size();

  // The result holds one entry per facet of the task.
  GriddingResult gridding_result;
  gridding_result.facets.resize(facet_count);

  // Select every facet: 0, 1, ..., facet_count - 1.
  std::vector<size_t> all_facets(facet_count);
  std::iota(all_facets.begin(), all_facets.end(), 0);

  // RunDirect() requires a mutex argument; this one is local to this call.
  std::mutex gridding_result_mutex;
  RunDirect(task, all_facets, GetResources(), gridding_result,
            gridding_result_mutex);

  finishCallback(gridding_result);
}
// Executes `task` for the facets listed in `facet_indices`.
//
// task:          the task to run; for Wait tasks only its
//                lock_excess_scheduler_tasks_ is used.
// facet_indices: facets to process; must not be empty (asserted).
// resources:     forwarded to MSGridderManager::InitializeGridders().
// result:        receives the output; result.facets must already have the
//                same size as task.facets (asserted).
// result_mutex:  forwarded to MSGridderManager::ProcessResults().
void GriddingTaskManager::RunDirect(GriddingTask& task,
                                    const std::vector<size_t>& facet_indices,
                                    const Resources& resources,
                                    GriddingResult& result,
                                    std::mutex& result_mutex) {
  assert(!facet_indices.empty());
  assert(result.facets.size() == task.facets.size());

  // A Wait task does no gridding. It keeps its pool thread occupied by
  // blocking on lock_excess_scheduler_tasks_ until SignalCompletion() is
  // called on that lock.
  if (task.operation == GriddingTask::Wait) {
    task.lock_excess_scheduler_tasks_->WaitForCompletion();
    return;
  }

  MSGridderManager gridder_manager(settings_, solution_data_);
  gridder_manager.InitializeMS(task);
  gridder_manager.InitializeGridders(task, facet_indices, resources,
                                     result.facets, writer_lock_manager_);

  // Every operation other than Invert is handled as a prediction.
  const bool is_invert = (task.operation == GriddingTask::Invert);
  if (!is_invert) {
    gridder_manager.Predict();
  } else if (settings_.shared_facet_reads) {
    gridder_manager.SortFacetTasks();
    gridder_manager.BatchInvert(task.num_parallel_gridders_);
  } else {
    gridder_manager.Invert();
  }

  // Only the call whose first facet index is 0 stores the common info,
  // including the unique id of the task.
  const bool has_first_facet = (facet_indices.front() == 0);
  if (has_first_facet) {
    result.unique_id = task.unique_id;
  }
  gridder_manager.ProcessResults(result_mutex, result, has_first_facet);

  // With shared facet reads, let any wait tasks resume and drop this task's
  // handle to the lock.
  if (GetSettings().shared_facet_reads && task.lock_excess_scheduler_tasks_) {
    task.lock_excess_scheduler_tasks_->SignalCompletion();
    task.lock_excess_scheduler_tasks_.reset();
  }
}
} // namespace wsclean
|