1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
|
#ifndef AOREMOTE__PROCESS_COMMANDER_H
#define AOREMOTE__PROCESS_COMMANDER_H
#include <map>
#include <string>
#include <deque>
#include <vector>
#include <memory>
#include <mutex>
#include "clusteredobservation.h"
#include "nodecommandmap.h"
#include "remoteprocess.h"
#include "server.h"
#include "../structures/antennainfo.h"
class StatisticsCollection;
namespace aoRemote {
class ObservationTimerange;
/**
 * Collects a queue of tasks (reading quality, antenna and band tables, and
 * reading or writing data rows) for a clustered observation, together with
 * the destinations and buffers that each task uses.
 *
 * Only the small inline members are defined in this header; the constructor,
 * Run() and all connection callbacks are defined elsewhere.
 */
class ProcessCommander
{
public:
	explicit ProcessCommander(const ClusteredObservation &observation);
	~ProcessCommander();

	/**
	 * NOTE(review): @p finishConnections is presumably stored in
	 * _finishConnections, which decides in handleIdleConnection() whether an
	 * idle connection is stopped or kept -- confirm in the implementation.
	 */
	void Run(bool finishConnections = true);

	static Hostname GetHostName();

	/**
	 * These two accessors dereference the destination pointers registered with
	 * PushReadQualityTablesTask(); they must not be called while those
	 * pointers are still null.
	 */
	const StatisticsCollection &Statistics() const { return *_statisticsCollection; }
	const HistogramCollection &Histograms() const { return *_histogramCollection; }

	size_t PolarizationCount() const { return _polarizationCount; }
	const std::vector<AntennaInfo> &Antennas() const { return _antennas; }
	const std::vector<BandInfo> &Bands() const { return _bands; }

	/**
	 * Dereferences the timerange registered with PushReadDataRowsTask() or
	 * PushWriteDataRowsTask(); must not be called before one of them.
	 */
	const ObservationTimerange &ObsTimerange() const { return *_observationTimerange; }

	size_t RowsTotal() const { return _rowsTotal; }
	const std::vector<std::string> &Errors() const { return _errors; }
	std::string ErrorString() const;
	void CheckErrors() const;

	/**
	 * Queue a task for reading the quality tables.
	 * @param dest stored as the statistics destination; the object is not owned
	 * by this class here and is later exposed through Statistics().
	 * @param destHistogram stored as the histogram destination; later exposed
	 * through Histograms().
	 * @param correctHistograms stored in _correctHistograms for use by the
	 * implementation.
	 */
	void PushReadQualityTablesTask(StatisticsCollection *dest, HistogramCollection *destHistogram, bool correctHistograms = false)
	{
		_correctHistograms = correctHistograms;
		_tasks.push_back(ReadQualityTablesTask);
		_statisticsCollection = dest;
		_histogramCollection = destHistogram;
	}

	/** Queue a task for reading the antenna tables. */
	void PushReadAntennaTablesTask() { _tasks.push_back(ReadAntennaTablesTask); }

	/**
	 * Queue a task for reading the band tables and size the band list to
	 * the observation's Size().
	 */
	void PushReadBandTablesTask()
	{
		_tasks.push_back(ReadBandTablesTask);
		_bands.resize(_observation.Size());
	}

	/**
	 * Queue a task for reading data rows. Also resets the total row counter
	 * to zero.
	 * @param timerange stored by address; it has to stay alive for as long as
	 * this object may use it.
	 * @param rowStart stored as the start row of the read.
	 * @param rowCount stored as the row count of the read.
	 * @param rowBuffer should have #NODES elements, each of which is an array of
	 * #ROWCOUNT rows. It is not expected to hold the data yet; it is a parameter
	 * so that repeated calls do not have to allocate that memory over and over.
	 */
	void PushReadDataRowsTask(class ObservationTimerange &timerange, size_t rowStart, size_t rowCount, MSRowDataExt **rowBuffer)
	{
		_tasks.push_back(ReadDataRowsTask);
		_observationTimerange = &timerange;
		_readRowBuffer = rowBuffer;
		_rowStart = rowStart;
		_rowCount = rowCount;
		_rowsTotal = 0;
	}

	/**
	 * Queue a task for writing data rows. Also resets the total row counter
	 * to zero.
	 * @param timerange stored by address; it has to stay alive for as long as
	 * this object may use it.
	 * @param rowBuffer should have #NODES elements, each of which is an array of
	 * timerange.#ROW rows. It is not expected to hold the data yet; it is a
	 * parameter so that repeated calls do not have to allocate that memory over
	 * and over.
	 */
	void PushWriteDataRowsTask(class ObservationTimerange &timerange, MSRowDataExt **rowBuffer)
	{
		_tasks.push_back(WriteDataRowsTask);
		_observationTimerange = &timerange;
		_writeRowBuffer = rowBuffer;
		_rowsTotal = 0;
	}

	const ClusteredObservation &Observation() const { return _observation; }

private:
	/** Kinds of queued work; NoTask is what currentTask() reports for an empty queue. */
	enum Task {
		NoTask,
		ReadQualityTablesTask,
		ReadAntennaTablesTask,
		ReadBandTablesTask,
		ReadDataRowsTask,
		WriteDataRowsTask
	};

	void endIdleConnections();
	void continueReadQualityTablesTask(ServerConnectionPtr serverConnection);
	void continueReadAntennaTablesTask(ServerConnectionPtr serverConnection);
	void continueReadBandTablesTask(ServerConnectionPtr serverConnection);
	void continueReadDataRowsTask(ServerConnectionPtr serverConnection);
	void continueWriteDataRowsTask(ServerConnectionPtr serverConnection);

	void onConnectionCreated(ServerConnectionPtr serverConnection, bool &acceptConnection);
	void onConnectionAwaitingCommand(ServerConnectionPtr serverConnection);
	void onConnectionFinishReadQualityTables(ServerConnectionPtr serverConnection, StatisticsCollection &statisticsCollection, HistogramCollection &histogramCollection);
	void onConnectionFinishReadAntennaTables(ServerConnectionPtr serverConnection, std::shared_ptr<std::vector<AntennaInfo> > antennas, size_t polarizationCount);
	void onConnectionFinishReadBandTable(ServerConnectionPtr serverConnection, BandInfo &band);
	void onConnectionFinishReadDataRows(ServerConnectionPtr serverConnection, MSRowDataExt *rowData, size_t rowCount);
	void onConnectionFinishWriteDataRows(ServerConnectionPtr serverConnection);
	void onError(ServerConnectionPtr connection, const std::string &error);
	void onProcessFinished(RemoteProcess &process, bool error, int status);

	// The declaration order of the data members below is kept as it was, so
	// that the member initialization order does not change.
	Server _server;
	typedef std::vector<ServerConnectionPtr> ConnectionVector;
	ConnectionVector _idleConnections;
	std::vector<RemoteProcess *> _processes;

	// Pointers and scalars get a default member initializer so that they never
	// hold an indeterminate value; a constructor mem-initializer still wins.
	StatisticsCollection *_statisticsCollection = nullptr;
	HistogramCollection *_histogramCollection = nullptr;
	bool _correctHistograms = false;
	size_t _polarizationCount = 0;
	std::vector<AntennaInfo> _antennas;
	std::vector<BandInfo> _bands;
	class ObservationTimerange *_observationTimerange = nullptr;
	MSRowDataExt **_readRowBuffer = nullptr;
	MSRowDataExt **_writeRowBuffer = nullptr;
	size_t _rowStart = 0;
	size_t _rowCount = 0;
	size_t _rowsTotal = 0;
	const ClusteredObservation &_observation;
	NodeCommandMap _nodeCommands;
	// Mirrors the default argument of Run().
	// NOTE(review): presumably overwritten by Run() before it is read -- confirm.
	bool _finishConnections = true;
	std::vector<std::string> _errors;
	std::deque<Task> _tasks;

	/**
	 * Because the processes have separate threads that can send signals from
	 * their thread, locking is required for accessing data that might be
	 * accessed by the processes.
	 */
	std::mutex _mutex;

	/** @return the task at the front of the queue, or NoTask if the queue is empty. */
	Task currentTask() const {
		return _tasks.empty() ? NoTask : _tasks.front();
	}

	/**
	 * Remove the finished task from the front of the queue and stop the
	 * server once no task is left.
	 */
	void onCurrentTaskFinished() {
		// pop_front() on an empty deque is undefined behaviour, so only pop
		// when a task is actually queued.
		if(!_tasks.empty())
			_tasks.pop_front();
		if(currentTask() == NoTask)
			_server.Stop();
	}

	/**
	 * Either stop the client of a connection that has nothing left to do, or
	 * remember the connection in the idle list, depending on _finishConnections.
	 */
	void handleIdleConnection(ServerConnectionPtr serverConnection) {
		if(_finishConnections)
			serverConnection->StopClient();
		else
			_idleConnections.push_back(serverConnection);
	}
};
}
#endif
|