1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
|
#include "chunk_queue.h"
#include <sys/time.h>
#include <errno.h>
#include <iostream>
ChunkQueue::ChunkQueue()
{
pthread_mutex_init(&workmutex, NULL);
pthread_cond_init(&popconvar, NULL);
pthread_cond_init(&pushconvar, NULL);
state = STARTED;
}
ChunkQueue::~ChunkQueue()
{
for(unsigned int i = 0; i < workqueue.size(); ++i)
{
delete(workqueue[i]);
}
}
void ChunkQueue::pushOp(long len, long oset, DAVIX_FD* davfd)
{
struct timespec to;
bool pushed = false;
pthread_mutex_lock(&workmutex);
to.tv_sec = time(NULL) + DEFAULT_WAIT_TIME;
to.tv_nsec = 0;
while(!pushed)
{
if(workqueue.size() < 1000)
{
worktoken* tk = new(worktoken);
tk->length = len;
tk->offset = oset;
tk->fd = davfd;
workqueue.push_back(tk);
pushed = true;
break;
}
int rc = pthread_cond_timedwait(&pushconvar, &workmutex, &to);
if(rc == ETIMEDOUT)
{
std::cerr << std::endl << "pushOp() timed out." << std::endl;
break;
}
}
//signal worker, job available
pthread_mutex_unlock(&workmutex);
pthread_cond_signal(&popconvar);
}
struct ChunkQueue::worktoken *ChunkQueue::getOp()
{
struct worktoken* mytk = 0;
struct timespec to;
pthread_mutex_lock(&workmutex);
to.tv_sec = time(NULL) + DEFAULT_WAIT_TIME;
to.tv_nsec = 0;
while(!mytk)
{
if(workqueue.size() > 0)
{
mytk = workqueue.front();
workqueue.pop_front();
break;
}
int rc = pthread_cond_timedwait(&popconvar, &workmutex, &to);
if(rc == ETIMEDOUT)
{
std::cerr << std::endl << "getOp() timed out." << std::endl;
break;
}
}
pthread_mutex_unlock(&workmutex);
// there is now room in the workqueue, signal producer
pthread_cond_signal(&pushconvar);
return (mytk);
}
void ChunkQueue::StopThreads()
{
state = STOPPED;
pthread_cond_broadcast(&pushconvar);
pthread_cond_broadcast(&popconvar);
}
int ChunkQueue::GetQueueSize()
{
return workqueue.size();
}
int ChunkQueue::GetQueueState()
{
return state;
}
void ChunkQueue::SetQueueState(int new_state)
{
state = new_state;
}
|