File: about-counter.md

package info (click to toggle)
workflow 0.11.10-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,744 kB
  • sloc: cpp: 33,792; ansic: 9,393; makefile: 9; sh: 6
file content (202 lines) | stat: -rw-r--r-- 8,963 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# About counter

Counters are very important basic tasks in our framework. A counter is essentially a semaphore that does not occupy thread.   
Counters are mainly used for workflow control. It includes anonymous counters and named counters, and can realize very complex business logic.

# Creating a counter

As a counter is also a task, it is created through WFTaskFactory. You can create a counter with one of the following two methods:

~~~cpp
using counter_callback_t = std::function<void (WFCounterTask *)>;

class WFTaskFactory
{
    ...
    static WFCounterTask *create_counter_task(unsigned int target_value,
                                              counter_callback_t callback);
                                              
    static WFCounterTask *create_counter_task(const std::string& counter_name,
                                              unsigned int target_value,
                                              counter_callback_t callback);

    ...
};
~~~

Each counter contains a target\_value. When the count in the counter reaches the target\_value, its callback is called.   
The above two interfaces generate a anonymous counter and a named counter respectively. The anonymous counter directly increases the count through the count method in the WFCounterTask:

~~~cpp
class WFCounterTask
{
public:
    virtual void count()
    {
        ...
    }
    ...
}
~~~

If a counter\_name is passed when you create a counter, a named counter is generated, and the count can be increased with the count\_by\_name function.

# Creating parallel tasks with anonymous counters

In the example of [parallel wget](/docs/en/tutorial-06-parallel_wget.md), we created a ParallelWork to achieve the parallel execution of several series.   
With the combination of ParallelWork and SeriesWork, you can build series-parallel graphs in any form, which can meet the requirements in most scenarios.   
Counters allow us to build more complex dependencies between the tasks, such as a fully connected neural network.   
The following simple code can replace ParallelWork to realize parallel HTTP crawling.

~~~cpp
void http_callback(WFHttpTask *task)
{
    /* Save http page. */
    ...

    WFCounterTask *counter = (WFCounterTask *)task->user_data;
    counter->count();
}

std::mutex mutex;
std::condition_variable cond;
bool finished = false;

void counter_callback(WFCounterTask *counter)
{
    mutex.lock();
    finished = true;
    cond.notify_one();
    mutex.unlock();
}

int main(int argc, char *argv[])
{
    WFCounterTask *counter = create_counter_task(url_count, counter_callback);
    WFHttpTask *task;
    std::string url[url_count];

    /* init urls */
    ...

    for (int i = 0; i < url_count; i++)
    {
        task = create_http_task(url[i], http_callback);
        task->user_data = counter;
        task->start();
    }

    counter->start();
    std::unique_lock<std:mutex> lock(mutex);
    while (!finished)   
        cond.wait(lock);
    lock.unlock();
    return 0;
}
~~~

The above code creates a counter with the target value as url\_count, and calls the count once after each HTTP task is completed.   
Note that the times **count()** a anonymous counter cannot exceed it's target value. Otherwise the counter may have been destroyed after the callback, and the program behavior is undefined.   
The call of **counter->start()** can be placed before the for loop. After a counter is created, you can call its count interface, no matter whether the counter has been started or not.   
You can also use **counter->WFCounterTask::count()** to call the count interface of an anonymous counter; this can be used in performance-sensitive applications.

# Using a server together with other asynchronous engines

In some cases, our server may need to call asynchronous clients in other frameworks and wait for the results. A simple method is that we wait synchronously in the process and then are waken up through conditional variables.   
Its disadvantage is that we occupy a processing thread and turn asynchronous clients in other frameworks into synchronous clients. But with the counter method, we can wait without occupying threads. The method is very simple:

~~~cpp

void some_callback(void *context)
{
    protocol::HttpResponse *resp = get_resp_from_context(context);
    WFCounterTask *counter = get_counter_from_context(context);
    /* write data to resp. */
    ...
    counter->count();
}

void process(WFHttpTask *task)
{
    WFCounterTask *counter = WFTaskFactory::create_counter_task(1, nullptr);

    SomeOtherAsyncClient client(some_callback, context);

    *series_of(task) << counter;
}
~~~

Here, we can consider the series of a server task as a coroutine, and the counter whose target value is 1 can be considered as a conditional variable.

# Named counters

When the count operation is executed on the anonymous counter, the counter object pointer is directly accessed. This inevitably requires that the number of calls to count should not exceed the target value during operation.   
But imagine an application scenario where we start four tasks at the same time, and as long as any three tasks are completed, the workflow can continue.   
We can use a counter with a target value of 3, and count once after each task is completed. As long as three tasks are completed, the callback of the counter will be executed.   
But the problem is that when the fourth task is finished and **counter->count()** is called again, the counter is already a wild pointer and the program crashes.   
In this case, we can use named counters to solve this problem. By naming the counter and counting by name, we can have the following implementation:

~~~cpp
void counter_callback(WFCounterTask *counter)
{
    WFRedisTask *next = WFTaskFactory::create_redis_task(...);
    series_of(counter)->push_back(next);
}

int main(void)
{
    WFHttpTask *tasks[4];
    WFCounterTask *counter;

    counter = WFTaskFactory::create_counter_task("c1", 3, counter_callback);
    counter->start();

    for (int i = 0; i < 4; i++)
    {
        tasks[i] = WFTaskFactory::create_http_task(..., [](WFHttpTask *task){
                                            WFTaskFactory::count_by_name("c1"); });
        tasks[i]->start();
    }

    ...

}
~~~

In this example, four concurrent HTTP tasks are started, three of which are completed, and a Redis task is started immediately. In the practical application, you may need to add the code of data transmission.   
In the example, a counter named "c1" is created, and in the HTTP callback, call **WFTaskFactory::count\_by\_name()** to increase the count.

~~~cpp
class WFTaskFactory
{
    ...
    static int count_by_name(const std::string& counter_name);

    static int count_by_name(const std::string& counter_name, unsigned int n);
    ...
};
~~~

You can pass an integer n to **WFTaskFactory::count\_by\_name**, indicating the count value to be increased in this operation. Obviously:   
**count\_by\_name("c1")** is equivalent to **count\_by\_name("c1", 1)**.   
If the "c1" counter does not exist (not created or already completed), the operation on "c1" will have no effect, so the wild pointer problem in an anonymous counter will not happen here.  
The **count\_by\_name()** function returns the number of counters that was waked up by the operation. When **n** is greater that 1, more than one counter may reach target value.

# Definition of the detailed behaviors of named counters

When you **call WFTaskFactory::count\_by\_name(name, n)**:

* if the name does not exist (not created or already completed), there is no action.
* if there is only one counter with that name:
  * if the remaining value of the counter is less than or equal to n, the counting is completed, the callback is called, and the counter is destroyed. end.
  * if the remaining value of the counter is greater than n, the count value is increased by n. end.
* if there are multiple counters with that name:
  * according to the order of creation, take the first counter and assume that its remaining value is m:
    * if m is greater than n, the count value is increased by n. end (the remaining value is m-n).
    * if m is less than or equal to n, the counting is completed, the callback is called, and the counter is destroyed. set n = n-m.
      * If n is 0, the procedure ends.
      * If n is greater than 0, take out the next counter with the same name and repeat the whole operation.

Although the description is very complicated, it can be summed up in one sentence. Access all counters with that name according to the order of creation one by one until n is 0.   
In other words, one **count\_by\_name(name, n)** may wake up multiple counters.   
The counters can be used to realize very complex business logic if you can use them well. In our framework, counters are often used to implement asynchronous locks or to build channels between tasks. It is more like a control task in form.