File: baselinereader.cpp

package info (click to toggle)
aoflagger 3.4.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,960 kB
  • sloc: cpp: 83,076; python: 10,187; sh: 260; makefile: 178
file content (190 lines) | stat: -rw-r--r-- 7,367 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#include "baselinereader.h"

#include <set>
#include <stdexcept>

#include <casacore/ms/MeasurementSets/MeasurementSet.h>

#include <casacore/tables/DataMan/IncrStManAccessor.h>
#include <casacore/tables/DataMan/StandardStManAccessor.h>
#include <casacore/tables/DataMan/TiledStManAccessor.h>
#include <casacore/tables/Tables/TableIter.h>
#include <casacore/tables/Tables/ArrayColumn.h>
#include <casacore/tables/Tables/ScalarColumn.h>
#include <casacore/tables/TaQL/ExprNode.h>

#include "../util/progress/dummyprogresslistener.h"
#include "../structures/timefrequencydata.h"

#include "../util/logger.h"

DummyProgressListener BaselineReader::dummy_progress_;

BaselineReader::BaselineReader(const std::string& msFile)
    : _msMetaData(msFile),
      _dataColumnName("DATA"),
      _readData(true),
      _readFlags(true),
      _polarizations() {}

BaselineReader::~BaselineReader() {}

void BaselineReader::initObservationTimes() {
  if (_observationTimes.empty()) {
    Logger::Debug << "Initializing observation times...\n";
    const size_t sequenceCount = _msMetaData.SequenceCount();
    _observationTimes.resize(sequenceCount);
    for (size_t sequenceId = 0; sequenceId != sequenceCount; ++sequenceId) {
      const std::set<double>& times =
          _msMetaData.GetObservationTimesSet(sequenceId);
      unsigned index = 0;
      for (const double t : times) {
        _observationTimes[sequenceId].emplace(t, index);
        _observationTimesVector.push_back(t);
        ++index;
      }
    }
  }
}

void BaselineReader::AddReadRequest(size_t antenna1, size_t antenna2,
                                    size_t spectralWindow, size_t sequenceId) {
  initObservationTimes();

  addReadRequest(antenna1, antenna2, spectralWindow, sequenceId, 0,
                 _observationTimes[sequenceId].size());
}

TimeFrequencyData BaselineReader::GetNextResult(std::vector<class UVW>& uvw) {
  const size_t requestIndex = 0;
  TimeFrequencyData data;
  data = TimeFrequencyData(_polarizations.data(), _polarizations.size(),
                           _results[requestIndex]._realImages.data(),
                           _results[requestIndex]._imaginaryImages.data());
  data.SetIndividualPolarizationMasks(_results[requestIndex]._flags.data());
  uvw = _results[0]._uvw;

  _results.erase(_results.begin() + requestIndex);

  return data;
}

void BaselineReader::initializePolarizations() {
  if (_polarizations.empty()) {
    casacore::MeasurementSet ms(_msMetaData.Path());

    const casacore::MSDataDescription ddTable = ms.dataDescription();
    if (ddTable.nrow() == 0)
      throw std::runtime_error("DataDescription table is empty");
    const casacore::ScalarColumn<int> polIdColumn(
        ddTable, casacore::MSDataDescription::columnName(
                     casacore::MSDataDescription::POLARIZATION_ID));
    const int polarizationId = polIdColumn(0);
    for (size_t row = 0; row != ddTable.nrow(); ++row) {
      if (polIdColumn(row) != polarizationId)
        throw std::runtime_error(
            "This measurement set has different polarizations listed in the "
            "datadescription table. This is non-standard, and AOFlagger cannot "
            "handle it.");
    }

    const casacore::Table polTable = ms.polarization();
    const casacore::ArrayColumn<int> corTypeColumn(polTable, "CORR_TYPE");
    casacore::Array<int> corType = corTypeColumn(polarizationId);
    const casacore::Array<int>::iterator iterend(corType.end());
    for (casacore::Array<int>::iterator iter = corType.begin(); iter != iterend;
         ++iter) {
      const aocommon::PolarizationEnum polarization =
          aocommon::Polarization::AipsIndexToEnum(*iter);
      _polarizations.push_back(polarization);
    }
  }
}

/**
 * Returns an estimate of the size of the data of the MS.
 *
 * One thing that might be problematic, is that this function isn't aware of
 * different spectral windows ("SPW") or different scans. Unlike Dp3, AOFlagger
 * is used also for telescopes like JVLA that don't necessarily store all times
 * together. The readers do support it; the unit of a single contiguous stream
 * of data is called a sequence there. In rare occassions, the timesteps might
 * not divide the measurement set "linearly" in those cases (e.g. the scan may
 * use 10 Spws, whereas the second one uses 1, and both might have the same nr
 * of timesteps -- in which case the second one would take only 1/11th of the
 * obs). This is a bit more rare though, and it is after all only an estimate,
 * so maybe we can live with this until we get a bug report :).
 */
static uint64_t GetMeasurementSetDataSize(casacore::MeasurementSet& ms) {
  const casacore::MSSpectralWindow spwTable = ms.spectralWindow();

  const casacore::ScalarColumn<int> numChanCol(
      spwTable, casacore::MSSpectralWindow::columnName(
                    casacore::MSSpectralWindowEnums::NUM_CHAN));
  const size_t channelCount = numChanCol.get(0);
  if (channelCount == 0) throw std::runtime_error("No channels in set");
  if (ms.nrow() == 0) throw std::runtime_error("Table has no rows (no data)");

  typedef float num_t;
  typedef std::complex<num_t> complex_t;
  const casacore::ScalarColumn<int> ant1Column(
      ms, ms.columnName(casacore::MSMainEnums::ANTENNA1));
  const casacore::ScalarColumn<int> ant2Column(
      ms, ms.columnName(casacore::MSMainEnums::ANTENNA2));
  const casacore::ArrayColumn<complex_t> dataColumn(
      ms, ms.columnName(casacore::MSMainEnums::DATA));

  casacore::IPosition dataShape = dataColumn.shape(0);
  const unsigned polarizationCount = dataShape[0];

  return (uint64_t)polarizationCount * (uint64_t)channelCount *
         (uint64_t)ms.nrow() * (uint64_t)(sizeof(num_t) * 2 + sizeof(bool));
}

uint64_t BaselineReader::MeasurementSetDataSize(const string& filename) {
  casacore::MeasurementSet ms(filename);

  return GetMeasurementSetDataSize(ms);
}

static size_t GetNTimeSteps(casacore::MeasurementSet& ms) {
  size_t result = 0;
  double time = -1.0;
  const casacore::ScalarColumn<double> time_column{
      ms, ms.columnName(casacore::MSMainEnums::TIME)};

  for (size_t i = 0; i != ms.nrow(); ++i)
    if (const double t = time_column(i); t != time) {
      ++result;
      time = t;
    }

  return result;
}

static uint64_t GetMeasurementSetIntervalDataSize(const string& filename,
                                                  size_t start, size_t end) {
  assert(start <= end && "Reqested begin and end aren't a valid range");
  casacore::MeasurementSet ms(filename);

  const uint64_t result = GetMeasurementSetDataSize(ms);
  const size_t time_steps_ms = GetNTimeSteps(ms);
  const size_t time_steps_requested = end - start;

  // When more time steps are requested than available use the available number.
  // Validate time_steps_ms to avoid a division by zero.
  if (time_steps_requested >= time_steps_ms || time_steps_ms == 0)
    return result;
  return result * static_cast<double>(time_steps_requested) / time_steps_ms;
}

uint64_t BaselineReader::MeasurementSetIntervalDataSize(
    const string& filename, std::optional<size_t> start,
    std::optional<size_t> end) {
  if (start) {
    assert(end && "The engagement status of start and end should be the same.");
    return GetMeasurementSetIntervalDataSize(filename, *start, *end);
  }

  return BaselineReader::MeasurementSetDataSize(filename);
}