File: BlockingQueue.h

package info (click to toggle)
libtgvoip 2.4.2-1
  • links: PTS, VCS
  • area: main
  • in suites: buster, sid
  • size: 9,004 kB
  • sloc: cpp: 61,331; ansic: 24,725; sh: 4,150; makefile: 749; asm: 242
file content (92 lines) | stat: -rw-r--r-- 1,678 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
//
// libtgvoip is free and unencumbered public domain software.
// For more information, see http://unlicense.org or the UNLICENSE file
// you should have received with this source code distribution.
//

#ifndef LIBTGVOIP_BLOCKINGQUEUE_H
#define LIBTGVOIP_BLOCKINGQUEUE_H

#include <stdlib.h>
#include <list>
#include "threading.h"
#include "utils.h"

using namespace std;

namespace tgvoip{

/**
 * Bounded FIFO queue guarded by a mutex, with a semaphore used to block consumers.
 *
 * Elements are stored in a std::list. Every access to the list made by this class
 * happens while `mutex` is held. `semaphore` is released once per element that Put()
 * adds without overflowing and acquired once per element removed by Get()/GetBlocking().
 *
 * @tparam T element type; must be movable. Get() additionally requires T to be
 *           value-initializable (`T()`), which is what it returns for an empty queue.
 */
template<typename T>
class BlockingQueue{
public:
	TGVOIP_DISALLOW_COPY_AND_ASSIGN(BlockingQueue);

	/**
	 * @param capacity maximum number of elements kept in the queue. It is also passed
	 *                 as the first argument of the Semaphore constructor (presumably its
	 *                 maximum count, with 0 as the initial count — confirm in threading.h).
	 */
	BlockingQueue(size_t capacity) : capacity(capacity), semaphore(capacity, 0), overflowCallback(nullptr){
	}

	/**
	 * Releases the semaphore once before the members are destroyed.
	 * NOTE(review): presumably meant to wake a consumer still blocked in GetBlocking();
	 * such a consumer would then touch a queue that is being destroyed — confirm with callers.
	 */
	~BlockingQueue(){
		semaphore.Release();
	}

	/**
	 * Appends an element to the back of the queue.
	 *
	 * If the queue then holds more than `capacity` elements, the oldest elements are handed
	 * to the overflow callback and dropped until the size is back within capacity. When no
	 * overflow callback is set, an overflow calls abort().
	 *
	 * @param thing element to enqueue (moved into the queue).
	 */
	void Put(T thing){
		MutexGuard sync(mutex);
		queue.push_back(std::move(thing));
		bool didOverflow=false;
		while(queue.size()>capacity){
			didOverflow=true;
			if(!overflowCallback)
				abort();
			overflowCallback(std::move(queue.front()));
			queue.pop_front();
		}
		// On overflow one element came in and one went out, so the number of
		// available elements did not change and the semaphore must not be bumped.
		if(!didOverflow)
			semaphore.Release();
	}

	/**
	 * Acquires the semaphore (waiting for an element to become available),
	 * then removes and returns the front element.
	 */
	T GetBlocking(){
		semaphore.Acquire();
		MutexGuard sync(mutex);
		return GetInternal();
	}

	/**
	 * Removes and returns the front element if there is one.
	 *
	 * @return the front element, or a value-initialized T (e.g. a null pointer when T is a
	 *         pointer type) when the queue is empty. Previously the empty case read from an
	 *         empty list, which is undefined behavior.
	 *
	 * NOTE(review): Acquire() is called while the mutex is held. If a GetBlocking() caller
	 * has already acquired the semaphore for the last element but has not locked the mutex
	 * yet, this call may block while holding the mutex — confirm against Semaphore semantics.
	 */
	T Get(){
		MutexGuard sync(mutex);
		if(queue.empty())
			return T();
		// Consume the semaphore count that Put() released for this element.
		semaphore.Acquire();
		return GetInternal();
	}

	/**
	 * @return the number of elements currently in the queue. The mutex is taken so the
	 *         read does not race with Put()/Get()/GetBlocking() modifying the list.
	 */
	unsigned int Size(){
		MutexGuard sync(mutex);
		return static_cast<unsigned int>(queue.size());
	}

	/** Currently a no-op. */
	void PrepareDealloc(){

	}

	/**
	 * Sets the function that receives elements dropped by Put() on overflow.
	 *
	 * @param overflowCallback function taking the dropped element by value; passing a null
	 *                         pointer makes an overflow in Put() call abort(). The pointer is
	 *                         stored without taking the mutex.
	 */
	void SetOverflowCallback(void (*overflowCallback)(T)){
		this->overflowCallback=overflowCallback;
	}

private:
	/**
	 * Moves the front element out of the list and removes it.
	 * Precondition: the caller holds `mutex` and the queue is not empty.
	 */
	T GetInternal(){
		T r=std::move(queue.front());
		queue.pop_front();
		return r;
	}

	std::list<T> queue;              // stored elements, oldest at the front
	size_t capacity;                 // maximum number of elements kept by Put()
	Semaphore semaphore;             // released per enqueued element, acquired per dequeued element
	Mutex mutex;                     // guards `queue`
	void (*overflowCallback)(T);     // receives elements dropped on overflow; may be null
};
}

#endif //LIBTGVOIP_BLOCKINGQUEUE_H