1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
/*++
Module Name:
ParallelTask.cpp
Abstract:
Parallel task management
Environment:
User mode service.
--*/
#include "stdafx.h"
#include "ParallelTask.h"
#include "Error.h"
using std::max;
// Builds a coworker that owns i_numThreads workers created by i_manager,
// together with one workReady/workDone event pair per worker.
//
// Arguments:
//   i_numThreads       - number of workers and event pairs to create
//   i_bindToProcessors - stored; start() copies it into the WorkerContext
//   i_manager          - creates each worker and is handed to each worker's configure()
//   i_callback         - stored; step() waits for the workers itself only when this is NULL
//   i_parameter        - stored; passed to the callback by worker thread 0
ParallelCoworker::ParallelCoworker(int i_numThreads, bool i_bindToProcessors, ParallelWorkerManager* i_manager, Callback i_callback, void* i_parameter)
    : stopped(false), numThreads(i_numThreads), bindToProcessors(i_bindToProcessors), manager(i_manager), callback(i_callback), parameter(i_parameter)
{
    // task and context are only allocated by start(), but the destructor
    // deletes them unconditionally. Null them here so that destroying a
    // coworker that was never started does not delete indeterminate pointers.
    task = NULL;
    context = NULL;

    workReady = new EventObject[numThreads];
    workDone = new EventObject[numThreads];
    // The worker table is sized to at least one slot even when numThreads is 0.
    workers = new ParallelWorker*[max(numThreads, 1)];
    for (int i = 0; i < numThreads; i++) {
        // Both events start out closed: nobody proceeds until step()/stop()
        // opens workReady, or the worker thread opens workDone.
        CreateEventObject(&workReady[i]);
        PreventEventWaitersFromProceeding(&workReady[i]);
        CreateEventObject(&workDone[i]);
        PreventEventWaitersFromProceeding(&workDone[i]);
        workers[i] = manager->createWorker();
        workers[i]->configure(manager, i, numThreads);
    }
    CreateSingleWaiterObject(&finished);
}
// Tears down the per-worker events and worker objects, then releases the
// arrays that held them along with the task and context objects.
ParallelCoworker::~ParallelCoworker()
{
    int slot = 0;
    while (slot < numThreads) {
        DestroyEventObject(workReady + slot);
        DestroyEventObject(workDone + slot);
        delete workers[slot];
        ++slot;
    }

    // The arrays themselves go only after every element has been cleaned up.
    delete [] workReady;
    delete [] workDone;
    delete [] workers;

    delete task;
    delete context;
}
void ParallelCoworker::start()
{
context = new WorkerContext();
context->shared = this;
context->totalThreads = numThreads;
context->bindToProcessors = bindToProcessors;
#ifdef _MSC_VER
context->useTimingBarrier = false;
#endif
task = new ParallelTask<WorkerContext>(context);
task->fork();
}
// Starts one step on every worker. With no callback registered this also
// waits for all workers and calls manager->finishStep() before returning.
void ParallelCoworker::step()
{
    manager->beginStep();

    // Close each worker's "done" event before opening its "ready" event.
    for (int w = 0; w < numThreads; ++w) {
        PreventEventWaitersFromProceeding(&workDone[w]);
        AllowEventWaitersToProceed(&workReady[w]);
    }

    if (callback != NULL) {
        // Async mode: worker thread 0 waits for the others and invokes the callback.
        return;
    }

    // Sync mode: block here until every worker has reported completion.
    for (int w = 0; w < numThreads; ++w) {
        WaitForEvent(&workDone[w]);
    }
    manager->finishStep();
}
// Asks every worker thread to exit and waits on `finished`. If that wait
// fails, reports the error and calls soft_exit(1).
void ParallelCoworker::stop()
{
    // The flag is raised before any worker is released; each worker checks
    // it right after its workReady wait returns.
    stopped = true;

    int released = 0;
    while (released < numThreads) {
        AllowEventWaitersToProceed(&workReady[released]);
        ++released;
    }

    if (WaitForSingleWaiterObject(&finished)) {
        return;
    }

    WriteErrorMessage("Waiting for all threads to finish failed\n");
    soft_exit(1);
}
void
WorkerContext::initializeThread()
{
shared->workers[threadNum]->initialize();
}
// Per-thread main loop. Each iteration waits for this thread's workReady
// event, runs one worker step, and opens this thread's workDone event.
// The loop exits when shared->stopped is observed after a wake-up.
void
WorkerContext::runThread()
{
while (true) {
//fprintf(stderr, "worker task thread %d waiting to begin\n", GetCurrentThreadId());
// Block until step() or stop() opens this thread's workReady event.
WaitForEvent(&shared->workReady[threadNum]);
// Close it again immediately so the next iteration blocks until the next release.
PreventEventWaitersFromProceeding(&shared->workReady[threadNum]);
// stop() sets `stopped` before opening workReady, so a wake-up with the
// flag set means "exit" rather than "run a step".
if (shared->stopped) {
return;
}
//fprintf(stderr, "worker task thread %d begin\n", GetCurrentThreadId());
// NOTE(review): `start` is only referenced by the commented-out trace below.
_int64 start = timeInMillis();
shared->workers[threadNum]->step();
//fprintf(stderr, "zip task thread %d done %lld ms\n", GetCurrentThreadId(), timeInMillis() - start);
// Report completion of this thread's part of the step.
AllowEventWaitersToProceed(&shared->workDone[threadNum]);
// if async, thread 0 will wait for everyone to finish and invoke callback
if (threadNum == 0 && shared->callback != NULL) {
// Starts at 1: thread 0 has just finished its own step above.
for (int i = 1; i < shared->numThreads; i++) {
WaitForEvent(&shared->workDone[i]);
}
shared->manager->finishStep();
shared->callback(shared->parameter);
}
}
}
// Per-thread teardown hook. Only the thread with the highest index signals
// the coworker's `finished` waiter; all other threads do nothing here.
// The `common` argument is not used.
void WorkerContext::finishThread(WorkerContext* common)
{
    if (threadNum != totalThreads - 1) {
        return;
    }
    SignalSingleWaiterObject(&shared->finished);
}
// Forwards to worker->configure(), passing this manager along with the
// worker's thread number and the total thread count.
void ParallelWorkerManager::configure(ParallelWorker* worker, int threadNum, int totalThreads)
{
    worker->configure(this, threadNum, totalThreads);
}
|