File: aoflagger.cpp

package info (click to toggle)
aoflagger 2.13.0-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 4,232 kB
  • sloc: cpp: 61,805; python: 60; sh: 23; makefile: 8
file content (361 lines) | stat: -rw-r--r-- 11,403 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
#include <iostream>
#include <string>
#include <mutex>

#include <libgen.h>

#include "strategy/actions/foreachmsaction.h"
#include "strategy/actions/strategy.h"

#include "strategy/algorithms/baselineselector.h"
#include "strategy/algorithms/polarizationstatistics.h"

#include "strategy/plots/antennaflagcountplot.h"
#include "strategy/plots/frequencyflagcountplot.h"
#include "strategy/plots/timeflagcountplot.h"

#include "strategy/control/artifactset.h"
#include "strategy/control/strategyreader.h"
#include "strategy/control/defaultstrategy.h"

#include "structures/system.h"

#include "util/logger.h"
#include "util/parameter.h"
#include "util/progresslistener.h"
#include "util/stopwatch.h"
#include "util/numberlist.h"

#include "version.h"

#include <boost/date_time/posix_time/posix_time.hpp>

class ConsoleProgressHandler : public ProgressListener {
	private:
		std::mutex _mutex;
		
	public:
		
		virtual void OnStartTask(const rfiStrategy::Action &action, size_t taskNo, size_t taskCount, const std::string &description, size_t weight) final override
		{
			std::lock_guard<std::mutex> lock(_mutex);
			ProgressListener::OnStartTask(action, taskNo, taskCount, description, weight);
			
			double totalProgress = TotalProgress();
			
			Logger::Progress << round(totalProgress*1000.0)/10.0 << "% : ";
			
			for(size_t i=1;i<Depth();++i)
				Logger::Progress << "+-";
			
			Logger::Progress << description << "...\n";
		}
		
		virtual void OnEndTask(const rfiStrategy::Action &action) final override
		{
			std::lock_guard<std::mutex> lock(_mutex);
			
			ProgressListener::OnEndTask(action);
		}

		virtual void OnProgress(const rfiStrategy::Action &action, size_t i, size_t j) final override
		{
			ProgressListener::OnProgress(action, i, j);
		}

		virtual void OnException(const rfiStrategy::Action &, std::exception &thrownException) final override
		{
			Logger::Error <<
				"An exception occured during execution of the strategy!\n"
				"Your set might not be fully flagged. Exception was:\n"
				<< thrownException.what() << '\n';
		}
};

#define RETURN_SUCCESS                0
#define RETURN_CMDLINE_ERROR         10
#define RETURN_STRATEGY_PARSE_ERROR  20
#define RETURN_UNHANDLED_EXCEPTION   30

void checkRelease()
{
#ifndef NDEBUG
		Logger::Warn
			<< "This version of the AOFlagger has been compiled as DEBUG version! (NDEBUG was not defined)\n"
			<< "For better performance, recompile it as a RELEASE.\n\n";
#endif
}
void generalInfo()
{
	Logger::Info << 
		"AOFlagger " << AOFLAGGER_VERSION_STR << " (" << AOFLAGGER_VERSION_DATE_STR <<
		") command line application\n"
		"This program will execute an RFI strategy as can be created with the RFI gui\n"
		"and executes it on one or several observations.\n\n"
		"Author: André Offringa (offringa@gmail.com)\n\n";
}

int main(int argc, char **argv)
{
	if(argc == 1)
	{
		generalInfo();
		Logger::Error << "Usage: " << argv[0] << " [options] <obs1> [<obs2> [..]]\n"
		"  -v will produce verbose output\n"
		"  -j overrides the number of threads specified in the strategy\n"
		"     (default: one thread for each CPU core)\n"
		"  -strategy <strategy>\n"
		"     specifies a customized strategy\n"
		"  -direct-read\n"
		"     Will perform the slowest IO but will always work.\n"
		"  -indirect-read\n"
		"     Will reorder the measurement set before starting, which is normally faster but requires\n"
		"     free disk space to reorder the data to.\n"
		"  -memory-read\n"
		"     Will read the entire measurement set in memory. This is the fastest, but requires much\n"
		"     memory.\n"
		"  -auto-read-mode\n"
		"     Will select either memory or direct mode based on available memory (default).\n"
		"  -skip-flagged\n"
		"     Will skip an ms if it has already been processed by AOFlagger according to its HISTORY\n"
		"     table.\n"
		"  -uvw\n"
		"     Reads uvw values (some exotic strategies require these)\n"
		"  -column <name>\n"
		"     Specify column to flag\n"
		"  -bands <list>\n"
		"     Comma separated list of (zero-indexed) band ids to process\n"
		"  -fields <list>\n"
		"     Comma separated list of (zero-indexed) field ids to process\n"
		"  -combine-spws\n"
		"     Join all SPWs together in frequency direction before flagging\n"
		"  -bandpass <filename>\n"
		"     Set bandpass correction file for any 'Apply passband' action\n"
		"\n"
		"This tool supports the Casacore measurement set, the SDFITS and Filterbank formats and some more. See\n"
		"the documentation for support of other file types.\n";
		
		checkRelease();
		
		return RETURN_CMDLINE_ERROR;
	}
	
#ifdef HAS_LOFARSTMAN
	register_lofarstman();
#endif // HAS_LOFARSTMAN
	
	Parameter<size_t> threadCount;
	Parameter<BaselineIOMode> readMode;
	Parameter<bool> readUVW;
	Parameter<std::string> strategyFile;
	Parameter<Logger::VerbosityLevel> logVerbosity;
	Parameter<bool> skipFlagged;
	Parameter<std::string> dataColumn;
	Parameter<bool> combineSPWs;
	Parameter<std::string> bandpass;
	std::set<size_t> bands, fields;

	size_t parameterIndex = 1;
	while(parameterIndex < (size_t) argc && argv[parameterIndex][0]=='-')
	{
		std::string flag(argv[parameterIndex]+1);
		
		// If "--" was used, strip another dash
		if(!flag.empty() && flag[0] == '-')
			flag = flag.substr(1);
		
		if(flag=="j" && parameterIndex < (size_t) (argc-1))
		{
			++parameterIndex;
			threadCount = atoi(argv[parameterIndex]);
		}
		else if(flag=="v")
		{
			logVerbosity = Logger::VerboseVerbosity;
		}
		else if(flag == "version")
		{
			Logger::Info << "AOFlagger " << AOFLAGGER_VERSION_STR << " (" << AOFLAGGER_VERSION_DATE_STR << ")\n";
			return 0;
		}
		else if(flag=="direct-read")
		{
			readMode = DirectReadMode;
		}
		else if(flag=="indirect-read")
		{
			readMode = IndirectReadMode;
		}
		else if(flag=="memory-read")
		{
			readMode = MemoryReadMode;
		}
		else if(flag=="auto-read-mode")
		{
			readMode = AutoReadMode;
		}
		else if(flag=="strategy")
		{
			parameterIndex++;
			strategyFile = argv[parameterIndex];
		}
		else if(flag=="skip-flagged")
		{
			skipFlagged = true;
		}
		else if(flag=="uvw")
		{
			readUVW = true;
		}
		else if(flag == "column")
		{
			parameterIndex++;
			dataColumn = std::string(argv[parameterIndex]); 
		}
		else if(flag == "bands")
		{
			++parameterIndex;
			NumberList::ParseIntList(argv[parameterIndex], bands);
		}
		else if(flag == "fields")
		{
			++parameterIndex;
			NumberList::ParseIntList(argv[parameterIndex], fields);
		}
		else if(flag == "combine-spws")
		{
			combineSPWs = true;
		}
		else if(flag == "bandpass")
		{
			++parameterIndex;
			bandpass = std::string(argv[parameterIndex]);
		}
		else
		{
			Logger::Error << "Incorrect usage; parameter \"" << argv[parameterIndex] << "\" not understood.\n";
			return 1;
		}
		++parameterIndex;
	}

	try {
		Logger::SetVerbosity(logVerbosity.Value(Logger::NormalVerbosity));
		generalInfo();
			
		checkRelease();

		if(!threadCount.IsSet())
			threadCount = System::ProcessorCount();
		Logger::Debug << "Number of threads: " << threadCount.Value() << "\n";

		Stopwatch watch(true);

		std::mutex ioMutex;
		
		std::unique_ptr<rfiStrategy::ForEachMSAction> fomAction(new rfiStrategy::ForEachMSAction());
		if(readMode.IsSet())
			fomAction->SetIOMode(readMode);
		if(readUVW.IsSet())
			fomAction->SetReadUVW(readUVW);
		if(dataColumn.IsSet())
			fomAction->SetDataColumnName(dataColumn);
		if(!bands.empty())
			fomAction->Bands() = bands;
		if(!fields.empty())
			fomAction->Fields() = fields;
		if(combineSPWs.IsSet())
			fomAction->SetCombineSPWs(combineSPWs);
		if(bandpass.IsSet())
			fomAction->SetBandpassFilename(bandpass);
		std::stringstream commandLineStr;
		commandLineStr << argv[0];
		for(int i=1;i<argc;++i)
		{
			commandLineStr << " \"" << argv[i] << '\"';
		}
		fomAction->SetCommandLineForHistory(commandLineStr.str());
		if(skipFlagged.IsSet())
			fomAction->SetSkipIfAlreadyProcessed(skipFlagged);
		for(int i=parameterIndex;i<argc;++i)
		{
			Logger::Debug << "Adding '" << argv[i] << "'\n";
			fomAction->Filenames().push_back(argv[i]);
		}
		
		if(strategyFile.IsSet())
		{
			fomAction->SetLoadOptimizedStrategy(false);
			rfiStrategy::StrategyReader reader;
			std::unique_ptr<rfiStrategy::Strategy> subStrategy;
			try {
				Logger::Debug << "Opening strategy file '" << strategyFile.Value() << "'\n";
				subStrategy = reader.CreateStrategyFromFile(strategyFile);
				Logger::Debug << "Strategy parsed succesfully.\n";
			} catch(std::exception &e)
			{
				Logger::Error <<
					"ERROR: Reading strategy file \"" << strategyFile.Value() << "\" failed! This\n"
					"might be caused by a change in the file format of the strategy file after you\n"
					"created the strategy file.\n"
					"Try recreating the file.\n"
					"\nThe thrown exception was:\n" << e.what() << "\n";
				return RETURN_STRATEGY_PARSE_ERROR;
			}
			if(!rfiStrategy::DefaultStrategy::StrategyContainsAction(*subStrategy, rfiStrategy::ForEachBaselineActionType) &&
				!rfiStrategy::DefaultStrategy::StrategyContainsAction(*subStrategy, rfiStrategy::WriteFlagsActionType))
			{
				rfiStrategy::DefaultStrategy::StrategySetup setup =
					rfiStrategy::DefaultStrategy::DetermineSetup(rfiStrategy::DefaultStrategy::GENERIC_TELESCOPE, 0, 0.0, 0.0, 0.0);
				rfiStrategy::DefaultStrategy::EncapsulateSingleStrategy(*fomAction, std::move(subStrategy), setup);
				Logger::Info << "Modified single-baseline strategy so it will execute strategy on all baselines and write flags.\n";
			}
			else {
				fomAction->Add(std::move(subStrategy));
			}
			if(threadCount.IsSet())
				rfiStrategy::Strategy::SetThreadCount(*fomAction, threadCount);
		}
		else {
			fomAction->SetLoadOptimizedStrategy(true);
			fomAction->Add(std::unique_ptr<rfiStrategy::Strategy>(new rfiStrategy::Strategy())); // This helps the progress reader to determine progress
			if(threadCount.IsSet())
				fomAction->SetLoadStrategyThreadCount(threadCount);
		}
		
		rfiStrategy::Strategy overallStrategy;
		overallStrategy.Add(std::move(fomAction));

		rfiStrategy::ArtifactSet artifacts(&ioMutex);
		artifacts.SetAntennaFlagCountPlot(std::unique_ptr<AntennaFlagCountPlot>(new AntennaFlagCountPlot()));
		artifacts.SetFrequencyFlagCountPlot(std::unique_ptr<FrequencyFlagCountPlot>(new FrequencyFlagCountPlot()));
		artifacts.SetTimeFlagCountPlot(std::unique_ptr<TimeFlagCountPlot>(new TimeFlagCountPlot()));
		artifacts.SetPolarizationStatistics(std::unique_ptr<PolarizationStatistics>(new PolarizationStatistics()));
		artifacts.SetBaselineSelectionInfo(std::unique_ptr<rfiStrategy::BaselineSelector>(new rfiStrategy::BaselineSelector()));
		
		ConsoleProgressHandler progress;

		Logger::Info << "Starting strategy on " << to_simple_string(boost::posix_time::microsec_clock::local_time()) << '\n';
		
		overallStrategy.InitializeAll();
		overallStrategy.StartPerformThread(artifacts, progress);
		std::unique_ptr<rfiStrategy::ArtifactSet> set = overallStrategy.JoinThread();
		overallStrategy.FinishAll();

		set->AntennaFlagCountPlot().Report();
		set->FrequencyFlagCountPlot().Report();
		set->PolarizationStatistics().Report();

		set.reset();

		Logger::Debug << "Time: " << watch.ToString() << "\n";
		
		return RETURN_SUCCESS;
	} catch(std::exception& exception)
	{
		std::cerr
			<< "An unhandled exception occured: " << exception.what() << '\n'
			<< "If you think this is a bug, please contact offringa@gmail.com\n";
		return RETURN_UNHANDLED_EXCEPTION;
	}
}