# A simple parallel wget: parallel\_wget
# Sample code
[tutorial-06-parallel\_wget.cc](/tutorial/tutorial-06-parallel_wget.cc)
# About parallel\_wget
This is our first example of parallel tasks.
The program reads multiple HTTP URLs (separated by spaces) from the command line, crawls them in parallel, and prints the crawled results to the standard output in the same order as the input URLs.
# Creating a parallel task
In earlier examples, you learned about the SeriesWork class.
* A SeriesWork consists of a series of tasks that are executed sequentially. The series finishes when all its tasks finish.
* A ParallelWork, the counterpart of SeriesWork, consists of multiple series that are executed in parallel. The parallel work finishes when all its series finish.
* A ParallelWork is itself a task.
With these definitions, you can build workflows of any complexity, either statically or dynamically.
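The three rules above can be illustrated with a small, library-free toy model. All names here (`ToyTask`, `FuncTask`, `ToySeries`, `ToyParallel`, `EventLog`) are hypothetical and are not workflow classes; the model blocks in `run()` and uses one `std::thread` per series, whereas the real library is asynchronous. It only sketches the semantics: a series runs its tasks in order, a parallel work finishes when all its series finish, and because a parallel work is a task, it can sit inside another series.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical toy model, NOT the workflow library's classes.
// Each run() blocks until the work is complete.
struct ToyTask {
    virtual ~ToyTask() = default;
    virtual void run() = 0;
};

// Leaf task wrapping a plain function.
struct FuncTask : ToyTask {
    std::function<void()> fn;
    explicit FuncTask(std::function<void()> f) : fn(std::move(f)) {}
    void run() override { fn(); }
};

// Runs its tasks one after another; done when the last task is done.
struct ToySeries : ToyTask {
    std::vector<std::unique_ptr<ToyTask>> tasks;
    void push_back(std::unique_ptr<ToyTask> t) { tasks.push_back(std::move(t)); }
    void run() override {
        for (auto &t : tasks)
            t->run();
    }
};

// Runs every series on its own thread; done when all series are done.
// It derives from ToyTask, so it can be pushed into another ToySeries.
struct ToyParallel : ToyTask {
    std::vector<std::unique_ptr<ToySeries>> series;
    void add_series(std::unique_ptr<ToySeries> s) { series.push_back(std::move(s)); }
    void run() override {
        std::vector<std::thread> threads;
        for (auto &s : series)
            threads.emplace_back([p = s.get()] { p->run(); });
        for (auto &th : threads)
            th.join();  // wait for every series to finish
    }
};

// Thread-safe log used to observe the order in which tasks ran.
struct EventLog {
    std::mutex mu;
    std::vector<std::string> events;
    std::unique_ptr<ToyTask> record(const std::string &name) {
        return std::make_unique<FuncTask>([this, name] {
            std::lock_guard<std::mutex> lock(mu);
            events.push_back(name);
        });
    }
};
```

For example, a series built as "start", then a `ToyParallel` of three series, then "end" always logs "start" first and "end" last, while the branch tasks in between may interleave in any order.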
The Workflow class has two interfaces for generating parallel tasks:
~~~cpp
class Workflow
{
    ...
public:
    static ParallelWork *
    create_parallel_work(parallel_callback_t callback);

    static ParallelWork *
    create_parallel_work(SeriesWork *const all_series[], size_t n,
                         parallel_callback_t callback);
    ...
};
~~~
The first interface creates an empty parallel task, and the second creates a parallel task from an array of n series.
Before the parallel work starts, you can call its **add\_series()** interface to add more series to a parallel task created by either interface.
In the sample code, we create an empty parallel task and then add the series one by one.
~~~cpp
int main(int argc, char *argv[])
{
    ParallelWork *pwork = Workflow::create_parallel_work(callback);
    SeriesWork *series;
    WFHttpTask *task;
    HttpRequest *req;
    tutorial_series_context *ctx;
    int i;

    for (i = 1; i < argc; i++)
    {
        std::string url(argv[i]);
        ...
        task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
            [](WFHttpTask *task)
        {
            // store resp to ctx.
        });

        req = task->get_req();
        // add some headers.
        ...

        ctx = new tutorial_series_context;
        ctx->url = std::move(url);
        series = Workflow::create_series_work(task, nullptr);
        series->set_context(ctx);
        pwork->add_series(series);
    }
    ...
}
~~~
The code first creates an HTTP task. An HTTP task cannot be added to a parallel task directly, so we first use it to create a series with **Workflow::create\_series\_work()** and then add that series with **add\_series()**.
Each series has its own context, which stores the URL and the crawled result. The methods for setting and getting a series context were covered in earlier examples.
# Saving and using the crawled results
The callback of each HTTP task is a simple lambda function that saves the crawled result in the context of its own series, so that the parallel task can retrieve it later.
~~~cpp
task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
    [](WFHttpTask *task)
{
    tutorial_series_context *ctx =
        (tutorial_series_context *)series_of(task)->get_context();
    ctx->state = task->get_state();
    ctx->error = task->get_error();
    ctx->resp = std::move(*task->get_resp());
});
~~~
This step is necessary because an HTTP task is recycled after its callback returns, so the response must be moved out of the task with **std::move()** before then.
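The lifetime issue can be reproduced without the library. In this sketch, the hypothetical `ToyHttpTask` owns its response and is destroyed right after its callback returns, mimicking the recycling described above. The callback moves the body into a per-series context (`ToySeriesContext`, also hypothetical), so the data outlives the task. These types only stand in for `WFHttpTask`, `HttpResponse` and `tutorial_series_context`.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins, NOT workflow's HttpResponse / tutorial_series_context.
struct ToyResponse {
    std::string body;
};

struct ToySeriesContext {
    std::string url;
    ToyResponse resp;
};

struct ToyHttpTask {
    ToyResponse resp;
    ToySeriesContext *ctx;  // plays the role of series_of(task)->get_context()
    std::function<void(ToyHttpTask *)> callback;
};

// Simulates one task: fill the response, run the callback, then recycle the task.
void run_and_recycle(std::unique_ptr<ToyHttpTask> task, std::string fetched) {
    task->resp.body = std::move(fetched);
    task->callback(task.get());
    task.reset();  // the task and the response it owns are destroyed here
}
```

A callback that only kept a pointer to `t->resp` would be left dangling after `run_and_recycle()` returns; moving the response into the context avoids that without copying the body.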
In the callback of the parallel task, the results are easy to retrieve:
~~~cpp
void callback(const ParallelWork *pwork)
{
    tutorial_series_context *ctx;
    const void *body;
    size_t size;
    size_t i;

    for (i = 0; i < pwork->size(); i++)
    {
        ctx = (tutorial_series_context *)pwork->series_at(i)->get_context();
        printf("%s\n", ctx->url.c_str());
        if (ctx->state == WFT_STATE_SUCCESS)
        {
            ctx->resp.get_parsed_body(&body, &size);
            printf("%zu%s\n", size, ctx->resp.is_chunked() ? " chunked" : "");
            fwrite(body, 1, size, stdout);
            printf("\n");
        }
        else
            printf("ERROR! state = %d, error = %d\n", ctx->state, ctx->error);

        delete ctx;
    }
}
~~~
This code uses two new ParallelWork interfaces, **size()** and **series\_at(i)**, which return the number of series in the parallel work and the i-th series, respectively.
For each series, **series->get\_context()** returns its context, from which the results are printed. The printing order is always the order in which the series were added to the parallel work, regardless of which crawl finishes first.
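Why the output follows the insertion order can be seen in this self-contained sketch built on `std::thread`. The helper `fetch_all_in_order()` is hypothetical and does no real networking: each worker writes only to its own slot `i`, just as each series writes only to its own context, and the results are read back by index after all workers have joined, much like the callback walks `series_at(0)` through `series_at(size() - 1)`. The artificial delays make later inputs finish first.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper: "fetch" every url concurrently, return results in input order.
std::vector<std::string> fetch_all_in_order(const std::vector<std::string> &urls) {
    std::vector<std::string> slots(urls.size());  // one slot per input, like one context per series
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < urls.size(); i++) {
        workers.emplace_back([&slots, &urls, i] {
            // Earlier inputs sleep longer, so they finish last.
            std::this_thread::sleep_for(std::chrono::milliseconds(10 * (urls.size() - i)));
            slots[i] = "body of " + urls[i];  // each worker touches only its own slot
        });
    }
    for (auto &w : workers)
        w.join();  // like the parallel work: done only when every branch is done
    return slots;  // indexed by input position, not by completion time
}
```

Writing to distinct vector elements from different threads needs no lock; the `join()` calls make all writes visible before the results are read.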
In this example, nothing else runs after the parallel task finishes.
As mentioned above, a ParallelWork is itself a task, so you can use **series\_of()** to get the series that contains it and append a new task to that series.
If the new task needs the crawled results, use **std::move()** to move the data into the context of the series that contains the parallel task, because in this example the parallel callback deletes each series context.
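A toy, library-free sketch of that hand-off follows. The hypothetical `BranchContext` objects play the role of the per-series contexts deleted in the parallel callback, `OuterContext` plays the context of the series that contains the parallel task, and `follow_up_total_size()` stands in for the task appended after the parallel work. None of these names come from the workflow library.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical types, NOT part of the workflow library.
struct BranchContext {
    std::string url;
    std::string body;
};

struct OuterContext {
    std::vector<std::string> bodies;  // survives after the branches are freed
};

// Plays the role of the parallel callback: keep the data, free the branches.
void on_parallel_done(std::vector<std::unique_ptr<BranchContext>> &branches,
                      OuterContext &outer) {
    for (auto &b : branches)
        outer.bodies.push_back(std::move(b->body));  // move, don't copy
    branches.clear();  // like `delete ctx` in the tutorial's callback
}

// Plays the role of the task appended after the parallel work in the same series.
std::size_t follow_up_total_size(const OuterContext &outer) {
    std::size_t total = 0;
    for (const auto &body : outer.bodies)
        total += body.size();
    return total;
}
```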
# Starting a parallel task
Since a parallel task is an ordinary task, there is nothing special about starting it. You can call **start()** directly, or use it to create or start a series.
In this example, we start a series with the parallel task, wake up the main thread in the callback of that series, and then let the program exit normally.
Waking up the main thread in the callback of the parallel task would also work, with little difference in program behavior. However, it is more conventional to do it in the series callback, which runs only after every task in the series, including the parallel task, has finished.
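The wake-up pattern itself can be sketched with `std::mutex` and `std::condition_variable`. `ToyWaitGroup` and `start_series_simulated()` are hypothetical helpers, not workflow APIs; a background `std::thread` stands in for the framework thread that would run the series callback.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical latch: main() waits until done() has been called n times.
class ToyWaitGroup {
public:
    explicit ToyWaitGroup(int n) : count_(n) {}

    // Called from the (simulated) series callback.
    void done() {
        std::lock_guard<std::mutex> lock(mu_);
        if (--count_ == 0)
            cv_.notify_all();
    }

    // Called from main(); blocks until the count reaches zero.
    void wait() {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return count_ <= 0; });
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    int count_;
};

// Simulates starting the series: its callback runs later on another thread.
std::thread start_series_simulated(ToyWaitGroup &wg, bool &finished) {
    return std::thread([&wg, &finished] {
        finished = true;  // the parallel work and its callback would run here
        wg.done();        // series callback: wake up main()
    });
}
```

Because `finished` is written before `done()` takes the mutex and read after `wait()` returns holding it, the main thread is guaranteed to observe the write.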