File: tutorial-12-mysql_cli.md

package info (click to toggle)
workflow 0.11.10-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,744 kB
  • sloc: cpp: 33,792; ansic: 9,393; makefile: 9; sh: 6
file content (343 lines) | stat: -rw-r--r-- 17,365 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# Asynchronous MySQL client: mysql\_cli

# Sample code

[tutorial-12-mysql\_cli.cc](/tutorial/tutorial-12-mysql_cli.cc)

# About mysql\_cli

The usage of mysql\_cli in the tutorial is similar to that of the official client. It is an asynchronous MySQL client with an interactive command line interface.

To start the program, run the command: ./mysql_cli \<URL\>

After startup, you can directly enter MySQL command in the terminal to interact with db, or enter `quit` or `Ctrl-C` to exit.

# Format of MySQL URL

mysql://username:password@host:port/dbname?character\_set=charset&character\_set\_results=charset

- set scheme to be **mysqls://** for accessing MySQL with SSL connnection (MySQL server 5.7 or above is required).

- fill in the username and the password for the MySQL database; Special characters in password need to be escaped.
~~~cpp
// Password: @@@@####
std::string url = "mysql://root:" + StringUtil::url_encode_component("@@@@####") + "@127.0.0.1";
~~~

- the default port number is 3306;

- **dbname** is the name of the database to be used. It is recommended to provide a dbname if SQL statements only operates on one database;

- If you have upstream selection requirements for MySQL, please see [upstream documents](/docs/en/about-upstream.md).

- **character_set** indicates a character set used for the client, with the same meaning of --default-character-set in official client. The default value is utf8. For details, please see official MySQL documents [character-set.html](https://dev.mysql.com/doc/internals/en/character-set.html).

- **character_set_results** indicates a character set for client, connection and results. If you wants to use `SET NAMES ` in SQL statements, please set it here.

Sample MySQL URL:

mysql://root:password@127.0.0.1

mysql://@test.mysql.com:3306/db1?character\_set=utf8&character_set_results=utf8

mysqls://localhost/db1?character\_set=big5

# Creating and starting a MySQL task

You can use WFTaskFactory to create a MySQL task. The usage of creating interface and callback functions are similar to other tasks in workflow:

~~~cpp
using mysql_callback_t = std::function<void (WFMySQLTask *)>;

WFMySQLTask *create_mysql_task(const std::string& url, int retry_max, mysql_callback_t callback);

void set_query(const std::string& query);
~~~

You can call **set\_query()** on the request to write SQL statements after creating a WFMySQLTask.

If **set_query()** had **NOT** been called before the task started, the user might get **WFT_ERR_MYSQL_QUERY_NOT_SET** in callback.

Other functions, including callback, series and user\_data are used in a way similar to other tasks in workflow.

The following codes show some general usage:
~~~cpp
int main(int argc, char *argv[])
{
    ...
    WFMySQLTask *task = WFTaskFactory::create_mysql_task(url, RETRY_MAX, mysql_callback);
    task->get_req()->set_query("SHOW TABLES;");
    ...
    task->start();
    ...
}
~~~

# Supported commands

Currently the supported command is **COM\_QUERY**, which can cover the basic requirements for adding, deleting, modifying and querying data, creating and deleting databases, creating and deleting tables, prepare, using stored procedures and using transactions.

Because the program doesn't support the selection of databases (**USE** command) in our interactive commands, if there are **cross-database** operations in SQL statements, you can specify the database and table with **db\_name.table\_name**.

Any other command can be **spliced together** and then passed to WFMySQLTask with `set_query()`. (including INSERT/UPDATE/SELECT/PREPARE/CALL)

The spliced commands will be executed sequentially until an error occurs, and the previous commands will be executed successfully.

For example:

~~~cpp
req->set_query("SELECT * FROM table1; CALL procedure1(); INSERT INTO table3 (id) VALUES (1);");
~~~

# Parsing results

Similar to other tasks in workflow, you can use **task->get\_resp()** to get **MySQLResponse**. For details on the interfaces, please see [MySQLResult.h](/src/protocol/MySQLResult.h).

One request will get one response, which is a 3-dimensional structure.
- one response consists of one or more result sets;
- the type of each result set may be **MYSQL_STATUS_GET_RESULT** or **MYSQL_STATUS_OK**;
- one result set of type **MYSQL_STATUS_GET_RESULT** consists of one ore more rows;
- one row consists of one or more fields, or data cells;

The two types of result sets can be judged by ``cursor->get_cursor_status()``.

|      |MYSQL_STATUS_GET_RESULT|MYSQL_STATUS_OK|
|------|-----------------------|---------------|
|SQL command|SELECT(including each SELECT in PROCEDURE)|INSERT / UPDATE / DELETE / ...|
|Semantics|Read. One result set consists of a 2-dimensional structure </br>reprecenting the response of one read operation.|Write. One result set reprecents the results of </br>one write operation.|
|main APIs|fetch_fields();</br>fetch_row(&row_arr);</br>...|get_insert_id();</br>get_affected_rows();</br>...|

When errors occur in spliced commands, you may first get the multiple result sets through **MySQLResultCursor**, the commands which have been executed successfully. Then determine whether ``resp->get_packet_type()`` equals to **MYSQL_PACKET_ERROR** and get the specific error information through ``resp->get_error_code()`` and ``resp->get_error_msg()``.

A **PROCEDURE** command containing N **SELECT** statements will return N result sets of **MYSQL_STATUS_GET_RESULT** and 1 result set of **MYSQL_STATUS_OK**. The user ignores this **MYSQL_STATUS_OK** result set is fine.

To get all the data, the specific steps should be:

1. checking the task state (state at communication): you can check whether the task is successfully executed by checking whether ``task->get_state()`` is equal to **WFT_STATE_SUCCESS**;

2. determining the type of the response packet (state at parsing the return packet): call ``resp->get_packet_type()`` to check the type of the last SQL query return packet. The common types include:

- MYSQL\_PACKET\_OK: parsed successfully, should use cursor to get all the result sets.
- MYSQL\_PACKET\_EOF: parsed successfully, should use cursor to get all the result sets.
- MYSQL\_PACKET\_ERROR: requests: failed or partial failed, may use cursor to get the result sets of those successful commands.

3. traverse the result sets: you can use **MySQLResultCursor** to read the content in the result set. Because the data returned by a MySQL server contains multiple result sets, the cursor will **automatically point to the reading position of the first result set** at first.

4. checking the result set state (state at reading the result sets):   **cursor->get_cursor_status()** returns the following states:

- MYSQL\_STATUS\_GET\_RESULT: current result set is a READ result set;
- MYSQL\_STATUS\_END: the last record of the current READ result set has been read;
- MYSQL\_STATUS\_OK: current result set is a WRITE result set;
- MYSQL\_STATUS\_ERROR: parsing error;

5. reading the basic content of **MYSQL_STATUS_OK** result set:
  - ``unsigned long long get_affected_rows() const;``
  - ``unsigned long long get_insert_id() const;``
  - ``int get_warnings() const;``
  - ``std::string get_info() const;``
  
6. reading each field and each columns of **MYSQL_STATUS_GET_RESULT** result set:
  - `int get_field_count() const;`
  - `const MySQLField *fetch_field();`
  - `const MySQLField *const *fetch_fields() const;`

7. reading each line of **MYSQL_STATUS_GET_RESULT** result set: you can use ``cursor->fetch_row()`` to read by row until the return value is false, in which the offset within the cursor that points to the row in the current result set will be moved:
- `int get_rows_count() const;`
- `bool fetch_row(std::vector<MySQLCell>& row_arr);`
- `bool fetch_row(std::map<std::string, MySQLCell>& row_map);`
- `bool fetch_row(std::unordered_map<std::string, MySQLCell>& row_map);`
- `bool fetch_row_nocopy(const void **data, size_t *len, int *data_type);`

8. taking out all the rows in the current **MYSQL_STATUS_GET_RESULT** result set directly: you can use ``cursor->fetch_all()`` to read all rows, and the cursor that is used to record the rows internally will be moved directly to the end; The cursor state changes to **MYSQL_STATUS_END**:
- `bool fetch_all(std::vector<std::vector<MySQLCell>>& rows);`

9. returning to the head of the current **MYSQL_STATUS_GET_RESULT** result set: if it is necessary to read this result set again, you can use ``cursor->rewind()`` to return to the head of the current result set, and then read it via the operations in Step 7 or Step 8;

10. getting the next result set: because the data packet returned by MySQL server may contains multiple result sets (for example, each SELECT/INSERT/... statement gets one result set; or the multiple result sets returned by calling a PROCEDURE). Therefore, you can use ``cursor->next_result_set()`` to jump to the next result set. If the return value is false, it means that all result sets have been taken.

11. returning to the first result set: use **cursor->first\_result\_set()** to return to the heads of all result sets, and then you can repeat the operations from Step 4.

12. getting the data of each column (MySQLCell): the row read in Step 5 is composed of multiple columns, and the result of each column is one MySQLCell. It mainly uses the following interfaces:

- `int get_data_type();` returns MYSQL\_TYPE\_LONG, MYSQL\_TYPE\_STRING, and etc. For the details, please see [mysql\_types.h](/src/protocol/mysql_types.h).
- `bool is_TYPE() const;` the TYPE is int, string or ulonglong. It is used to check the data type.
- `TYPE as_TYPE() const;` same as the above. It reads the data from MySQLCell in a certain type.
- `void get_cell_nocopy(const void **data, size_t *len, int *data_type) const;` nocopy interface.

The whole example is shown below:

~~~cpp
void task_callback(WFMySQLTask *task)
{
    // step-1. Check the status of the task
    if (task->get_state() != WFT_STATE_SUCCESS)
    {
        fprintf(stderr, "task error = %d\n", task->get_error());
        return;
    }

    MySQLResultCursor cursor(task->get_resp());
    bool test_first_result_set_flag = false;
    bool test_rewind_flag = false;

    // step-2. Check other status of repsponse packet
    if (resp->get_packet_type() == MYSQL_PACKET_ERROR)
    {
        fprintf(stderr, "ERROR. error_code=%d %s\n",
                task->get_resp()->get_error_code(),
                task->get_resp()->get_error_msg().c_str());
    }

begin:
    // step-3. Traverse the result sets
    do {
        // step-4. Check the status of the result set
        if (cursor.get_cursor_status() == MYSQL_STATUS_OK)
        {
            // step-5. Read the basic content of MYSQL_STATUS_OK result set
            fprintf(stderr, "OK. %llu rows affected. %d warnings. insert_id=%llu.\n",
                    cursor.get_affected_rows(), cursor.get_warnings(), cursor.get_insert_id());
        }
        else if (cursor.get_cursor_status() == MYSQL_STATUS_GET_RESULT)
        {
            fprintf(stderr, "field_count=%u rows_count=%u ",
                    cursor.get_field_count(), cursor.get_rows_count());

            // step-6. Read each fields. This is a nocopy api
            const MySQLField *const *fields = cursor.fetch_fields();
            for (int i = 0; i < cursor.get_field_count(); i++)
            {
                fprintf(stderr, "db=%s table=%s name[%s] type[%s]\n",
                        fields[i]->get_db().c_str(), fields[i]->get_table().c_str(),
                        fields[i]->get_name().c_str(), datatype2str(fields[i]->get_data_type()));
            }

            // step-8. Read all the rows. You may use while (cursor.fetch_row(map/vector)) to get each rows accoding to step-7
            std::vector<std::vector<MySQLCell>> rows;

            cursor.fetch_all(rows);
            for (unsigned int j = 0; j < rows.size(); j++)
            {
                // step-12. Read each cell
                for (unsigned int i = 0; i < rows[j].size(); i++)
                {
                    fprintf(stderr, "[%s][%s]", fields[i]->get_name().c_str(),
                            datatype2str(rows[j][i].get_data_type()));
                    // step-12. Check the type wih is_string()and transform the type with as_string()
                    if (rows[j][i].is_string())
                    {
                        std::string res = rows[j][i].as_string();
                        fprintf(stderr, "[%s]\n", res.c_str());
                    }
                    else if (rows[j][i].is_int())
                    {
                        fprintf(stderr, "[%d]\n", rows[j][i].as_int());
                    } // else if ...
                }
            }
        }
    // step-10. Get the next result set
    } while (cursor.next_result_set());

    if (test_first_result_set_flag == false)
    {
        test_first_result_set_flag = true;
        // step-11.  Go back to the first result set
        cursor.first_result_set();
        goto begin;
    }

    if (test_rewind_flag == false)
    {
        test_rewind_flag = true;
        // step-9. Go back to the first position of the current result set
        cursor.rewind();
        goto begin;
    }

    return;
}
~~~

# WFMySQLConnection

Since it is a highly concurrent asynchronous client, this means that the client may have more than one connection to the server. As both MySQL transactions and preparation are stateful, in order to ensure that one transaction or preparation ocupies one connection exclusively, you can use our encapsulated secondary factory WFMySQLConnection to create a task. Each WFMySQLConnection guarantees that one connection is occupied exclusively. For the details, please see [WFMySQLConnection.h](/src/client/WFMySQLConnection.h).

### 1\. Creating and initializing WFMySQLConnection

When creating a WFMySQLConnection, you need to pass in **id**, and the subsequent calls on this WFMySQLConnection will use this id and url to find the corresponding unique connection.

When initializing a WFMySQLConnection, you need to pass a URL, and then you do not need to set the URL for the task created on this connection.

~~~cpp
class WFMySQLConnection
{
public:
    WFMySQLConnection(int id);
    int init(const std::string& url);
    ...
};
~~~

### 2\. Creating a task and closing a connection

With **create\_query\_task()**, you can create a task by writing an SQL request and a callback function. The task is garuanteed to be sent on this connection.

Sometimes you need to close this connection manually. Because when you stop using it, this connection will be kept until MySQL server time out. During this period, if you use the same id and url to create a WFMySQLConnection, you may reuse the connection.

Therefore, we suggest that if you do not want to reuse the connection, you should use **create\_disconnect\_task()** to create a task and manually close the connection.

~~~cpp
class WFMySQLConnection
{
public:
    ...
    WFMySQLTask *create_query_task(const std::string& query,
                                   mysql_callback_t callback);
    WFMySQLTask *create_disconnect_task(mysql_callback_t callback);
}
~~~

WFMySQLConnection is equivalent to a secondary factory. In the framework, we arrange that the life cycle of any factory object does not need to be maintained until the task ends. The following code is completely legal:

~~~cpp
    WFMySQLConnection *conn = new WFMySQLConnection(1234);
    conn->init(url);
    auto *task = conn->create_query_task("SELECT * from table", my_callback);
    conn->deinit();
    delete conn;
    task->start();
~~~

### 3\. Cautions

Do not generate new connection id infinitely, becauase easy id will occupy a little memory. The connection will be put to an internal connection pool if user didn't run a disconnect task, and will been reused by another WFMySQLConnection object initialized with the same id and url.

Running tasks on the same connection parallelly will fail with error **EAGAIN**.

If you have started `BEGIN` but have not `COMMIT` or `ROLLBACK` during the transaction and the connection has been interrupted during the transaction, the connection will be automatically reconnected internally by the framework, and you will get **ECONNRESET** error in the next task request. In this case, the transaction statements those have not been `COMMIT` would be expired and you may need to send them again.

### 4\. Preparation

You can also use the WFMySQLConnection for **PREPARE**. And you can easily use it to **defend against SQL injection**. If the connection is reconnected, you also get an **ECONNRESET** error.

### 5\. Complete example

~~~cpp
WFMySQLConnection conn(1);
conn.init("mysql://root@127.0.0.1/test");

// test transaction
const char *query = "BEGIN;";
WFMySQLTask *t1 = conn.create_query_task(query, task_callback);
query = "SELECT * FROM check_tiny FOR UPDATE;";
WFMySQLTask *t2 = conn.create_query_task(query, task_callback);
query = "INSERT INTO check_tiny VALUES (8);";
WFMySQLTask *t3 = conn.create_query_task(query, task_callback);
query = "COMMIT;";
WFMySQLTask *t4 = conn.create_query_task(query, task_callback);
WFMySQLTask *t5 = conn.create_disconnect_task(task_callback);
SeriesWork *series = create_series_work(t1, nullptr);
*series << t2 << t3 << t4 << t5;
series->start();
~~~