This section's example program illustrates the use of tt(packaged_tasks).
Like the multi-threaded quicksort example a worker pool is used. However, in
this example the workers do not know what their task is. In the
current example the tasks happen to be identical, but different tasks might
as well have been used, without having to update the workers.
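
The idea that workers need not know what their tasks do can be sketched as follows. This is only a minimal illustration, not the program developed in this section: the names tt(OpaqueTask), tt(runAll) and tt(demoOpaqueTasks) are hypothetical, and a single worker thread is used for brevity.

```cpp
#include <functional>
#include <future>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical task type: the worker only sees 'something callable
// producing a std::string'.
using OpaqueTask = std::packaged_task<std::string ()>;

// The worker runs whatever it is handed; it has no knowledge of the
// work performed inside the task.
void runAll(std::queue<OpaqueTask> &tasks)
{
    while (!tasks.empty())
    {
        OpaqueTask task{ std::move(tasks.front()) };
        tasks.pop();
        task();                     // makes the task's future ready
    }
}

// Two unrelated tasks are queued, then handled by one worker thread.
std::vector<std::string> demoOpaqueTasks()
{
    std::queue<OpaqueTask> tasks;
    std::vector<std::future<std::string>> futures;

    OpaqueTask greet{ [] { return std::string{ "HELLO" }; } };
    OpaqueTask count{ [] { return std::to_string(6 * 7); } };

    // futures must be obtained before the tasks are moved away
    futures.push_back(greet.get_future());
    futures.push_back(count.get_future());
    tasks.push(std::move(greet));
    tasks.push(std::move(count));

    std::thread worker{ runAll, std::ref(tasks) };
    worker.join();

    std::vector<std::string> ret;
    for (auto &fut: futures)
        ret.push_back(fut.get());
    return ret;
}
```

Replacing either lambda by a completely different computation requires no change to tt(runAll), which is the property the worker pool relies on.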
The program uses a tt(class Task) containing a command-specification
(tt(d_command)) and a task specification (tt(d_task)) (cf. fig(compile)). The
sources of the program are found in the
tt(yo/threading/examples/multicompile) directory of the annotations().
figure(threading/compile)
(Data structure used for the multi-threading compilation)
(compile)
In this program tt(main) starts by firing up its workforce in a series of
threads. Following this, the compilation jobs are prepared and pushed on a
tt(task-queue) by tt(jobs), from which they're retrieved by the workers. Once
the compilations have been completed (i.e., after the worker threads have
joined the main thread), the results of the compilation jobs are handled by
tt(results):
verbinclude(-s4 //code examples/multicompile/main.cc)
The tt(jobs) function receives the names of the files to compile from the
tt(nextCommand) function, which ignores empty lines and returns non-empty
lines. Eventually tt(nextCommand) returns an empty line once all lines of the
standard input stream have been read:
verbinclude(-s4 //code examples/multicompile/nextcommand.cc)
For each non-empty line tt(jobs) waits for an available worker using (line 12)
the tt(g_dispatcher) semaphore. Initialized to the size of the work force, it
is reduced by an active worker, and incremented by workers who have completed
their tasks. While tt(jobs) receives the names of the files to compile,
workers may detect compilation errors. If a compilation fails, the worker
sets tt(g_done) to tt(true), and no additional compilations are performed
(lines 14, 15). Once the tt(jobs) function's tt(while) loop ends the workers
are notified once again (line 24). Because there's no task to perform
anymore, they then end their threads:
verbinclude(-ns4 //code examples/multicompile/jobs.cc)
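
The role of the dispatching semaphore can be illustrated by the following sketch. It is written under several assumptions: the tt(Semaphore) class shown here is a hypothetical stand-in for tt(g_dispatcher), and, for brevity, each job gets its own short-lived thread instead of being handed to a persistent worker. It merely shows that a semaphore initialized to the size of the work force limits the number of simultaneously active jobs.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Minimal counting semaphore (hypothetical stand-in for g_dispatcher).
class Semaphore
{
    std::mutex d_mutex;
    std::condition_variable d_condition;
    std::size_t d_available;

    public:
        explicit Semaphore(std::size_t available)
        :
            d_available(available)
        {}

        void wait()                 // blocks until a slot is available
        {
            std::unique_lock<std::mutex> lock{ d_mutex };
            d_condition.wait(lock, [this] { return d_available != 0; });
            --d_available;
        }

        void notify()               // returns a slot
        {
            {
                std::lock_guard<std::mutex> lock{ d_mutex };
                ++d_available;
            }
            d_condition.notify_one();
        }
};

// Dispatches nJobs jobs, allowing at most nWorkers of them to be active
// at any time. Returns the highest number of jobs seen active at once.
std::size_t dispatch(std::size_t nWorkers, std::size_t nJobs)
{
    Semaphore dispatcher{ nWorkers };
    std::mutex mutex;
    std::size_t active = 0;
    std::size_t maxActive = 0;
    std::vector<std::thread> threads;

    for (std::size_t job = 0; job != nJobs; ++job)
    {
        dispatcher.wait();          // wait for an available worker
        threads.emplace_back(
            [&]
            {
                {
                    std::lock_guard<std::mutex> lock{ mutex };
                    if (++active > maxActive)
                        maxActive = active;
                }
                std::this_thread::yield();      // the 'work'
                {
                    std::lock_guard<std::mutex> lock{ mutex };
                    --active;
                }
                dispatcher.notify();    // this slot is available again
            }
        );
    }

    for (auto &thread: threads)
        thread.join();
    return maxActive;
}
```

Since a slot is claimed before a job starts and returned only after it has finished, the value returned by tt(dispatch) never exceeds tt(nWorkers).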
The function tt(newTask) prepares the program for the next task. First a
tt(Task) object is constructed. tt(Task) contains the name of the file to
compile, and a tt(packaged_task). The class encapsulates all activities that
are associated with a tt(packaged_task). Here is its (in-class) definition:
verbinclude(-ns4 //task examples/multicompile/main.ih)
Note (lines 22-25) that tt(result) returns a em(shared_future). Since the
dispatcher runs in a different thread than the one processing the results, the
tt(futures) created by the dispatcher must be shared with the tt(futures)
required by the function processing the results. Hence, tt(Task::result)
returns a tt(shared_future).
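
The combination of a tt(packaged_task) and a tt(shared_future) may be sketched as shown below. This is not the class definition included above but a simplified illustration: the tt(Result) struct, the function tt(pretendCompile) and the constructor's signature are assumptions made for this sketch only.

```cpp
#include <future>
#include <string>
#include <thread>
#include <utility>

// Hypothetical result type: an exit value plus any textual output.
struct Result
{
    int exitValue;
    std::string output;
};

// Sketch of a Task: a command specification and the packaged_task
// that will process it.
class Task
{
    std::string d_command;
    std::packaged_task<Result (std::string const &)> d_task;

    public:
        Task(std::string command, Result (*fun)(std::string const &))
        :
            d_command(std::move(command)),
            d_task(fun)
        {}

        void operator()()           // called by a worker
        {
            d_task(d_command);
        }

        // May be called only once: a packaged_task offers its
        // future only once.
        std::shared_future<Result> result()
        {
            return d_task.get_future().share();
        }
};

// Stand-in for a real compilation: merely echoes its command.
Result pretendCompile(std::string const &command)
{
    return Result{ 0, "compiled: " + command };
}

// The future is obtained in the calling thread, the task is run in
// another thread, and two copies of the shared_future see the result.
std::string demoTask()
{
    Task task{ "demo.cc", pretendCompile };
    std::shared_future<Result> first = task.result();
    std::shared_future<Result> second = first;      // copying is allowed

    std::thread worker{ std::move(task) };
    worker.join();

    return first.get().output + "|" + second.get().output;
}
```

Unlike a plain tt(std::future), a tt(std::shared_future) can be copied, and each copy may call tt(get), which is what allows the result to be available in more than one place.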
Once a tt(Task) object has been constructed its tt(shared_future) object is
pushed on the result queue. Although the actual results aren't available by
this time, the tt(results) function is eventually called to process the results
that were pushed on the result queue. Additionally, the tt(Task) itself is
pushed on a em(task queue), from which it will be retrieved by a worker:
verbinclude(-as4 examples/multicompile/main2.ih)
verbinclude(-s4 //code examples/multicompile/pushresultq.cc)
The workers have a simple task: wait for the next task, then retrieve it from
the task queue, and complete that task. Whatever happens inside the tasks
themselves is of no concern to the worker. When a worker is notified (normally
by the tt(jobs) function) that there's a task waiting, it executes that
task. At the end, once all tasks have been pushed on the task queue,
tt(jobs) notifies the workers once more. In that case the task queue is
empty, and the worker function ends. But just before that it notifies its
fellow workers, which in turn end, thus ending all worker threads, allowing
them to join the tt(main)-thread:
verbinclude(-s4 //code examples/multicompile/worker.cc)
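
The worker's behavior can also be illustrated by a condensed sketch. It deviates from the function shown above in some respects, which are assumptions of this sketch: a condition variable and a tt(closed) flag are used instead of semaphores, the final notification is sent to all workers at once instead of being passed on from worker to worker, and the names tt(Pool) and tt(runPool) are hypothetical.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical shared state; the names are illustrative only.
struct Pool
{
    std::mutex mutex;
    std::condition_variable condition;
    std::queue<std::function<void ()>> tasks;
    bool closed = false;            // true: no more tasks will be pushed
};

// Wait for a notification; run the next task if there is one, and end
// when the queue is empty and no further tasks will arrive.
void worker(Pool &pool)
{
    while (true)
    {
        std::function<void ()> task;
        {
            std::unique_lock<std::mutex> lock{ pool.mutex };
            pool.condition.wait(lock,
                [&] { return pool.closed || !pool.tasks.empty(); });

            if (pool.tasks.empty())
                return;             // closed, nothing left: end the thread

            task = std::move(pool.tasks.front());
            pool.tasks.pop();
        }
        task();                     // the worker doesn't know what it does
    }
}

// Pushes nTasks counting tasks, closes the pool, and returns the
// number of tasks that were executed by the nWorkers workers.
std::size_t runPool(std::size_t nWorkers, std::size_t nTasks)
{
    Pool pool;
    std::mutex countMutex;
    std::size_t executed = 0;

    std::vector<std::thread> workers;
    for (std::size_t idx = 0; idx != nWorkers; ++idx)
        workers.emplace_back(worker, std::ref(pool));

    for (std::size_t idx = 0; idx != nTasks; ++idx)
    {
        {
            std::lock_guard<std::mutex> lock{ pool.mutex };
            pool.tasks.push(
                [&]
                {
                    std::lock_guard<std::mutex> guard{ countMutex };
                    ++executed;
                });
        }
        pool.condition.notify_one();    // a task is waiting
    }

    {
        std::lock_guard<std::mutex> lock{ pool.mutex };
        pool.closed = true;
    }
    pool.condition.notify_all();    // final notification: workers end

    for (auto &thread: workers)
        thread.join();
    return executed;
}
```

Because a worker only ends when the queue is empty after the final notification, all pushed tasks are executed before the threads are joined.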
This completes the description of how tasks are handled.
The tasks themselves are now described. In the current program bf(C++) source files
are compiled. The compilation command is passed to the constructor of a
tt(CmdFork) object, which starts the compiler as a child process. The result
of the compilation is retrieved via its tt(childExit) member (returning the
compiler's exit code) and tt(childOutput) member (returning any textual output
produced by the compiler). If compilation fails, the exit value won't be
zero. In this case no further compilation tasks will be issued as tt(g_done)
is set to tt(true) (lines 11 and 12). The implementation of the tt(class
CmdFork) is available from the annotations()'
tt(yo/threading/examples/cmdfork) directory. Here is the function
tt(compile):
verbinclude(-ns4 //code examples/multicompile/compile.cc)
The tt(results) function continues for as long as tt(newResult) indicates
that results are available. By design the program shows all available
successfully completed compilations, and (if several workers encountered
compilation errors) only the compiler's output of the first
compilation error. Here em(available) means that, in case of a compilation
error, the source files that were successfully compiled by the currently
active work force are listed, but remaining source files are not processed
anymore:
verbinclude(-s4 //code examples/multicompile/results.cc)
The function tt(newResult) controls the tt(while)-loop of tt(results). It
returns tt(true) for as long as the result queue isn't empty, in which case
the queue's front element is stored in the external tt(Result) object, and
the queue's front element is removed from the queue:
verbinclude(-s4 //code examples/multicompile/newresult.cc)
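
The interplay between tt(results) and tt(newResult) is illustrated by the following simplified sketch. Its assumptions are that the result queue is passed as an argument rather than being a global object, that the queue is accessed by a single thread (so no locking is shown), and that the tt(Result) struct and tt(demoResults) are hypothetical. The report is collected in a vector instead of being written to the standard output stream.

```cpp
#include <future>
#include <queue>
#include <string>
#include <vector>

// Hypothetical result type: an exit value plus any textual output.
struct Result
{
    int exitValue;
    std::string output;
};

using ResultQueue = std::queue<std::shared_future<Result>>;

// Stores the front element's value in 'result' and removes the element
// from the queue; returns false once the queue is empty.
bool newResult(ResultQueue &queue, Result &result)
{
    if (queue.empty())
        return false;

    result = queue.front().get();   // blocks until the task has completed
    queue.pop();
    return true;
}

// Reports all successful results, and the output of the first
// failure only.
std::vector<std::string> results(ResultQueue &queue)
{
    std::vector<std::string> report;
    bool failed = false;
    Result result{};

    while (newResult(queue, result))
    {
        if (result.exitValue == 0)
            report.push_back("ok: " + result.output);
        else if (!failed)
        {
            failed = true;
            report.push_back("error: " + result.output);
        }
    }
    return report;
}

// Fills a queue with four ready-made results, two of which failed.
std::vector<std::string> demoResults()
{
    std::vector<Result> const outcomes
    {
        { 0, "a.cc" }, { 1, "b.cc: syntax error" },
        { 0, "c.cc" }, { 2, "d.cc: unknown type" }
    };

    ResultQueue queue;
    for (Result const &outcome: outcomes)
    {
        std::promise<Result> promise;
        queue.push(promise.get_future().share());
        promise.set_value(outcome);
    }
    return results(queue);
}
```

In this sketch the second failure is silently dropped, mirroring the design decision that only the first compilation error is shown.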