1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
|
#ifndef SYNCHRONIZED_MS_H
#define SYNCHRONIZED_MS_H
#include <casacore/ms/MeasurementSets/MeasurementSet.h>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>
namespace wsclean {
/**
 * Shared handle to a casacore MeasurementSet whose opening is serialized per
 * filename.
 *
 * Constructing a SynchronizedMS with a filename blocks until no other MSLock
 * holds that same filename, then opens the measurement set. Copies of a
 * SynchronizedMS share the same underlying MSLock through a std::shared_ptr;
 * the filename is handed back when the last copy is destroyed or Reset().
 *
 * A default-constructed (or Reset()) object holds no lock. Calling
 * operator->, operator*, MS() or Filename() on such an object dereferences a
 * null pointer and is therefore not allowed.
 */
class SynchronizedMS {
 public:
  /** Creates an empty handle that refers to no measurement set. */
  SynchronizedMS() {}

  /**
   * Waits until @p filename is not held by any other MSLock and opens it.
   *
   * @param filename Path of the measurement set; also the key used for
   *   mutual exclusion.
   * @param lock_option Locking option forwarded to casacore::TableLock.
   *
   * Any exception thrown while opening the measurement set is propagated;
   * the filename is released again in that case.
   */
  SynchronizedMS(const std::string& filename,
                 const casacore::TableLock::LockOption lock_option =
                     casacore::TableLock::DefaultLocking)
      : _lock(std::make_shared<MSLock>(filename, lock_option)) {}

  /** Drops this handle's share of the lock; the handle becomes empty. */
  void Reset() { _lock.reset(); }

  /** Access to the opened measurement set. Requires a non-empty handle. */
  casacore::MeasurementSet* operator->() { return &_lock->MS(); }
  /** Access to the opened measurement set. Requires a non-empty handle. */
  casacore::MeasurementSet& operator*() { return _lock->MS(); }
  /** Access to the opened measurement set. Requires a non-empty handle. */
  casacore::MeasurementSet& MS() { return _lock->MS(); }

  /** Filename this handle was opened with. Requires a non-empty handle. */
  const std::string& Filename() const { return _lock->Filename(); }

 private:
  /**
   * Owns one opened MeasurementSet together with its entry in the static set
   * of open filenames. An empty _filename marks an object that holds nothing
   * (i.e. one that has been moved from).
   */
  class MSLock {
   public:
    /**
     * Registers @p filename as open, waiting first if it is already
     * registered, and then opens the measurement set.
     *
     * Note that the wait only ends when another MSLock for the same filename
     * is released, so a thread that already holds @p filename and constructs
     * a second MSLock for it will wait on itself.
     */
    MSLock(const std::string& filename,
           const casacore::TableLock::LockOption lock_option)
        : _filename(filename) {
      {
        std::unique_lock<std::mutex> lock(_mutex);
        while (_openFiles.count(filename)) _condition.wait(lock);
        _openFiles.insert(filename);
      }
      // The mutex is deliberately not held while opening the file, so that
      // opening unrelated measurement sets is not serialized.
      try {
        _ms.reset(new casacore::MeasurementSet(
            _filename, casacore::TableLock(lock_option)));
      } catch (...) {
        // The destructor does not run for a partially constructed object, so
        // without this the filename would stay registered forever and every
        // later attempt to open it would block indefinitely.
        unregister(_filename);
        throw;
      }
    }

    MSLock(const MSLock&) = delete;

    /**
     * Takes over the measurement set and the registration of @p other.
     * The other must already have access, so the ms can be directly claimed
     * (unless the other was an empty object, in which case the new object
     * will also be empty).
     */
    MSLock(MSLock&& other) noexcept
        : _filename(std::move(other._filename)), _ms(std::move(other._ms)) {
      // A moved-from std::string has an unspecified value; explicitly empty
      // it so that other's destructor does not release the filename.
      other._filename.clear();
    }

    ~MSLock() { release(); }

    MSLock& operator=(const MSLock&) = delete;

    /**
     * Releases what this object currently holds and takes over the
     * measurement set and registration of @p rhs.
     */
    MSLock& operator=(MSLock&& rhs) noexcept {
      // Self-assignment must be a no-op: releasing first would otherwise
      // destroy the measurement set that is about to be "moved in".
      if (this != &rhs) {
        release();
        _filename = std::move(rhs._filename);
        _ms = std::move(rhs._ms);
        rhs._filename.clear();
      }
      return *this;
    }

    casacore::MeasurementSet& MS() { return *_ms; }
    const std::string& Filename() const { return _filename; }

   private:
    /** Removes @p filename from the set of open files and wakes waiters. */
    static void unregister(const std::string& filename) {
      std::lock_guard<std::mutex> lock(_mutex);
      _openFiles.erase(filename);
      _condition.notify_all();
    }

    /**
     * Closes the measurement set and gives up the filename, unless this
     * object is empty.
     */
    void release() {
      if (!_filename.empty()) {
        // Destroy the measurement set before unregistering, so that a waiter
        // only proceeds once this object no longer has the file open.
        _ms.reset();
        unregister(_filename);
      }
    }

    // Shared by all MSLock objects; these are declarations only, the
    // definitions are not part of this header.
    static std::set<std::string> _openFiles;
    static std::condition_variable _condition;
    static std::mutex _mutex;

    std::string _filename;
    std::unique_ptr<casacore::MeasurementSet> _ms;
  };

  std::shared_ptr<MSLock> _lock;
};
} // namespace wsclean
#endif
|