File: rpc.md

package info (click to toggle)
srpc 0.10.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,100 kB
  • sloc: cpp: 23,170; python: 10; makefile: 8; sh: 6
file content (481 lines) | stat: -rw-r--r-- 22,167 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
[English version](/docs/en/rpc.md)

## 基础功能对比
|RPC                        |IDL        |通信  | 网络数据     |压缩                | Attachement |  半同步  |  异步  |  Streaming  |
|---------------------------|-----------|------|------------|--------------------|------------|---------|--------|-------------|
|Thrift Binary Framed       | Thrift    | tcp  | 二进制      |不支持               | 不支持      |  支持    | 不支持  |  不支持      |
|Thrift Binary HttpTransport| Thrift    | http | 二进制      |不支持               | 不支持      |  支持    | 不支持  |  不支持      |
|GRPC                       | PB        | http2| 二进制      |gzip/zlib/lz4/snappy| 支持        |  不支持  | 支持    |  支持       |
|BRPC Std                   | PB        | tcp  | 二进制      |gzip/zlib/lz4/snappy| 支持        |  不支持  | 支持    |  支持       |
|SRPC Std               | PB/Thrift | tcp  | 二进制/JSON |gzip/zlib/lz4/snappy| 支持        |  支持    | 支持    |  不支持     |
|SRPC Std Http          | PB/Thrift | http | 二进制/JSON |gzip/zlib/lz4/snappy| 支持        |  支持    | 支持    |  不支持     |

## 基础概念
- 通信层:TCP/TPC_SSL/HTTP/HTTPS/HTTP2
- 协议层:Thrift-binary/BRPC-std/SRPC-std/SRPC-http/tRPC-std/tRPC-http
- 压缩层:不压缩/gzip/zlib/lz4/snappy
- 数据层:PB binary/Thrift binary/Json string
- IDL序列化层:PB/Thrift serialization
- RPC调用层:Service/Client IMPL

## RPC Global
- 获取srpc版本号``srpc::SRPCGlobal::get_instance()->get_srpc_version()``

## RPC Status Code
|name                               | value     |含义               |
|-----------------------------------|-----------|-------------------|
|RPCStatusUndefined                 | 0         | 未定义             |
|RPCStatusOK                        | 1         | 正确/成功          |
|RPCStatusServiceNotFound           | 2         | 找不到Service名    |
|RPCStatusMethodNotFound            | 3         | 找不到RPC函数名    |
|RPCStatusMetaError                 | 4         | Meta错误/解析失败  |
|RPCStatusReqCompressSizeInvalid    | 5         | 请求压缩大小错误   |
|RPCStatusReqDecompressSizeInvalid  | 6         | 请求解压大小错误   |
|RPCStatusReqCompressNotSupported   | 7         | 请求压缩类型不支持 |
|RPCStatusReqDecompressNotSupported | 8         | 请求解压类型不支持 |
|RPCStatusReqCompressError          | 9         | 请求压缩失败       |
|RPCStatusReqDecompressError        | 10        | 请求解压失败       |
|RPCStatusReqSerializeError         | 11        | 请求IDL序列化失败  |
|RPCStatusReqDeserializeError       | 12        | 请求IDL反序列化失败|
|RPCStatusRespCompressSizeInvalid   | 13        | 回复压缩大小错误   |
|RPCStatusRespDecompressSizeInvalid | 14        | 回复解压大小错误   |
|RPCStatusRespCompressNotSupported  | 15        | 回复压缩类型不支持 |
|RPCStatusRespDecompressNotSupported| 16        | 回复解压类型不支持 |
|RPCStatusRespCompressError         | 17        | 回复压缩失败       |
|RPCStatusRespDecompressError       | 18        | 回复解压失败       |
|RPCStatusRespSerializeError        | 19        | 回复IDL序列化失败  |
|RPCStatusRespDeserializeError      | 20        | 回复IDL反序列化失败|
|RPCStatusIDLSerializeNotSupported  | 21        | 不支持IDL序列化   |
|RPCStatusIDLDeserializeNotSupported| 22        | 不支持IDL反序列化 |
|RPCStatusURIInvalid                | 30        | URI非法          |
|RPCStatusUpstreamFailed            | 31        | Upstream全熔断   |
|RPCStatusSystemError               | 100       | 系统错误         |
|RPCStatusSSLError                  | 101       | SSL错误          |
|RPCStatusDNSError                  | 102       | DNS错误          |
|RPCStatusProcessTerminated         | 103       | 程序退出&终止     |

## RPC IDL
- 描述文件
- 前后兼容
- Protobuf/Thrift

### 示例
下面我们通过一个具体例子来呈现
- 我们拿pb举例,定义一个ServiceName为``Example``的``example.proto``文件
- rpc接口名为``Echo``,输入参数为``EchoRequest``,输出参数为``EchoResponse``
- ``EchoRequest``包括两个string:``message``和``name``
- ``EchoResponse``包括一个string:``message``

~~~proto
syntax="proto2";

message EchoRequest {
    optional string message = 1;
    optional string name = 2;
};

message EchoResponse {
    optional string message = 1;
};

service Example {
     rpc Echo(EchoRequest) returns (EchoResponse);
};
~~~

## RPC Service
- 组成SRPC服务的基本单元
- 每一个Service一定由某一种IDL生成
- Service由IDL决定,与网络通信具体协议无关

### 示例
下面我们通过一个具体例子来呈现
- 沿用上面的``example.proto``IDL描述文件
- 执行官方的``protoc example.proto --cpp_out=./ --proto_path=./``获得``example.pb.h``和``example.pb.cpp``两个文件
- 执行SRPC的``srpc_generator protobuf ./example.proto ./``获得``example.srpc.h``文件
- 我们派生``Example::Service``来实现具体的rpc业务逻辑,这就是一个RPC Service
- 注意这个Service没有任何网络、端口、通信协议等概念,仅仅负责完成实现从``EchoRequest``输入到输出``EchoResponse``的业务逻辑

~~~cpp
class ExampleServiceImpl : public Example::Service
{
public:
    void Echo(EchoRequest *request, EchoResponse *response, RPCContext *ctx) override
    {
        response->set_message("Hi, " + request->name());

        printf("get_req:\n%s\nset_resp:\n%s\n",
                request->DebugString().c_str(),
                response->DebugString().c_str());
    }
};
~~~

## RPC Server
- 每一个Server对应一个端口
- 每一个Server对应一个确定的网络通信协议
- 一个Service可以添加到任意的Server里
- 一个Server可以拥有任意多个Service,但在当前Server里ServiceName必须唯一
- 不同IDL的Service是可以放进同一个Server中的

### 示例
下面我们通过一个具体例子来呈现
- 沿用上面的``ExampleServiceImpl``Service
- 首先,我们创建1个RPC Server,并确定proto文件的内容
- 然后,我们可以创建任意个数的Service实例、任意不同IDL proto形成的Service,把这些Service通过``add_service()``接口添加到Server里
- 最后,通过Server的``start()``或者``serve()``开启服务,处理即将到来的rpc请求

- 想像一下,我们也可以从``Example::Service``派生出多个Service,而它们的rpc``Echo``实现的功能可以不同
- 想像一下,我们可以在N个不同的端口创建N个不同的RPC Server,代表着不同的协议
- 想像一下,我们可以把同一个ServiceIMPL实例``add_service()``到不同的Server上,我们也可以把不同的ServiceIMPL实例``add_service``到同一个Server上
- 想像一下,我们可以用同一个``ExampleServiceImpl``,在三个不同端口、同时服务于BRPC-STD、SRPC-STD、SRPC-Http
- 甚至,我们可以将1个Protobuf IDL相关的``ExampleServiceImpl``和1个Thrift IDL相关的``AnotherThriftServiceImpl``,``add_service``到同一个SRPC-STD Server,两种IDL在同一个端口上完美工作!

~~~cpp
int main()
{
    SRPCServer server_srpc;
    SRPCHttpServer server_srpc_http;
    BRPCServer server_brpc;
    ThriftServer server_thrift;
    TRPCServer server_trpc;
    TRPCHttpServer server_trpc_http;

    ExampleServiceImpl impl_pb;
    AnotherThriftServiceImpl impl_thrift;

    server_srpc.add_service(&impl_pb);
    server_srpc.add_service(&impl_thrift);
    server_srpc_http.add_service(&impl_pb);
    server_srpc_http.add_service(&impl_thrift);
    server_brpc.add_service(&impl_pb);
    server_thrift.add_service(&impl_thrift);
    server_trpc.add_service(&impl_pb);
    server_trpc_http.add_service(&impl_pb);

    server_srpc.start(1412);
    server_srpc_http.start(8811);
    server_brpc.start(2020);
    server_thrift.start(9090);
    server_trpc.start(2022);
    server_trpc_http.start(8822);

    getchar();
    server_trpc_http.stop();
    server_trpc.stop();
    server_thrift.stop();
    server_brpc.stop();
    server_srpc_http.stop();
    server_srpc.stop();

    return 0;
}
~~~

## RPC Client
- 每一个Client对应着一个确定的目标/一个确定的集群
- 每一个Client对应着一个确定的网络通信协议
- 每一个Client对应着一个确定的IDL

### 示例
下面我们通过一个具体例子来呈现
- 沿用上面的例子,client相对简单,直接调用即可
- 通过``Example::XXXClient``创建某种RPC的client实例,需要目标的ip+port或url
- 利用client实例直接调用rpc函数``Echo``即可,这是一次异步请求,请求完成后会进入回调函数
- 具体的RPC Context用法请看下一个段落: [RPC Context](/docs/rpc.md#rpc-context))

~~~cpp
#include <stdio.h>
#include "example.srpc.h"
#include "workflow/WFFacilities.h"

using namespace srpc;

int main()
{
    Example::SRPCClient client("127.0.0.1", 1412);
    EchoRequest req;
    req.set_message("Hello!");
    req.set_name("SRPCClient");

    WFFacilities::WaitGroup wait_group(1);

    client.Echo(&req, [&wait_group](EchoResponse *response, RPCContext *ctx) {
        if (ctx->success())
            printf("%s\n", response->DebugString().c_str());
        else
            printf("status[%d] error[%d] errmsg:%s\n",
                    ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
        wait_group.done();
    });

    wait_group.wait();
    return 0;
}
~~~

## RPC Context
- RPCContext专门用来辅助异步接口,Service和Client通用
- 每一个异步接口都会提供Context,用来给用户提供更高级的功能,比如获取对方ip、获取连接seqid等
- Context上一些功能是Server或Client独有的,比如Server可以设置回复数据的压缩方式,Client可以获取请求成功或失败
- Context上可以通过``get_series()``获得所在的series,与workflow的异步模式无缝结合

### RPCContext API - Common
#### ``long long get_seqid() const;``
请求+回复视为1次完整通信,获得当前socket连接上的通信sequence id,seqid=0代表第1次

#### ``std::string get_remote_ip() const;``
获得对方IP地址,支持ipv4/ipv6

#### ``int get_peer_addr(struct sockaddr *addr, socklen_t *addrlen) const;``
获得对方地址,in/out参数为更底层的数据结构sockaddr

#### ``const std::string& get_service_name() const;``
获取RPC Service Name

#### ``const std::string& get_method_name() const;``
获取RPC Methode Name

#### ``SeriesWork *get_series() const;``
获取当前ServerTask/ClientTask所在series

#### ``bool get_http_header(const std::string& name, std::string& value);``  
如果通讯使用HTTP协议,则根据name获取HTTP header中的value

### RPCContext API - Only for client done
#### ``bool success() const;``
client专用。这次请求是否成功

#### ``int get_status_code() const;``
client专用。这次请求的rpc status code

#### ``const char *get_errmsg() const;``
client专用。这次请求的错误信息

#### ``int get_error() const;``
client专用。这次请求的错误码

#### ``void *get_user_data() const;``
client专用。获取ClientTask的user_data。如果用户通过``create_xxx_task()``接口产生task,则可以通过user_data域记录上下文,在创建task时设置,在回调函数中拿回。

### RPCContext API - Only for server process
#### ``void set_data_type(RPCDataType type);``
Server专用。设置数据打包类型
- RPCDataProtobuf
- RPCDataThrift
- RPCDataJson

#### ``void set_compress_type(RPCCompressType type);``
Server专用。设置数据压缩类型(注:Client的压缩类型在Client或Task上设置)
- RPCCompressNone
- RPCCompressSnappy
- RPCCompressGzip
- RPCCompressZlib
- RPCCompressLz4

#### ``void set_attachment_nocopy(const char *attachment, size_t len);``
Server专用。设置attachment附件。

#### ``bool get_attachment(const char **attachment, size_t *len) const;``
Server专用。获取attachment附件。

#### ``void set_reply_callback(std::function<void (RPCContext *ctx)> cb);``
Server专用。设置reply callback,操作系统写入socket缓冲区成功后被调用。

#### ``void set_send_timeout(int timeout);``
Server专用。设置发送超时,单位毫秒。-1代表无限。

#### ``void set_keep_alive(int timeout);``
Server专用。设置连接保活时间,单位毫秒。-1代表无限。

#### ``bool set_http_code(int code);``   
Server专用。如果通讯使用HTTP协议,则可以设置http status code返回码。仅在框架层能正确响应时有效。

#### ``bool set_http_header(const std::string& name, const std::string& value);``    
Server专用。如果通讯使用HTTP协议,可以在回复中设置HTTP header,如果name被设置过会覆盖旧value。

#### ``bool add_http_header(const std::string& name, const std::string& value);``    
Server专用。如果通讯使用HTTP协议,可以在回复中添加HTTP header,如果有重复name,会保留多个value。

#### ``void log(const RPCLogVector& fields);``
Server专用。透传数据相关,请参考OpenTelemetry数据协议中的log语义。

#### ``void baggage(const std::string& key, const std::string& value);``
Server专用。透传数据相关,参考OpenTelemetry数据协议中的baggage语义。

#### ``void set_json_add_whitespace(bool on);``
Server专用。JsonPrintOptions相关,可设置增加json空格等。

#### ``void set_json_always_print_enums_as_ints(bool flag);``
Server专用。JsonPrintOptions相关,可设置用int打印enum名。

#### ``void set_json_preserve_proto_field_names(bool flag);``
Server专用。JsonPrintOptions相关,可设置保留原始字段名字。

#### ``void set_json_always_print_fields_with_no_presence(bool flag);``
Server专用。JsonPrintOptions相关,可设置带上所有默认的proto数据中的域。

## RPC Options
### Server Params
|name                       |默认                      |含义                             |
|---------------------------|--------------------------|--------------------------------|
|max_connections            | 2000                     | Server的最大连接数,默认2000个   |
|peer_response_timeout      | 10 * 1000                | 每一次IO的读超时,默认10秒       |
|receive_timeout            | -1                       | 每一条完整消息的读超时,默认无限  |
|keep_alive_timeout         | 60 * 1000                | 空闲连接保活,-1代表永远不断开,0代表短连接,默认长连接保活60秒 |
|request_size_limit         | 2LL * 1024 * 1024 * 1024 | 请求包大小限制,最大2GB           |
|ssl_accept_timeout         | 10 * 1000                | SSL连接超时,默认10秒            |

### Client Params
|name                       |默认                      |含义                             |
|---------------------------|--------------------------|--------------------------------|
|host                       | ""                       | 目标host,可以是ip、域名         |
|port                       | 1412                     | 目标端口号,默认1412            |
|is_ssl                     | false                    | ssl开关,默认关闭               |
|url                        | ""                       | 当host为空,url设置才有效。url将屏蔽host/port/is_ssl三项 |
|task_params                | TASK默认配置              | 见下方                         |

### Task Params
|name                       |默认                      |含义                             |
|---------------------------|--------------------------|--------------------------------|
|send_timeout               | -1                       | 发送写超时,默认无限             |
|receive_timeout            | -1                       | 回复超时,默认无限             |
|watch_timeout              | 0                        | 对方第一次回复的超时,默认0不设置 |
|keep_alive_timeout         | 30 * 1000                | 空闲连接保活,-1代表永远不断开,默认30s |
|retry_max                  | 0                        | 最大重试次数,默认0不重试        |
|compress_type              | RPCCompressNone          | 压缩类型,默认不压缩             |
|data_type                  | RPCDataUndefined         | 网络包数据类型,默认与RPC默认值一致,SRPC-Http协议为json,其余为对应IDL的类型 |

## 与workflow异步框架的结合
### 1. Server
下面我们通过一个具体例子来呈现
- Echo RPC在接收到请求时,向下游发起一次http请求
- 对下游请求完成后,我们将http response的body信息填充到response的message里,回复给客户端
- 我们不希望阻塞/占据着Handler的线程,所以对下游的请求一定是一次异步请求
- 首先,我们通过Workflow框架的工厂``WFTaskFactory::create_http_task``创建一个异步任务http_task
- 然后,我们利用RPCContext的``ctx->get_series()``获取到ServerTask所在的SeriesWork
- 最后,我们使用SeriesWork的``push_back``接口将http_task放到SeriesWork的后面

~~~cpp
class ExampleServiceImpl : public Example::Service
{
public:
    void Echo(EchoRequest *request, EchoResponse *response, RPCContext *ctx) override
    {
        auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0,
            [request, response](WFHttpTask *task) {
                if (task->get_state() == WFT_STATE_SUCCESS)
                {
                    const void *data;
                    size_t len;
                    task->get_resp()->get_parsed_body(&data, &len);
                    response->mutable_message()->assign((const char *)data, len);
                }
                else
                    response->set_message("Error: " + std::to_string(task->get_error()));

                printf("Server Echo()\nget_req:\n%s\nset_resp:\n%s\n",
                                            request->DebugString().c_str(),
                                            response->DebugString().c_str());
            });

        ctx->get_series()->push_back(http_task);
    }
};
~~~

### 2. Client
下面我们通过一个具体例子来呈现
- 我们并行发出两个请求,1个是rpc请求,1个是http请求
- 两个请求都结束后,我们再发起一次计算任务,计算两个数的平方和
- 首先,我们通过RPC Client的``create_Echo_task``创建一个rpc异步请求的网络任务rpc_task
- 然后,我们通过Workflow框架的工厂``WFTaskFactory::create_http_task``和``WFTaskFactory::create_go_task``分别创建异步网络任务http_task,和异步计算任务calc_task
- 最后,我们利用串并联流程图,乘号代表并行、大于号代表串行,将3个异步任务组合起来执行``start()``

~~~cpp
void calc(int x, int y)
{
    int z = x * x + y * y;

    printf("calc result: %d\n", z);
}

int main()
{
    Example::SRPCClient client("127.0.0.1", 1412);

    auto *rpc_task = client.create_Echo_task([](EchoResponse *response, RPCContext *ctx) {
        if (ctx->success())
            printf("%s\n", response->DebugString().c_str());
        else
            printf("status[%d] error[%d] errmsg:%s\n",
                    ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
    });

    auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0, [](WFHttpTask *task) {
        if (task->get_state() == WFT_STATE_SUCCESS)
        {
            std::string body;
            const void *data;
            size_t len;

            task->get_resp()->get_parsed_body(&data, &len);
            body.assign((const char *)data, len);
            printf("%s\n\n", body.c_str());
        }
        else
            printf("Http request fail\n\n");
    });

    auto *calc_task = WFTaskFactory::create_go_task(calc, 3, 4);

    EchoRequest req;
    req.set_message("Hello!");
    req.set_name("1412");
    rpc_task->serialize_input(&req);

    WFFacilities::WaitGroup wait_group(1);

    SeriesWork *series = Workflow::create_series_work(http_task, [&wait_group](const SeriesWork *) {
        wait_group.done();
    });
    series->push_back(rpc_task);
    series->push_back(calc_task);
    series->start();

    wait_group.wait();
    return 0;
}
~~~

### 3. Upstream
SRPC可以直接使用Workflow的任何组件,最常用的就是[Upstream](https://github.com/sogou/workflow/blob/master/docs/about-upstream.md),SRPC的任何一种client都可以使用Upstream。

我们通过参数来看看如何构造可以使用Upstream的client:

```cpp
#include "workflow/UpstreamManager.h"

int main()
{
    // 1. 创建upstream并添加实例
    UpstreamManager::upstream_create_weighted_random("echo_server", true);
    UpstreamManager::upstream_add_server("echo_server", "127.0.0.1:1412");
    UpstreamManager::upstream_add_server("echo_server", "192.168.10.10");
    UpstreamManager::upstream_add_server("echo_server", "internal.host.com");

    // 2. 构造参数,填上upstream的名字
    RPCClientParams client_params = RPC_CLIENT_PARAMS_DEFAULT;
    client_params.host = "srpc::echo_server"; // 这个scheme只用于upstream URI解析
    client_params.port = 1412; // 这个port只用于upstream URI解析,不影响具体实例的选取

    // 3. 用参数创建client,其他用法与示例类似
    Example::SRPCClient client(&client_params);

    ...
```

如果使用了ConsistentHash或者Manual方式创建upstream,则我们往往需要对不同的task进行区分、以供选取算法使用。这时候可以使用client task上的`int set_uri_fragment(const std::string& fragment);`接口,设置请求级相关的信息。

这个域的是URI里的fragment,语义请参考[RFC3689 3.5-Fragment](https://datatracker.ietf.org/doc/html/rfc3986#section-3.5),任何需要用到fragment的功能(如其他选取策略里附带的其他信息),都可以利用这个域。