1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
|
# 异步MySQL客户端:mysql_cli
# 示例代码
[tutorial-12-mysql_cli.cc](/tutorial/tutorial-12-mysql_cli.cc)
# 关于mysql_cli
教程中的mysql_cli使用方式与官方客户端相似,是一个命令行交互式的异步MySQL客户端。
程序运行方式:./mysql_cli \<URL\>
启动之后可以直接在终端输入mysql命令与db进行交互,输入quit或Ctrl-C退出。
# MySQL URL的格式
mysql://username:password@host:port/dbname?character_set=charset&character_set_results=charset
- 如果以SSL连接访问MySQL,则scheme设为**mysqls://**。MySQL server 5.7及以上支持;
- username和password按需填写,如果密码里包含特殊字符,需要转义后再拼接URL;
~~~cpp
// 密码为:@@@@####
std::string url = "mysql://root:" + StringUtil::url_encode_component("@@@@####") + "@127.0.0.1";
~~~
- port默认为3306;
- dbname为要用的数据库名,一般如果SQL语句只操作一个db的话建议填写;
- 如果用户在这一层有upstream选取需求,可以参考[upstream文档](/docs/about-upstream.md);
- character_set为client的字符集,等价于使用官方客户端启动时的参数``--default-character-set``的配置,默认utf8,具体可以参考MySQL官方文档[character-set.html](https://dev.mysql.com/doc/internals/en/character-set.html)。
- character_set_results为client、connection和results的字符集,如果想要在SQL语句里使用``SET NAME``来指定这些字符集的话,请把它配置到url的这个位置。
MySQL URL示例:
mysql://root:password@127.0.0.1
mysql://@test.mysql.com:3306/db1?character_set=utf8&character_set_results=utf8
mysqls://localhost/db1?character\_set=big5
# 创建并启动MySQL任务
用户可以使用WFTaskFactory创建MySQL任务,创建接口与回调函数的用法都与workflow其他任务类似:
~~~cpp
using mysql_callback_t = std::function<void (WFMySQLTask *)>;
WFMySQLTask *create_mysql_task(const std::string& url, int retry_max, mysql_callback_t callback);
void set_query(const std::string& query);
~~~
用户创建完WFMySQLTask之后,可以对req调用 **set_query()** 写入SQL语句。
如果没调用过 **set_query()** ,task就被start起来的话,则用户会在callback里得到**WFT_ERR_MYSQL_QUERY_NOT_SET**。
其他包括callback、series、user_data等与workflow其他task用法类似。
大致使用示例如下:
~~~cpp
int main(int argc, char *argv[])
{
...
WFMySQLTask *task = WFTaskFactory::create_mysql_task(url, RETRY_MAX, mysql_callback);
task->get_req()->set_query("SHOW TABLES;");
...
task->start();
...
}
~~~
# 支持的命令
目前支持的命令为**COM_QUERY**,已经能涵盖用户基本的增删改查、建库删库、建表删表、预处理、使用存储过程和使用事务的需求。
因为我们的交互命令中不支持选库(**USE**命令),所以,如果SQL语句中有涉及到**跨库**的操作,则可以通过**db_name.table_name**的方式指定具体哪个库的哪张表。
其他所有命令都可以**拼接**到一起通过 ``set_query()`` 传给WFMySQLTask(包括INSERT/UPDATE/SELECT/DELETE/PREPARE/CALL)。
拼接的命令会被按序执行直到命令发生错误,前面的命令会执行成功。
举个例子:
~~~cpp
req->set_query("SELECT * FROM table1; CALL procedure1(); INSERT INTO table3 (id) VALUES (1);");
~~~
# 结果解析
与workflow其他任务类似,可以用``task->get_resp()``拿到**MySQLResponse**,我们可以通过**MySQLResultCursor**遍历结果集。具体接口可以查看:[MySQLResult.h](/src/protocol/MySQLResult.h)
一次请求所对应的回复中,其数据是一个三维结构:
- 一个回复中包含了一个或多个结果集(result set);
- 一个结果集的类型可能是**MYSQL_STATUS_GET_RESULT**或者**MYSQL_STATUS_OK**;
- **MYSQL_STATUS_GET_RESULT**类型的结果集包含了一行或多行(row);
- 一行包含了一列或多个列,或者说一到多个阈(Field/Cell),具体数据结构为**MySQLField**和**MySQLCell**;
结果集的两种类型,可以通过``cursor->get_cursor_status()``进行判断:
| |MYSQL_STATUS_GET_RESULT|MYSQL_STATUS_OK|
|------|-----------------------|---------------|
|SQL命令|SELECT(包括存储过程中的每一个SELECT)|INSERT / UPDATE / DELETE / ...|
|对应语义|读操作,一个结果集表示一份读操作返回的二维表|写操作,一个结果集表示一个写操作是否成功|
|主要接口|fetch_fields();</br>fetch_row(&row_arr);</br>...|get_insert_id();</br>get_affected_rows();</br>...|
由于拼接语句可能存在错误,因此这种情况,可以通过**MySQLResultCursor**拿到前面正确执行过的语句多个结果集,以及最后判断``resp->get_packet_type()``为**MYSQL_PACKET_ERROR**时,通过``resp->get_error_code()``和``resp->get_error_msg()``拿到具体错误信息。
一个包含n条**SELECT**语句的**存储过程**,会返回n个**MYSQL_STATUS_GET_RESULT**的结果集和1个**MYSQL_STATUS_OK**的结果集,用户自行忽略此**MYSQL_STATUS_OK**结果集即可。
具体使用从外到内的步骤应该是:
1. 判断任务状态(代表通信层面状态):用户通过判断 ``task->get_state()`` 等于**WFT_STATE_SUCCESS**来查看任务执行是否成功;
2. 判断回复包类型(代表返回包解析状态):调用 **resp->get_packet_type()** 查看最后一条MySQL语句的返回包类型,常见的几个类型为:
- MYSQL_PACKET_OK:成功,可以用cursor遍历结果;
- MYSQL_PACKET_EOF:成功,可以用cursor遍历结果;
- MYSQL_PACKET_ERROR:失败或部分失败,成功的部分可以用cursor遍历结果;
3. 遍历结果集。用户可以使用**MySQLResultCursor**读取结果集中的内容,因为MySQL server返回的数据是多结果集的,因此一开始cursor会**自动指向第一个结果集**的读取位置。
4. 判断结果集状态(代表结果集读取状态):通过 ``cursor->get_cursor_status()`` 可以拿到的几种状态:
- MYSQL_STATUS_GET_RESULT:此结果集为读请求类型;
- MYSQL_STATUS_END:读结果集已读完最后一行;
- MYSQL_STATUS_OK:此结果集为写请求类型;
- MYSQL_STATUS_ERROR:解析错误;
5. 读取**MYSQL_STATUS_OK**结果集中的基本内容:
- ``unsigned long long get_affected_rows() const;``
- ``unsigned long long get_insert_id() const;``
- ``int get_warnings() const;``
- ``std::string get_info() const;``
6. 读取**MYSQL_STATUS_GET_RESULT**结果集中的columns中每个field:
- ``int get_field_count() const;``
- ``const MySQLField *fetch_field();``
- ``const MySQLField *const *fetch_fields() const;``
7. 读取**MYSQL_STATUS_GET_RESULT**结果集中的每一行:按行读取可以使用 ``cursor->fetch_row()`` 直到返回值为false。其中会移动cursor内部对当前结果集的指向每行的offset:
- ``int get_rows_count() const;``
- ``bool fetch_row(std::vector<MySQLCell>& row_arr);``
- ``bool fetch_row(std::map<std::string, MySQLCell>& row_map);``
- ``bool fetch_row(std::unordered_map<std::string, MySQLCell>& row_map);``
- ``bool fetch_row_nocopy(const void **data, size_t *len, int *data_type);``
8. 直接把当前**MYSQL_STATUS_GET_RESULT**结果集的所有行拿出:所有行的读取可以使用 **cursor->fetch_all()** ,内部用来记录行的cursor会直接移动到最后;当前cursor状态会变成**MYSQL_STATUS_END**:
- ``bool fetch_all(std::vector<std::vector<MySQLCell>>& rows);``
9. 返回当前**MYSQL_STATUS_GET_RESULT**结果集的头部:如果有必要重读这个结果集,可以使用 **cursor->rewind()** 回到当前结果集头部,再通过第7步或第8步进行读取;
10. 拿到下一个结果集:因为MySQL server返回的数据包可能是包含多结果集的(比如每个select/insert/...语句为一个结果集;或者call procedure返回的多结果集数据),因此用户可以通过 **cursor->next_result_set()** 跳到下一个结果集,返回值为false表示所有结果集已取完。
11. 返回第一个结果集:**cursor->first_result_set()** 可以让我们返回到所有结果集的头部,然后可以从第4步开始重新拿数据;
12. **MYSQL_STATUS_GET_RESULT**结果集每列具体数据MySQLCell:第7步中读取到的一行,由多列组成,每列结果为MySQLCell,基本使用接口有:
- ``int get_data_type();`` 返回MYSQL_TYPE_LONG、MYSQL_TYPE_STRING...具体参考[mysql_types.h](/src/protocol/mysql_types.h)
- ``bool is_TYPE() const;`` TYPE为int、string、ulonglong,判断是否是某种类型
- ``TYPE as_TYPE() const;`` 同上,以某种类型读出MySQLCell的数据
- ``void get_cell_nocopy(const void **data, size_t *len, int *data_type) const;`` nocopy接口
整体示例如下:
~~~cpp
void task_callback(WFMySQLTask *task)
{
// step-1. 判断任务状态
if (task->get_state() != WFT_STATE_SUCCESS)
{
fprintf(stderr, "task error = %d\n", task->get_error());
return;
}
MySQLResultCursor cursor(task->get_resp());
bool test_first_result_set_flag = false;
bool test_rewind_flag = false;
// step-2. 判断回复包其他状态
if (resp->get_packet_type() == MYSQL_PACKET_ERROR)
{
fprintf(stderr, "ERROR. error_code=%d %s\n",
task->get_resp()->get_error_code(),
task->get_resp()->get_error_msg().c_str());
}
begin:
// step-3. 遍历结果集
do {
// step-4. 判断结果集状态
if (cursor.get_cursor_status() == MYSQL_STATUS_OK)
{
// step-5. MYSQL_STATUS_OK结果集的基本内容
fprintf(stderr, "OK. %llu rows affected. %d warnings. insert_id=%llu.\n",
cursor.get_affected_rows(), cursor.get_warnings(), cursor.get_insert_id());
}
else if (cursor.get_cursor_status() == MYSQL_STATUS_GET_RESULT)
{
fprintf(stderr, "field_count=%u rows_count=%u ",
cursor.get_field_count(), cursor.get_rows_count());
// step-6. 读取每个fields。这是个nocopy api
const MySQLField *const *fields = cursor.fetch_fields();
for (int i = 0; i < cursor.get_field_count(); i++)
{
fprintf(stderr, "db=%s table=%s name[%s] type[%s]\n",
fields[i]->get_db().c_str(), fields[i]->get_table().c_str(),
fields[i]->get_name().c_str(), datatype2str(fields[i]->get_data_type()));
}
// step-8. 把所有行读出,也可以while (cursor.fetch_row(map/vector)) 按step-7拿每一行
std::vector<std::vector<MySQLCell>> rows;
cursor.fetch_all(rows);
for (unsigned int j = 0; j < rows.size(); j++)
{
// step-12. 具体每个cell的读取
for (unsigned int i = 0; i < rows[j].size(); i++)
{
fprintf(stderr, "[%s][%s]", fields[i]->get_name().c_str(),
datatype2str(rows[j][i].get_data_type()));
// step-12. 判断具体类型is_string()和转换具体类型as_string()
if (rows[j][i].is_string())
{
std::string res = rows[j][i].as_string();
fprintf(stderr, "[%s]\n", res.c_str());
}
else if (rows[j][i].is_int())
{
fprintf(stderr, "[%d]\n", rows[j][i].as_int());
} // else if ...
}
}
}
// step-10. 拿下一个结果集
} while (cursor.next_result_set());
if (test_first_result_set_flag == false)
{
test_first_result_set_flag = true;
// step-11. 返回第一个结果集
cursor.first_result_set();
goto begin;
}
if (test_rewind_flag == false)
{
test_rewind_flag = true;
// step-9. 返回当前结果集头部
cursor.rewind();
goto begin;
}
return;
}
~~~
# WFMySQLConnection
由于我们是高并发异步客户端,这意味着我们对一个server的连接可能会不止一个。而MySQL的事务和预处理都是带状态的,为了保证一次事务或预处理独占一个连接,用户可以使用我们封装的二级工厂WFMySQLConnection来创建任务,每个WFMySQLConnection保证独占一个连接,具体参考[WFMySQLConnection.h](/src/client/WFMySQLConnection.h)。
### 1. WFMySQLConnection的创建与初始化
创建一个WFMySQLConnection的时候需要传入一个**id**,之后的调用内部都会由这个id和url去找到对应的那个连接。
初始化需要传入**url**,之后在这个connection上创建的任务就不需要再设置url了。
~~~cpp
class WFMySQLConnection
{
public:
WFMySQLConnection(int id);
int init(const std::string& url);
...
};
~~~
### 2. 创建任务与关闭连接
通过 **create_query_task()** ,写入SQL请求和回调函数即可创建任务,该任务一定从这一个connection发出。
有时候我们需要手动关闭这个连接。因为当我们不再使用它的时候,这个连接会一直保持到MySQL server超时。期间如果使用同一个id和url去创建WFMySQLConnection的话就可以复用到这个连接。
因此我们建议如果不准备复用连接,应使用 **create_disconnect_task()** 创建一个任务,手动关闭这个连接。
~~~cpp
class WFMySQLConnection
{
public:
...
WFMySQLTask *create_query_task(const std::string& query,
mysql_callback_t callback);
WFMySQLTask *create_disconnect_task(mysql_callback_t callback);
}
~~~
WFMySQLConnection相当于一个二级工厂,我们约定任何工厂对象的生命周期无需保持到任务结束,以下代码完全合法:
~~~cpp
WFMySQLConnection *conn = new WFMySQLConnection(1234);
conn->init(url);
auto *task = conn->create_query_task("SELECT * from table", my_callback);
conn->deinit();
delete conn;
task->start();
~~~
### 3. 注意事项
不可以无限制的产生id来生成连接对象,因为每个id会占用一小块内存,无限产生id会使内存不断增加。当一个连接使用完毕,可以不创建和运行disconnect task,而是让这个连接进入内部连接池。下一个connection通过相同的id和url初始化,会自动复用这个连接。
同一个连接上的多个任务并行启动,会得到EAGAIN错误。
如果在使用事务期间已经开始BEGIN但还没有COMMIT或ROLLBACK,且期间连接发生过中断,则连接会被框架内部自动重连,用户会在下一个task请求中拿到**ECONNRESET**错误。此时还没COMMIT的事务语句已经失效,需要重新再发一遍。
### 4. 预处理
用户也可以通过WFMySQLConnection来做预处理**PREPARE**,因此用户可以很方便地用作**防SQL注入**。如果连接发生了重连,也会得到一个**ECONNRESET**错误。
### 5. 完整示例
~~~cpp
WFMySQLConnection conn(1);
conn.init("mysql://root@127.0.0.1/test");
// test transaction
const char *query = "BEGIN;";
WFMySQLTask *t1 = conn.create_query_task(query, task_callback);
query = "SELECT * FROM check_tiny FOR UPDATE;";
WFMySQLTask *t2 = conn.create_query_task(query, task_callback);
query = "INSERT INTO check_tiny VALUES (8);";
WFMySQLTask *t3 = conn.create_query_task(query, task_callback);
query = "COMMIT;";
WFMySQLTask *t4 = conn.create_query_task(query, task_callback);
WFMySQLTask *t5 = conn.create_disconnect_task(task_callback);
((*t1) > t2 > t3 > t4 > t5).start();
~~~
|