File: tutorial-08-matrix_multiply.md

package info (click to toggle)
workflow 0.11.10-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,744 kB
  • sloc: cpp: 33,792; ansic: 9,393; makefile: 9; sh: 6
file content (252 lines) | stat: -rw-r--r-- 9,282 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# 自定义计算任务:matrix_multiply
# 示例代码

[tutorial-08-matrix_multiply.cc](/tutorial/tutorial-08-matrix_multiply.cc)

# 关于matrix_multiply

程序执行代码里两个矩阵的乘法,并将相乘结果打印在屏幕上。  
示例的主要目的是展现怎么实现一个自定义CPU计算任务。

# 定义计算任务

定义计算任务需要提供3个基本信息,分别为INPUT,OUTPUT,和routine。  
INPUT和OUTPUT是两个模板参数,可以是任何类型。routine表示从INPUT到OUTPUT的过程,定义如下:  
~~~cpp
template <class INPUT, class OUTPUT>
class __WFThreadTask
{
    ...
    std::function<void (INPUT *, OUTPUT *)> routine;
    ...
};
~~~
可以看出routine是一个简单的从INPUT到OUTPUT的计算过程。INPUT指针不要求是const,但用户也可以传const INPUT *的函数。  
比如一个加法任务,就可这么做:
~~~cpp
struct add_input
{
    int x;
    int y;
};

struct add_ouput
{
    int res;
};

void add_routine(const add_input *input, add_output *output)
{
    output->res = input->x + input->y;
}

typedef WFThreadTask<add_input, add_output> add_task;
~~~
在我们的矩阵乘法的示例里,输入是两个矩阵,输出为一个矩阵。其定义如下:
~~~cpp
namespace algorithm
{

using Matrix = std::vector<std::vector<double>>;

struct MMInput
{
    Matrix a;
    Matrix b;
};

struct MMOutput
{
    int error;
    size_t m, n, k;
    Matrix c;
};

void matrix_multiply(const MMInput *in, MMOutput *out)
{
    ...
}

}
~~~
矩阵乘法存在有输入矩阵不合法的问题,所以output里多了一个error域,用来表示错误。

# 生成计算任务

定义好输入输出的类型,以及算法的过程之后,就可以通过WFThreadTaskFactory工厂来产生计算任务了。  
在[WFTaskFactory.h](../src/factory/WFTaskFactory.h)里,计算工厂类的定义如下:
~~~cpp
template <class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:
    using T = WFThreadTask<INPUT, OUTPUT>;

public:
    static T *create_thread_task(const std::string& queue_name,
                                 std::function<void (INPUT *, OUTPUT *)> routine,
                                 std::function<void (T *)> callback);

    static T *create_thread_task(time_t seconds, long nanoseconds,
                                 const std::string& queue_name,
                                 std::function<void (INPUT *, OUTPUT *)> routine,
                                 std::function<void (T *)> callback);
    ...
};
~~~
这里包含两个创建任务的接口。第二个接口支持用户传入一下任务运行时间限制,我们在一下节介绍这个功能。  
与之前的网络工厂类或算法工厂类略有不同,这个类需要INPUT和OUTPUT两个模板参数。  
queue_name相关的知识在上一个示例里已经有介绍。routine就是你的计算过程,callback是回调。  
在我们的示例里,我们看到了这个调用的使用:
~~~cpp
using MMTask = WFThreadTask<algorithm::MMInput,
                            algorithm::MMOutput>;

using namespace algorithm;

int main()
{
    typedef WFThreadTaskFactory<MMInput, MMOutput> MMFactory;
    MMTask *task = MMFactory::create_thread_task("matrix_multiply_task",
                                                 matrix_multiply,
                                                 callback);

    MMInput *input = task->get_input();

    input->a = {{1, 2, 3}, {4, 5, 6}};
    input->b = {{7, 8}, {9, 10}, {11, 12}};
    ...
}
~~~
产生了task之后,通过get_input()接口得到输入数据的指针。这个可以类比网络任务的get_req()。  
任务的发起和结束什么,与网络任务并没有什么区别。同样,回调也很简单:
~~~cpp
void callback(MMTask *task)     // MMtask = WFThreadTask<MMInput, MMOutput>
{
    MMInput *input = task->get_input();
    MMOutput *output = task->get_output();

    assert(task->get_state() == WFT_STATE_SUCCESS);

    if (output->error)
        printf("Error: %d %s\n", output->error, strerror(output->error));
    else
    {
        printf("Matrix A\n");
        print_matrix(input->a, output->m, output->k);
        printf("Matrix B\n");
        print_matrix(input->b, output->k, output->n);
        printf("Matrix A * Matrix B =>\n");
        print_matrix(output->c, output->m, output->n);
    }
}
~~~
普通的计算任务可以忽略失败的可能性,结束状态肯定是SUCCESS。  
callback里简单打印了输入输出。如果输入数据不合法,则打印错误。

# 带运行时间限制的计算任务

显然,我们的框架无法打断用户的计算任务,因为用户的计算任务是一个函数,用户需要自行确保函数可以正常结束。  
但我们支持用户指定一个时间限制,当计算无法在指定时间内完成,任务可以提前回到callback。带运行时间限制的接口定义如下:
~~~cpp
template <class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:
    using T = WFThreadTask<INPUT, OUTPUT>;

public:
    static T *create_thread_task(time_t seconds, long nanoseconds,
                                 const std::string& queue_name,
                                 std::function<void (INPUT *, OUTPUT *)> routine,
                                 std::function<void (T *)> callback);
    ...
};
~~~
参数seconds和nanoseconds构成了运行时限。在这里,nanoseconds的取值范围在\[0,1000000000)。  
当任务无法在运行时限内结束,会直接回到callback,并且任务的状态为WFT_STATE_SYS_ERROR且错误码为ETIMEDOUT。  
还是用matrix_multiply的例子,我们可以这样写:
~~~cpp
void callback(MMTask *task)     // MMtask = WFThreadTask<MMInput, MMOutput>
{
    MMInput *input = task->get_input();
    MMOutput *output = task->get_output();

    if (task->get_state() == WFT_STATE_SYS_ERROR && task->get_error() == ETIMEDOUT)
    {
        printf("Run out of time.\n");
        return;
    }

    assert(task->get_state() == WFT_STATE_SUCCESS)

    if (output->error)
        printf("Error: %d %s\n", output->error, strerror(output->error));
    else
    {
        printf("Matrix A\n");
        print_matrix(input->a, output->m, output->k);
        printf("Matrix B\n");
        print_matrix(input->b, output->k, output->n);
        printf("Matrix A * Matrix B =>\n");
        print_matrix(output->c, output->m, output->n);
    }
}

using namespace algorithm;

int main()
{
    typedef WFThreadTaskFactory<MMInput, MMOutput> MMFactory;
    MMTask *task = MMFactory::create_thread_task(0, 1000000,
                                                 "matrix_multiply_task",
                                                 matrix_multiply,
                                                 callback);

    MMInput *input = task->get_input();

    input->a = {{1, 2, 3}, {4, 5, 6}};
    input->b = {{7, 8}, {9, 10}, {11, 12}};
    ...
}
~~~
上面的示例,限制了任务运行时间不超过1毫秒,否则,以WFT_STATE_SYS_ERROR的状态返回。  
再次提醒,我们并不会中断用户的实际运行函数。当任务超时并callback,计算函数还会一直运行直到结束。  
如果用户希望函数不再继续执行,需要在代码中自行加入检查点来实现这样的功能。可以在INPUT里加入flag,例如:
~~~cpp
void callback(MMTask *task)     // MMtask = WFThreadTask<MMInput, MMOutput>
{
    if (task->get_state() == WFT_STATE_SYS_ERROR && task->get_error() == ETIMEDOUT)
    {
        task->get_input()->flag = true;
        printf("Run out of time.\n");
        return;
    }
    ...
}

void matrix_multiply(const MMInput *in, MMOutput *out)
{
    while (!in->flag)
    {
        ....
    }
}
~~~

# 算法与协议的对称性

在我们的体系里,算法与协议在一个非常抽象的层面上是具有高度对称性的。  
有自定义算法的线程任务,那显然也存在自定义协议的网络任务。  
自定义算法要求提供算法的过程,而自定义协议则需要用户提供序列化和反序列化的过程,[简单的用户自定义协议client/server](./tutorial-10-user_defined_protocol.md)有介绍。  
无论是自定义算法还是自定义协议,我们都必须强调算法和协议都是非常纯粹的。  
例如算法就是一个从INPUT到OUPUT的转换过程,算法并不知道task,series等的存在。  
HTTP协议的实现上,也只关心序列化反序列化,无需要关心什么是task。而是在http task里去引用HTTP协议。  

# 线程任务与网络任务的复合性

在这个示例里,我们通过WFThreadTaskFactory构建了一个线程任务。可以说这是一种最简单的计算任务构建,大多数情况下也够用了。  
同样,用户可以非常简单的定义一个自有协议的server和client。  
但在上一个示例里我们看到,我们可以通过算法工厂产生一个并行排序任务,这显然不是通过一个routine就能做到的。  
对于网络任务,比如一个kafka任务,可能要经过与多台机器的交互才能得到结果,但对用户来讲是完全透明的。  
所以,我们的任务都是具有复合性的,如果你熟练使用我们的框架,可以设计出很多复杂的组件出来。