File: ParallelTask.h

package info (click to toggle)
snap-aligner 2.0.3%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 6,652 kB
  • sloc: cpp: 41,051; ansic: 5,239; python: 227; makefile: 85; sh: 28
file content (301 lines) | stat: -rw-r--r-- 7,820 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
/*++

Module Name:

    ParallelTask.h

Abstract:

    Simple parallel task manager

Authors:

    Ravi Pandya, May 2012

Environment:

    User mode service.

Revision History:

--*/

#pragma once
#include "stdafx.h"
#include "Compat.h"
#include "exit.h"
#include "Error.h"

/*++
    Simple class to handle parallelized algorithms.
    TContext should extend TaskContextBase, and provide the following methods:
        void initializeThread()
            Called once per worker context, on the thread that calls run(),
            after the context has been assigned from common and threadNum set.
        void runThread()
            Called on the worker thread to run that thread's work until termination.
            May use something like RangeSplitter to get work.
        void finishThread(TContext* common)
            Called once per worker context, on the thread that calls run(),
            after all worker threads have finished, to write results back to common.
--*/
    template <class TContext>
class ParallelTask
{
public:

    // returns the common context pointer that was passed to the constructor
    inline TContext* getCommonContext() { return common; }

    // i_common should have totalThreads & bindToProcessors set
    // only the pointer is stored; the context itself is not copied here
    ParallelTask(TContext* i_common);

    // run all threads until completion, gather results in common
    // (blocks the calling thread until every worker has finished)
    void run();

    // run all tasks on a separate thread
    // starts a new thread that calls run() on this object and returns without waiting for it
    // NOTE(review): presumably this object and the common context must outlive that thread -- confirm with callers
    void fork();

private:

    // initial & final context
    TContext*   common;

    // array of per-thread contexts
    // allocated by run() with one entry per thread; NULL until run() is called
    TContext*   contexts;

    // thread entry point for one worker; run() passes it a pointer to that worker's TContext
    static void threadWorker(void* threadContext);

    // thread entry point used by fork(); receives the ParallelTask pointer and calls run() on it
    static void forkWorker(void* threadContext);
};

/*++
    Base for type parameter to parallel task
--*/
struct TaskContextBase
{
    // should be set before passing to ParallelTask constructor
    int                 totalThreads;     // number of worker threads ParallelTask::run() starts
    bool                bindToProcessors; // if true, a worker with no affinity yet is bound to processor threadNum

    // time taken to run in millis
    // written to the common context at the end of ParallelTask::run()
    _int64              time;

    // for internal use:
    int                 threadNum;        // current thread number, 0...totalThreads-1
    SingleWaiterObject *doneWaiter;       // Gets notified when the last thread ends.
    volatile int        runningThreads;   // counter of workers still running; run() sets it to totalThreads on the common context
    volatile int       *pRunningThreads;  // points at the common context's runningThreads, so every per-thread copy decrements the same counter
#ifdef  _MSC_VER
    // The fields below exist only in MSVC builds.
    volatile int       *nThreadsAllocatingMemory;        // points at a counter that run() initializes to totalThreads; NOTE(review): presumably decremented by workers once their allocation is done -- not visible in this header
    EventObject        *memoryAllocationCompleteBarrier; // event that run() waits on when useTimingBarrier is set; the signalling side is not visible in this header
    bool                useTimingBarrier;                // when true, run() waits on the barrier above and then restarts its timer, so time excludes what ran before the barrier
#endif  // _MSC_VER
};



    template <class TContext>
ParallelTask<TContext>::ParallelTask(TContext* i_common)
    : common(i_common),
      contexts(NULL)
{
    // Only the pointer to the shared context is kept here; the per-thread
    // context array stays NULL until run() allocates it.
    // A task needs at least one worker thread (checked via _ASSERT).
    _ASSERT(i_common->totalThreads > 0);
}

    template <class TContext>
    void
ParallelTask<TContext>::run()
{
    // Runs the whole task on the calling thread's behalf:
    //   1. publish the shared synchronization objects through the common context,
    //   2. copy the common context into one context per thread, initialize it and start its worker,
    //   3. block until the last worker signals completion,
    //   4. let every per-thread context write its results back into common,
    //   5. record the elapsed time in common->time.
    // Any failure to create, start or wait is reported and ends in soft_exit(1).
    //
    // NOTE(review): the contexts array allocated below is not released in this
    // function, and no release is visible elsewhere in this header -- confirm
    // whether that is intentional before adding one (the per-thread contexts
    // are plain copies of *common).
    _int64 startTime = timeInMillis();

    SingleWaiterObject allThreadsDone;
    if (!CreateSingleWaiterObject(&allThreadsDone)) {
        WriteErrorMessage( "Failed to create single waiter object for thread completion.\n");
        soft_exit(1);
    }

#ifdef  _MSC_VER
    // MSVC builds only: a counter and an event used as a timing barrier.
    // Both live on this stack frame and are reached through the common context.
    int threadsAllocating = common->totalThreads;
    EventObject allocationBarrier;
    CreateEventObject(&allocationBarrier);

    common->nThreadsAllocatingMemory = &threadsAllocating;
    common->memoryAllocationCompleteBarrier = &allocationBarrier;
#endif  // _MSC_VER

    // These must be in place before the per-thread copies are made, so that
    // every copy refers to the same waiter and the same running-thread counter.
    common->doneWaiter = &allThreadsDone;
    common->runningThreads = common->totalThreads;
    common->pRunningThreads = &common->runningThreads;

    contexts = new TContext[common->totalThreads];
    for (int threadIndex = 0; threadIndex < common->totalThreads; threadIndex++) {
        TContext* perThread = &contexts[threadIndex];

        *perThread = *common;
        perThread->threadNum = threadIndex;
        perThread->initializeThread();

        if (!StartNewThread(ParallelTask<TContext>::threadWorker, perThread)) {
            WriteErrorMessage( "Unable to start worker thread.\n");
            soft_exit(1);
        }
    }

#ifdef  _MSC_VER
    if (common->useTimingBarrier) {
        // Restart the clock once the barrier is cleared, so the reported time
        // covers only what happens after it.
        WaitForEvent(&allocationBarrier);
        WriteStatusMessage("Cleared timing barrier.\n");
        startTime = timeInMillis();
    }
#endif  // _MSC_VER

    // The last worker to finish signals this waiter (see threadWorker).
    if (!WaitForSingleWaiterObject(&allThreadsDone)) {
        WriteErrorMessage( "Waiting for all threads to finish failed\n");
        soft_exit(1);
    }
    DestroySingleWaiterObject(&allThreadsDone);
#ifdef  _MSC_VER
    DestroyEventObject(&allocationBarrier);
#endif  // _MSC_VER

    // All workers are done: merge per-thread results into the common context.
    for (int threadIndex = 0; threadIndex < common->totalThreads; threadIndex++) {
        contexts[threadIndex].finishThread(common);
    }

    common->time = timeInMillis() - startTime;
}

    template <class TContext>
    void
ParallelTask<TContext>::fork()
{
    // Hand the whole task to a new thread, which calls run() via forkWorker.
    // On success this returns immediately without waiting for that thread.
    if (StartNewThread(ParallelTask<TContext>::forkWorker, this)) {
        return;
    }

    // The thread could not be started: report it and exit.
    WriteErrorMessage( "Unable to fork task thread.\n");
    soft_exit(1);
}

    template <class TContext>
    void
ParallelTask<TContext>::forkWorker(
    void* forkArg)
{
    // Thread entry point used by fork(): forkArg is the ParallelTask that
    // fork() passed as 'this'; run the complete task on this thread.
    ParallelTask<TContext>* task = static_cast<ParallelTask<TContext>*>(forkArg);
    task->run();
}

    template <class TContext>
    void
ParallelTask<TContext>::threadWorker(
    void* threadArg)
{
    // Thread entry point for one worker: threadArg is the per-thread TContext
    // that run() prepared for this thread.
    TContext* worker = static_cast<TContext*>(threadArg);

    // Bind to a processor only when requested, and only if this thread does
    // not already have an affinity set.
    if (worker->bindToProcessors) {
        if (!DoesThreadHaveProcessorAffinitySet()) {
            BindThreadToProcessor(worker->threadNum);
        }
    }

    worker->runThread();

    // Count this thread as finished; whichever thread brings the shared
    // counter down to zero wakes up the waiter in run().
    if (InterlockedDecrementAndReturnNewValue(worker->pRunningThreads) == 0) {
        SignalSingleWaiterObject(worker->doneWaiter);
    }
}

// Forward declarations: ParallelCoworker below refers to these types by
// pointer before their full definitions appear later in this header.
struct WorkerContext;
class ParallelWorker;
class ParallelWorkerManager;

// coroutined parallel workers
// does code inline if numThreads = 0
// can either callback when done, or synchronously wait for all to complete
//
// The member functions are only declared here; their definitions are not in
// this header, so the notes below describe the declarations rather than the
// implementation.
class ParallelCoworker
{
public:

    // completion callback type: a plain function taking one opaque pointer
    typedef void (*Callback)(void*);

    // i_numThreads / i_bindToProcessors: thread count and processor-binding flag
    // supplier: the manager that provides the per-thread workers (see ParallelWorkerManager::createWorker)
    // callback / parameter: optional completion callback and its argument; both default to NULL
    ParallelCoworker(int i_numThreads, bool i_bindToProcessors, ParallelWorkerManager* supplier, Callback callback = NULL, void* parameter = NULL);

    ~ParallelCoworker();

    // start forked thread running
    void start();

    // do one unit of work, asynchronously if callback, else synchronously
    void step();

    // stop everything, waits until all threads exit
    void stop();

    ParallelWorkerManager* getManager() { return manager; }

private:
    EventObject *workReady; // One per worker thread
    EventObject *workDone;  // One per worker thread
    ParallelWorker** workers; // one per worker thread
    ParallelWorkerManager* manager; // returned by getManager(); presumably the 'supplier' constructor argument -- confirm in the implementation
    volatile bool stopped; // NOTE(review): presumably set by stop() and polled by the worker threads -- confirm in the implementation
    const int numThreads;
    const bool bindToProcessors;
    Callback callback; // optional completion callback (see constructor)
    void* parameter;   // opaque argument for callback
    WorkerContext* context; // NOTE(review): presumably the common context handed to 'task' -- confirm in the implementation
    ParallelTask<WorkerContext>* task; // the ParallelTask that drives the worker threads
    SingleWaiterObject finished; // NOTE(review): presumably what stop() waits on -- confirm in the implementation

    // WorkerContext's thread hooks need access to the private state above
    friend struct WorkerContext;
};

// abstract classes for specifying the actual work

// (repeated forward declaration; ParallelWorker is defined further down)
class ParallelWorker;

// creates new per-thread workers
// Subclasses must implement createWorker(); the other virtual hooks default
// to doing nothing.
class ParallelWorkerManager
{
public:

    // todo: using void* context pointers to avoid pain of templates but should really be made typesafe
    // default implementation does nothing
    virtual void initialize(void* context) {}

    // must be overridden: returns a new worker object
    virtual ParallelWorker* createWorker() = 0;

    virtual ~ParallelWorkerManager() {}

    // hooks around a step; both default to doing nothing
    // NOTE(review): presumably called before and after each ParallelCoworker::step() -- confirm in the implementation
    virtual void beginStep() {}

    virtual void finishStep() {}

    // non-virtual and defined outside this header
    // NOTE(review): presumably forwards to ParallelWorker's private configure(), which this class can reach as a friend -- confirm
    void configure(ParallelWorker* worker, int threadNum, int totalThreads); // special case
};

// per-thread worker
class ParallelWorker
{
public:
    ParallelWorker() {}
    virtual ~ParallelWorker() {}

    virtual void initialize() {}
    virtual void step() = 0;

protected:
    ParallelWorkerManager* getManager() { return manager; }
    int getThreadNum() { return threadNum; }
    int getNumThreads() { return numThreads; }

private:
    friend class ParallelCoworker;
    friend class ParallelWorkerManager;
    void configure(ParallelWorkerManager* i_manager, int i_threadNum, int i_numThreads)
    { manager = i_manager; threadNum = i_threadNum; numThreads = i_numThreads; }

    ParallelWorkerManager* manager;
    int threadNum;
    int numThreads;
};

// Context type used to instantiate ParallelTask<WorkerContext> (see the
// 'task' member of ParallelCoworker). It adds a pointer to the coworker to
// the base fields and declares the three hooks that ParallelTask calls on
// its context type. The hook definitions are not in this header.
struct WorkerContext : public TaskContextBase
{
    // the coworker this context belongs to; ParallelCoworker declares
    // WorkerContext a friend, so the hooks below can use its private members
    ParallelCoworker* shared;

    // called by ParallelTask::run() on each per-thread copy before its thread is started
    void initializeThread();
    // called by ParallelTask::threadWorker() on the worker thread
    void runThread();
    // called by ParallelTask::run() on each per-thread copy after all threads have finished
    void finishThread(WorkerContext* common);
};