1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341
|
# 异步Kafka客户端:kafka_cli
# 示例代码
[tutorial-13-kafka_cli.cc](/tutorial/tutorial-13-kafka_cli.cc)
# 编译
由于支持Kafka的多种压缩方式,因此系统需要预先安装[zlib](https://github.com/madler/zlib.git),[snappy](https://github.com/google/snappy.git),[lz4(>=1.7.5)](https://github.com/lz4/lz4.git),[zstd](https://github.com/facebook/zstd.git)等第三方库。
支持CMake和Bazel两种编译方式。
CMake:执行命令make KAFKA=y 编译独立的类库(libwfkafka.a和libwfkafka.so)支持kafka协议;cd tutorial; make KAFKA=y 可以编译kafka_cli
Bazel:执行bazel build kafka 编译支持kafka协议的类库;执行bazel build kafka_cli 编译kafka_cli
# 关于kafka_cli
这是一个kafka client,可以完成kafka的消息生产(produce)和消息消费(fetch)。
编译时需要在tutorial目录中执行编译命令make KAFKA=y或者在项目根目录执行make KAFKA=y tutorial。
该程序从命令行读取一个kafka broker服务器地址和本次任务的类型(produce/fetch):
./kafka_cli \<broker_url\> [p/c]
程序会在执行完任务后自动退出,一切资源完全回收。
其中broker_url可以有多个url组成,多个url之间以,分割
- 形式如:kafka://host:port,kafka://host1:port... 或:**kafkas**://host:port,**kafkas**://host1:port代表使用SSL通信。
- port的默认值在普通TCP连接下是9092,SSL下为9093。
- "kafka://"前缀可以缺省。这时候使用默认使用TCL通信。
- 多个url,必须都采用TCP或都采用SSL。否则init函数返回-1,错误码为EINVAL。
- 如果用户在这一层有upstream选取需求,可以参考[upstream文档](../docs/about-upstream.md)。
Kafka broker_url示例:
kafka://127.0.0.1/
kafka://kafka.host:9090/
kafka://10.160.23.23:9000,10.123.23.23,kafka://kafka.sogou
kafkas://broker1.kafka.sogou,kafkas://broker2.kafka.sogou
错误的url示例(第一个broker为SSL,第二个broker非SSL):
kafkas://broker1.kafka.sogou,broker2.kafka.sogou
# 实现原理和特性
kafka client内部实现上除了压缩功能外没有依赖第三方库,同时利用了workflow的高性能,在合理的配置和环境下,每秒钟可以处理几万次Kafka请求。
在内部实现上,kafka client会把一次请求按照内部使用到的broker分拆成并行parallel任务,每个broker地址对应parallel任务中的一个子任务,
这样可以最大限度的提升效率,同时利用workflow内部对连接的复用机制使得整体的连接数控制在一个合理的范围。
如果一个broker地址下有多个topic partition,为了提高吞吐,应该创建多个client,然后按照topic partition分别创建任务独立启动。
# 创建并启动Kafka任务
首先需要创建一个WFKafkaClient对象,然后调用init函数初始化WFKafkaClient对象,
~~~cpp
int init(const std::string& broker_url);
int init(const std::string& broker_url, const std::string& group);
~~~
其中broker_url是kafka broker集群的地址,格式可以参考上面的broker_url,
group是消费者组的group_name,用在基于消费者组的fetch任务中,如果是produce任务或者没有使用消费者组的fetch任务,则不需要使用此接口;
用消费者组的时候,可以设置heartbeat的间隔时间,时间单位是毫秒,用于维持心跳:
~~~cpp
void set_heartbeat_interval(size_t interval_ms);
~~~
后面再通过WFKafkaClient对象创建kafka任务
~~~cpp
using kafka_callback_t = std::function<void (WFKafkaTask *)>;
WFKafkaTask *create_kafka_task(const std::string& query, int retry_max, kafka_callback_t cb);
WFKafkaTask *create_kafka_task(int retry_max, kafka_callback_t cb);
~~~
其中query中包含此次任务的类型以及topic等属性,retry_max表示最大重试次数,cb为用户自定义的callback函数,当task执行完毕后会被调用,
接着还可以修改task的默认配置以满足实际需要,详细接口可以在[KafkaDataTypes.h](../src/protocol/KafkaDataTypes.h)中查看
~~~cpp
KafkaConfig config;
config.set_client_id("workflow");
task->set_config(std::move(config));
~~~
支持的配置选项描述如下:
配置名 | 类型 | 默认值 | 含义
------ | ---- | -------| -------
produce_timeout | int | 100ms | produce的超时时间
produce_msg_max_bytes | int | 1000000 bytes | 单个消息的最大长度限制
produce_msgset_cnt | int | int | 10000 | 一次通信消息集合的最大条数
produce_msgset_max_bytes | int | 1000000 bytes | 一次通信消息集合的最大长度限制
fetch_timeout | int | 100ms | fetch的超时时间
fetch_min_bytes | int | 1 byte | 一次fetch通信最小消息的长度
fetch_max_bytes | int | 50M bytes | 一次fetch通信最大消息的长度
fetch_msg_max_bytes | int | 1M bytes | 一次fetch通信单个消息的最大长度
offset_timestamp | long long int | -1 | 消费者组模式下,没有找到历史offset时,初始化的offset,-2表示最久,-1表示最新
session_timeout | int | 10s | 加入消费者组初始化时的超时时间
rebalance_timeout | int | 10s | 加入消费者组同步信息阶段的超时时间
produce_acks | int | -1 | produce任务在返回之前应确保消息成功复制的broker节点数,-1表示所有的复制broker节点
allow_auto_topic_creation | bool | true | produce时topic不存在时,是否自动创建topic
broker_version | char * | NULL | 指定broker的版本号,<0.10时需要手动指定
compress_type | int | NoCompress | produce消息的压缩类型
client_id | char * | NULL | 表示client的id
check_crcs | bool | false | fetch任务中是否校验消息的crc32
offset_store | int | 0 | 加入消费者组时,是否使用上次提交offset,1表示使用指定的offset,0表示优先使用上次提交
sasl_mechanisms | char * | NULL | sasl认证类型,目前支持plain和scram
sasl_username | char * | NULL | sasl认证所需的username
sasl_password | char * | NULL | sasl认证所需的password
最后就可以调用start接口启动kafka任务。
# produce任务
1、在创建并初始化WFKafkaClient之后,可以在query中直接指定topic等信息创建WFKafkaTask任务
使用示例如下:
~~~cpp
int main(int argc, char *argv[])
{
...
client = new WFKafkaClient();
client->init(url);
task = client->create_kafka_task("api=fetch&topic=xxx&topic=yyy", 3, kafka_callback);
...
task->start();
...
}
~~~
2、在创建完WFKafkaTask之后,先通过调用set_key, set_value, add_header_pair等方法构建KafkaRecord,
关于KafkaRecord的更多接口,可以在[KafkaDataTypes.h](../src/protocol/KafkaDataTypes.h)中查看
然后应该通过调用add_produce_record添加KafkaRecord,关于更多接口的详细定义,可以在[WFKafkaClient.h](../src/client/WFKafkaClient.h)中查看
需要注意的是,add_produce_record的第二个参数partition,当>=0是表示指定的partition,-1表示随机指定partition或者调用自定义的kafka_partitioner_t
kafka_partitioner_t可以通过set_partitioner接口设置自定义规则。
使用示例如下:
~~~cpp
int main(int argc, char *argv[])
{
...
WFKafkaClient *client_fetch = new WFKafkaClient();
client_fetch->init(url);
task = client_fetch->create_kafka_task("api=produce&topic=xxx&topic=yyy", 3, kafka_callback);
task->set_partitioner(partitioner);
KafkaRecord record;
record.set_key("key1", strlen("key1"));
record.set_value(buf, sizeof(buf));
record.add_header_pair("hk1", 3, "hv1", 3);
task->add_produce_record("workflow_test1", -1, std::move(record));
...
task->start();
...
}
~~~
3、produce还可以使用kafka支持的4种压缩协议,通过设置配置项来实现
使用示例如下:
~~~cpp
int main(int argc, char *argv[])
{
...
WFKafkaClient *client_fetch = new WFKafkaClient();
client_fetch->init(url);
task = client_fetch->create_kafka_task("api=produce&topic=xxx&topic=yyy", 3, kafka_callback);
KafkaConfig config;
config.set_compress_type(Kafka_Zstd);
task->set_config(std::move(config));
KafkaRecord record;
record.set_key("key1", strlen("key1"));
record.set_value(buf, sizeof(buf));
record.add_header_pair("hk1", 3, "hv1", 3);
task->add_produce_record("workflow_test1", -1, std::move(record));
...
task->start();
...
}
~~~
# fetch任务
fetch任务支持消费者组模式和手动模式
1、手动模式
无需指定消费者组,同时需要用户指定topic、partition和offset
使用示例如下:
~~~cpp
client = new WFKafkaClient();
client->init(url);
task = client->create_kafka_task("api=fetch", 3, kafka_callback);
KafkaToppar toppar;
toppar.set_topic_partition("workflow_test1", 0);
toppar.set_offset(0);
task->add_toppar(toppar);
~~~
2、消费者组模式
在初始化client的时候需要指定消费者组的名称
使用示例如下:
~~~cpp
int main(int argc, char *argv[])
{
...
WFKafkaClient *client_fetch = new WFKafkaClient();
client_fetch->init(url, cgroup_name);
task = client_fetch->create_kafka_task("api=fetch&topic=xxx&topic=yyy", 3, kafka_callback);
...
task->start();
...
}
~~~
3、offset的提交
在消费者组模式下,用户消费消息后,可以在callback函数中,通过创建commit任务来自动提交消费的记录,使用示例如下:
~~~cpp
void kafka_callback(WFKafkaTask *task)
{
...
commit_task = client.create_kafka_task("api=commit", 3, kafka_callback);
...
commit_task->start();
...
}
~~~
# 关于client的关闭
在消费者组模式下,client在关闭之前需要调用create_leavegroup_task创建leavegroup_task,
它会发送leavegroup协议包,如果没有启动leavegroup_task,会导致消费者组没有正确退出,触发这个组的rebalance。
# 处理kafka结果
消息的结果集的数据结构是KafkaResult,可以通过调用WFKafkaTask的get_result()接口获得,
然后调用KafkaResult的fetch_record接口可以将本次task相关的record取出来,它是一个KafkaRecord的二维vector,
第一维是topic partition,第二维是某个topic partition下对应的KafkaRecord,
在[KafkaResult.h](../src/protocol/KafkaResult.h)中可以看到KafkaResult的定义
~~~cpp
void kafka_callback(WFKafkaTask *task)
{
int state = task->get_state();
int error = task->get_error();
// handle error states
...
protocol::KafkaResult *result = task->get_result();
result->fetch_records(records);
for (auto &v : records)
{
for (auto &w: v)
{
const void *value;
size_t value_len;
w->get_value(&value, &value_len);
printf("produce\ttopic: %s, partition: %d, status: %d, offset: %lld, val_len: %zu\n",
w->get_topic(), w->get_partition(), w->get_status(), w->get_offset(), value_len);
}
}
...
protocol::KafkaResult new_result = std::move(*task->get_result());
if (new_result.fetch_records(records))
{
for (auto &v : records)
{
if (v.empty())
continue;
for (auto &w: v)
{
if (fp)
{
const void *value;
size_t value_len;
w->get_value(&value, &value_len);
fwrite(w->get_value(), w->get_value_len(), 1, fp);
}
}
}
}
...
}
~~~
# 认证
认证信息需要在配置中设置,以sasl为例:
~~~cpp
int main(int argc, char *argv[])
{
...
client = new WFKafkaClient();
client->init(url);
task = client->create_kafka_task("api=fetch&topic=xxx&topic=yyy", 3, kafka_callback);
config.set_sasl_username("fetch");
config.set_sasl_password("fetch-secret");
config.set_sasl_mech("SCRAM-SHA-256");
task->set_config(std::move(config));
...
task->start();
...
}
~~~
|