1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
|
#include "writethread.h"
#include "../imagesets/multibandmsimageset.h"
#include "../util/logger.h"
// Initializes the write-buffer limits and starts the flusher thread.
//
// imageSet:        image set the flags are written to. When it is a
//                  MultiBandMsImageSet, the flusher thread receives a pointer
//                  to it directly; otherwise the set is cloned while holding
//                  the I/O mutex and the flusher thread is given the clone.
// calcThreadCount: scales the buffer limits (maximum is 5x, the flush
//                  threshold is 4x this value).
// ioMutex:         mutex locked around image set I/O; stored for the flusher.
WriteThread::WriteThread(imagesets::ImageSet& imageSet, size_t calcThreadCount,
                         std::mutex* ioMutex)
    : _ioMutex(ioMutex),
      _isWriteFinishing(false),
      _maxWriteBufferItems(calcThreadCount * 5),
      _minWriteBufferItemsForWriting(calcThreadCount * 4) {
  FlushThread flusherBody;
  flusherBody._parent = this;

  const bool isMultiBandSet =
      dynamic_cast<imagesets::MultiBandMsImageSet*>(&imageSet) != nullptr;
  if (!isMultiBandSet) {
    std::unique_ptr<imagesets::ImageSet> clonedSet;
    {
      // Cloning is done under the I/O mutex; the lock is dropped before the
      // thread is started.
      std::lock_guard<std::mutex> ioGuard(*_ioMutex);
      clonedSet = imageSet.Clone();
    }
    _flusher.reset(new std::thread(flusherBody, std::move(clonedSet)));
    return;
  }

  // TODO Would this method also be safe for other writers?
  _flusher.reset(new std::thread(flusherBody, &imageSet));
}
// Marks the writer as finishing, wakes every thread waiting on the buffer
// condition, and then blocks until the flusher thread has exited.
WriteThread::~WriteThread() {
  {
    // The flag is set and the notification sent while the write mutex is
    // held; the mutex is released at the end of this scope, before joining.
    std::lock_guard<std::mutex> bufferGuard(_writeMutex);
    _isWriteFinishing = true;
    _writeBufferChange.notify_all();
  }
  Logger::Debug << "Finishing the flusher thread...\n";
  _flusher->join();
}
void WriteThread::SaveFlags(const TimeFrequencyData& data,
imagesets::ImageSetIndex& imageSetIndex) {
std::vector<Mask2DCPtr> masks;
if (data.MaskCount() <= 1)
masks.emplace_back(data.GetSingleMask());
else
for (size_t i = 0; i < data.MaskCount(); ++i) {
masks.emplace_back(data.GetMask(i));
}
const BufferItem newItem(masks, imageSetIndex);
pushInWriteBuffer(newItem);
}
void WriteThread::pushInWriteBuffer(const BufferItem& newItem) {
std::unique_lock<std::mutex> lock(_writeMutex);
while (_writeBuffer.size() >= _maxWriteBufferItems)
_writeBufferChange.wait(lock);
_writeBuffer.emplace(newItem);
_writeBufferChange.notify_all();
}
// Owning overload: runs the flush loop on the given image set. The
// unique_ptr parameter keeps the image set alive until the loop returns.
void WriteThread::FlushThread::operator()(
    std::unique_ptr<imagesets::ImageSet> imageSet) {
  imagesets::ImageSet* const ownedSet = imageSet.get();
  (*this)(ownedSet);
}
// Flush loop executed on the flusher thread.
//
// Each iteration:
//  1. waits (under the write mutex) until the buffer holds at least
//     _minWriteBufferItemsForWriting items or the parent is finishing;
//  2. drains the whole buffer into a local stack and notifies waiters;
//  3. releases the write mutex, then, under the I/O mutex, adds a write task
//     per drained item and performs the write;
//  4. re-acquires the write mutex.
// The loop ends once the parent is finishing and the buffer is empty.
void WriteThread::FlushThread::operator()(imagesets::ImageSet* imageSet) {
  WriteThread& owner = *_parent;
  std::unique_lock<std::mutex> bufferLock(owner._writeMutex);
  do {
    owner._writeBufferChange.wait(bufferLock, [&owner] {
      return owner._writeBuffer.size() >=
                 owner._minWriteBufferItemsForWriting ||
             owner._isWriteFinishing;
    });

    // Draining stack-to-stack reverses the order, so popping `pending` below
    // yields the items in the order they were pushed into the write buffer.
    std::stack<BufferItem> pending;
    while (!owner._writeBuffer.empty()) {
      pending.push(owner._writeBuffer.top());
      owner._writeBuffer.pop();
    }
    owner._writeBufferChange.notify_all();

    const bool reachedThreshold =
        pending.size() >= owner._minWriteBufferItemsForWriting;
    if (reachedThreshold)
      Logger::Debug << "Flag buffer has reached minimal writing size, flushing "
                       "flags...\n";
    else
      Logger::Debug << "Flushing flags...\n";
    bufferLock.unlock();

    {
      // The write mutex is not held while writing; only the I/O mutex is.
      std::lock_guard<std::mutex> ioGuard(*owner._ioMutex);
      while (!pending.empty()) {
        BufferItem& next = pending.top();
        imageSet->AddWriteFlagsTask(next._index, next._masks);
        pending.pop();
      }
      imageSet->PerformWriteFlagsTask();
    }

    bufferLock.lock();
  } while (!owner._isWriteFinishing || !owner._writeBuffer.empty());
}
|