File: writethread.cpp

package info (click to toggle)
aoflagger 3.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, sid
  • size: 4,476 kB
  • sloc: cpp: 51,868; python: 152; sh: 25; makefile: 17
file content (85 lines) | stat: -rw-r--r-- 2,592 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#include "writethread.h"

#include "../util/logger.h"

/**
 * Construct the writer and immediately start its background flusher thread.
 *
 * The image set is cloned while the shared I/O mutex is held; ownership of the
 * clone is then transferred to the flusher thread, which uses it for writing.
 *
 * @param imageSet Image set to clone for the flusher thread.
 * @param calcThreadCount Number of calculation threads. The write buffer blocks
 * producers once it holds 5 items per thread, and the flusher starts writing once
 * it holds 4 items per thread. A value of zero is treated as one.
 * @param ioMutex Mutex that is locked while cloning here and while the flusher
 * writes. Only the pointer is stored, so it must be non-null and must stay valid
 * for the lifetime of this object.
 */
WriteThread::WriteThread(rfiStrategy::ImageSet& imageSet, size_t calcThreadCount, std::mutex* ioMutex) :
	_ioMutex(ioMutex),
	_isWriteFinishing(false),
	// With a thread count of zero both thresholds would be zero: producers would
	// then wait forever in pushInWriteBuffer() (size >= 0 always holds) and the
	// flusher would never block in its wait loop. Hence clamp the count to one.
	_maxWriteBufferItems((calcThreadCount == 0 ? 1 : calcThreadCount) * 5),
	_minWriteBufferItemsForWriting((calcThreadCount == 0 ? 1 : calcThreadCount) * 4)
{
	// Cloning accesses the image set, so do it under the I/O mutex.
	std::unique_lock<std::mutex> iolock(*_ioMutex);
	std::unique_ptr<rfiStrategy::ImageSet> localImageSet = imageSet.Clone();
	iolock.unlock();

	// The functor is copied into the thread; it only carries a pointer back to us.
	FlushThread flushFunction;
	flushFunction._parent = this;
	_flusher.reset( new std::thread(flushFunction, std::move(localImageSet)) );
}

/**
 * Tell the flusher thread that no more flags will arrive and wait for it to exit.
 * The flusher keeps running until the write buffer is empty, so this blocks until
 * all buffered flags have been handed to the image set.
 */
WriteThread::~WriteThread()
{
	{
		// Raise the finish flag and wake the flusher while holding the buffer mutex.
		std::unique_lock<std::mutex> bufferLock(_writeMutex);
		_isWriteFinishing = true;
		_writeBufferChange.notify_all();
	}

	Logger::Debug << "Finishing the flusher thread...\n";
	_flusher->join();
}

/**
 * Queue the masks of @p data for writing at the given image set index.
 *
 * When the data holds at most one mask, its single mask is queued; otherwise
 * every mask is queued in index order. May block while the write buffer is full.
 *
 * @param data Data whose masks are to be written.
 * @param imageSetIndex Index identifying where the masks belong in the image set.
 */
void WriteThread::SaveFlags(const TimeFrequencyData& data, rfiStrategy::ImageSetIndex& imageSetIndex)
{
	const size_t maskCount = data.MaskCount();
	std::vector<Mask2DCPtr> masks;
	if(maskCount > 1)
	{
		for(size_t maskIndex=0; maskIndex<maskCount; ++maskIndex)
			masks.emplace_back(data.GetMask(maskIndex));
	}
	else
	{
		masks.emplace_back(data.GetSingleMask());
	}
	BufferItem newItem(masks, imageSetIndex);
	pushInWriteBuffer(newItem);
}

void WriteThread::pushInWriteBuffer(const BufferItem &newItem)
{
	std::unique_lock<std::mutex> lock(_writeMutex);
	while(_writeBuffer.size() >= _maxWriteBufferItems)
		_writeBufferChange.wait(lock);
	_writeBuffer.emplace(newItem);
	_writeBufferChange.notify_all();
}

/**
 * Body of the flusher thread.
 *
 * Repeatedly waits until the parent's write buffer holds the minimum number of
 * items (or the parent is finishing), drains the buffer, and writes the drained
 * items through @p imageSet while holding the parent's I/O mutex. Returns once the
 * parent is finishing and the buffer is empty.
 *
 * @param imageSet Image set owned by this thread and used for writing the flags.
 */
void WriteThread::FlushThread::operator()(std::unique_ptr<rfiStrategy::ImageSet> imageSet)
{
	WriteThread* const owner = _parent;
	std::unique_lock<std::mutex> bufferLock(owner->_writeMutex);
	bool moreWork = true;
	while(moreWork)
	{
		// Sleep until there is enough to write, or until shutdown was requested.
		owner->_writeBufferChange.wait(bufferLock, [owner]() {
			return owner->_writeBuffer.size() >= owner->_minWriteBufferItemsForWriting ||
				owner->_isWriteFinishing;
		});

		// Drain the shared buffer into a private stack. Popping from one stack
		// into another reverses the order, so the items leave 'pending' below in
		// the order in which they were originally pushed.
		std::stack<BufferItem> pending;
		while(!owner->_writeBuffer.empty())
		{
			pending.push(owner->_writeBuffer.top());
			owner->_writeBuffer.pop();
		}
		// The buffer is empty now; wake producers waiting for room.
		owner->_writeBufferChange.notify_all();

		const bool reachedMinimum = pending.size() >= owner->_minWriteBufferItemsForWriting;
		if(reachedMinimum)
			Logger::Debug << "Flag buffer has reached minimal writing size, flushing flags...\n";
		else
			Logger::Debug << "Flushing flags...\n";
		// Release the buffer so producers can continue while we write.
		bufferLock.unlock();

		{
			std::unique_lock<std::mutex> ioLock(*owner->_ioMutex);
			while(!pending.empty())
			{
				BufferItem item = pending.top();
				pending.pop();
				imageSet->AddWriteFlagsTask(item._index, item._masks);
			}
			imageSet->PerformWriteFlagsTask();
		}

		// Re-acquire the buffer mutex before inspecting the shared state again.
		bufferLock.lock();
		moreWork = !owner->_isWriteFinishing || !owner->_writeBuffer.empty();
	}
}