File: clone_status.h

package info (click to toggle)
mysql-8.0 8.0.43-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,273,924 kB
  • sloc: cpp: 4,684,605; ansic: 412,450; pascal: 108,398; java: 83,641; perl: 30,221; cs: 27,067; sql: 26,594; sh: 24,181; python: 21,816; yacc: 17,169; php: 11,522; xml: 7,388; javascript: 7,076; makefile: 2,194; lex: 1,075; awk: 670; asm: 520; objc: 183; ruby: 97; lisp: 86
file content (486 lines) | stat: -rw-r--r-- 14,538 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
/* Copyright (c) 2019, 2025, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is designed to work with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have either included with
   the program or referenced in the documentation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */

/**
@file clone/include/clone_status.h
Clone Plugin: Client Status Interface

*/

#ifndef CLONE_STATUS_H
#define CLONE_STATUS_H

#include <mysql/components/services/pfs_plugin_table_service.h>
#include <array>
#include "my_systime.h"
#include "plugin/clone/include/clone.h"

THD *thd_get_current_thd();

/* Namespace for all clone data types */
namespace myclone {

/** Log the current error message.
@param[in,out]	thd		current session THD
@param[in]	is_client	true, if called by client
@param[in]	error		error code
@param[in]	message_start	start error message string */
void log_error(THD *thd, bool is_client, int32_t error,
               const char *message_start);

/** Abstract base class for clone PFS tables. */
class Table_pfs {
 public:
  /** Constructor.
  @param[in]	num_rows	total number of rows in table. */
  Table_pfs(uint32_t num_rows);

  /** Destructor. */
  virtual ~Table_pfs() = default;

  /** Read column at index of current row. Implementation
  is specific to table.
   @param[out]	field	column value
   @param[in]	index	column position within row
   @return error code. */
  virtual int read_column_value(PSI_field *field, uint32_t index) = 0;

  /** Initialize the table.
  @return plugin table error code. */
  virtual int rnd_init() = 0;

  /** Initialize position for table.
  @param[in]	id	clone ID. */
  void init_position(uint32_t id) {
    m_position = 0;
    m_empty = (id == 0);
  }

  /** Set cursor to next record.
  @return plugin table error code. */
  int rnd_next() {
    /* Table is empty. */
    if (is_empty()) {
      return (PFS_HA_ERR_END_OF_FILE);
    }
    ++m_position;
    if (m_position <= m_rows) {
      return (0);
    }
    /* All rows are read. */
    assert(m_position == m_rows + 1);
    return (PFS_HA_ERR_END_OF_FILE);
  }

  /** Set cursor to current position: currently no op.
  @return plugin table error code. */
  int rnd_pos() {
    if (m_position > 0 && m_position <= m_rows) {
      return (0);
    }
    return (PFS_HA_ERR_END_OF_FILE);
  }

  /** Reset cursor position to beginning. */
  void reset_pos() { m_position = 0; }

  /** Close the table. */
  void close() { m_position = 0; }

  /* @return address of current position. PFS needs it to set
  the position for proxy table. */
  uint32_t *get_position_address() { return (&m_position); }

  /** Acquire service handles and create proxy tables
  @return false if successful. */
  static bool acquire_services();

  /** Release service handles and delete proxy tables. */
  static void release_services();

  /** Initialize all stage and state names. */
  static void init_state_names();

  /** Clone States. */
  enum Clone_state : uint32_t {
    STATE_NONE = 0,
    STATE_STARTED,
    STATE_SUCCESS,
    STATE_FAILED,
    NUM_STATES
  };
  /** All clone states */
  static std::array<const char *, NUM_STATES> s_state_names;

  /** Clone Stages. Keep in consecutive order as we use it as index. */
  enum Clone_stage : uint32_t {
    STAGE_NONE = 0,
    STAGE_CLEANUP = 1,
    STAGE_FILE_COPY = 2,
    STAGE_PAGE_COPY = 3,
    STAGE_REDO_COPY = 4,
    STAGE_FILE_SYNC = 5,
    STAGE_RESTART = 6,
    STAGE_RECOVERY = 7,
    NUM_STAGES = 8
  };

  /** All clone Stages. */
  static std::array<const char *, NUM_STAGES> s_stage_names;

 protected:
  /** @return Current cursor position. */
  uint32_t get_position() const { return (m_position); }

  /** @return Proxy table share reference. */
  PFS_engine_table_share_proxy *get_proxy_share() { return (&m_pfs_table); }

  /** @return true, if no data in table. */
  bool is_empty() const { return (m_empty); }

 private:
  /** Create PFS proxy tables.
  @return error code. */
  static int create_proxy_tables();

  /** Drop PFS proxy tables. */
  static void drop_proxy_tables();

 private:
  /** Number of rows in table. */
  uint32_t m_rows;

  /** Current position of the cursor. */
  uint32_t m_position;

  /** If the table is empty. */
  bool m_empty;

  /** Proxy table defined in plugin to register callbacks with PFS. */
  PFS_engine_table_share_proxy m_pfs_table;
};

const char g_local_string[] = "LOCAL INSTANCE";

class Status_pfs : public Table_pfs {
 public:
  /* Constructor. */
  Status_pfs();

  /** Read column at specific index of current row.
   @param[out]	field	column value
   @param[in]	index	column position within row
   @return error code. */
  int read_column_value(PSI_field *field, uint32_t index) override;

  /** Initialize the table.
  @return plugin table error code. */
  int rnd_init() override;

  /** Number of rows in status table. Currently we keep last clone status. */
  static const uint32_t S_NUM_ROWS = 1;

  /** POD for the progress data. */
  struct Data {
    /** Read data from status file. */
    void read();

    /** Extract and write recovery information. */
    void recover();

    /** Write data to status file.
    @param[in]	write_error	write error information. */
    void write(bool write_error);

    /* @return true, if destination is current database. */
    bool is_local() const {
      return (0 == strncmp(&m_destination[0], &g_local_string[0],
                           sizeof(m_destination)));
    }

    /** Set PFS table data while starting Clone operation.
    @param[in]	id		clone ID
    @param[in]	thd		session THD
    @param[in]	host		clone source host
    @param[in]	port		clone source port
    @param[in]	destination	clone destination directory or host */
    void begin(uint32_t id, THD *thd, const char *host, uint32_t port,
               const char *destination) {
      m_id = id;
      m_pid = static_cast<uint32_t>(thd_get_thread_id(thd));
      /* Clone from local instance. */
      if (host == nullptr) {
        strncpy(m_source, &g_local_string[0], sizeof(m_source) - 1);
      } else {
        snprintf(m_source, sizeof(m_source) - 1, "%s:%u", host, port);
      }
      /* Clone into local instance. */
      if (destination == nullptr) {
        destination = &g_local_string[0];
      }
      strncpy(m_destination, destination, sizeof(m_destination) - 1);
      m_error_number = 0;
      memset(m_error_mesg, 0, sizeof(m_error_mesg));
      m_binlog_pos = 0;
      memset(m_binlog_file, 0, sizeof(m_binlog_file));
      m_gtid_string.clear();
      m_start_time = my_micro_time();
      m_end_time = 0;
      m_state = STATE_STARTED;
      write(false);
    }

    /** Update PFS table data while ending clone operation.
    @param[in]	err_num		error number
    @param[in]	err_mesg	error message
    @param[in]	provisioning	if we are provisioning current directory. */
    void end(uint32_t err_num, const char *err_mesg, bool provisioning) {
      m_end_time = my_micro_time();
      if (err_num == 0) {
        /* For provisioning, recovery stage is left. */
        if (!provisioning) {
          m_state = Table_pfs::STATE_SUCCESS;
        }
        write(true);
        return;
      }
      m_state = Table_pfs::STATE_FAILED;
      m_error_number = err_num;
      strncpy(m_error_mesg, err_mesg, sizeof(m_error_mesg) - 1);
      write(true);
    }

    /** Update source binlog position consistent with cloned data.
    @param[in]	binlog_file	binary log file name
    @param[in]	position	binary log offset within file */
    void update_binlog_position(const char *binlog_file, uint64_t position) {
      m_binlog_pos = position;
      strncpy(m_binlog_file, binlog_file, sizeof(m_binlog_file) - 1);
    }

    /** Length of variable length character columns. */
    static const size_t S_VAR_COL_LENGTH = 512;

    /** Current State. */
    Clone_state m_state{STATE_NONE};

    /** Clone error number. */
    uint32_t m_error_number{};

    /** Unique identifier in current instance. */
    uint32_t m_id{};

    /** Process List ID. */
    uint32_t m_pid{};

    /** Clone start time. */
    uint64_t m_start_time{};

    /** Clone end time. */
    uint64_t m_end_time{};

    /* Source binary log position. */
    uint64_t m_binlog_pos{};

    /** Clone source. */
    char m_source[S_VAR_COL_LENGTH]{};

    /** Clone destination. */
    char m_destination[S_VAR_COL_LENGTH]{};

    /** Clone error message. */
    char m_error_mesg[S_VAR_COL_LENGTH]{};

    /** Source binary log file name. */
    char m_binlog_file[S_VAR_COL_LENGTH]{};

    /** Clone GTID set */
    std::string m_gtid_string;
  };

 private:
  /** Current status data. */
  Data m_data;
};

class Progress_pfs : public Table_pfs {
 public:
  /* Constructor. */
  Progress_pfs();

  /** Read column at specific index of current row.
   @param[out]	field	column value
   @param[in]	index	column position within row
   @return error code. */
  int read_column_value(PSI_field *field, uint32_t index) override;

  /** Initialize the table.
  @return plugin table error code. */
  int rnd_init() override;

  /** Number of rows in progress table. Therea is one row for each stage. */
  static const uint32_t S_NUM_ROWS = NUM_STAGES - 1;

  /** POD for the progress data. */
  struct Data {
    /** Read data from progress file. */
    void read();

    /** Write data to progress file.
    @@param[in]	data_dir	data directory for write. */
    void write(const char *data_dir);

    /** Get next stage from current.
    @param[in,out]	stage	current/next stage. */
    void next_stage(Clone_stage &stage) {
      auto next_num = static_cast<uint32_t>(stage) + 1;
      auto max_num = static_cast<uint32_t>(NUM_STAGES);
      if (next_num >= max_num) {
        stage = STAGE_NONE;
        return;
      }
      stage = static_cast<Clone_stage>(next_num);
    }

    /** Initialize PFS stage.
    @@param[in]	data_dir	data directory for write. */
    void init_stage(const char *data_dir) {
      m_id = 0;
      m_current_stage = STAGE_NONE;

      /* Clean current stage information. */
      m_data_speed = 0;
      m_network_speed = 0;

      /* Clean all stage specific information. */
      next_stage(m_current_stage);
      while (m_current_stage != STAGE_NONE) {
        /* State */
        m_states[m_current_stage] = STATE_NONE;
        m_threads[m_current_stage] = 0;
        /* Time */
        m_start_time[m_current_stage] = 0;
        m_end_time[m_current_stage] = 0;
        /* Estimates */
        m_estimate[m_current_stage] = 0;
        m_complete[m_current_stage] = 0;
        m_network[m_current_stage] = 0;

        next_stage(m_current_stage);
      }
      write(data_dir);
    }

    /** Set PFS table data while starting Clone a stage.
    @param[in]	id		clone ID
    @@param[in]	data_dir	data directory for write.
    @param[in]	threads		current number of concurrent threads
    @param[in]	estimate	estimated data bytes for stage */
    void begin_stage(uint32_t id, const char *data_dir, uint64_t threads,
                     uint64_t estimate) {
      next_stage(m_current_stage);
      if (m_current_stage == STAGE_NONE) {
        assert(false); /* purecov: inspected */
        return;
      }
      m_states[m_current_stage] = STATE_STARTED;
      m_id = id;
      m_threads[m_current_stage] = threads;

      /* Set time at beginning. */
      m_start_time[m_current_stage] = my_micro_time();
      m_end_time[m_current_stage] = 0;

      /* Reset progress data at the beginning of stage. */
      m_estimate[m_current_stage] = estimate;
      m_complete[m_current_stage] = 0;
      m_network[m_current_stage] = 0;
      m_data_speed = 0;
      m_network_speed = 0;
      write(data_dir);
    }

    /** Set PFS table data while ending a Clone stage.
    @@param[in]	data_dir	data directory for write. */
    void end_stage(bool failed, const char *data_dir) {
      m_end_time[m_current_stage] = my_micro_time();
      m_states[m_current_stage] = failed ? STATE_FAILED : STATE_SUCCESS;
      write(data_dir);
    }

    /** Update data and network consumed.
    @param[in]	data		data bytes transferred
    @param[in]	network		network bytes transferred
    @param[in]	data_speed	data transfer speed in bytes/sec
    @param[in]	net_speed	network transfer speed in bytes/sec
    @param[in]	num_workers	number of worker threads */
    void update_data(uint64_t data, uint64_t network, uint32_t data_speed,
                     uint32_t net_speed, uint32_t num_workers) {
      m_complete[m_current_stage] += data;
      m_network[m_current_stage] += network;
      m_data_speed = data_speed;
      m_network_speed = net_speed;
      m_threads[m_current_stage] = num_workers + 1;
    }

    /** Current progress stage. */
    Clone_stage m_current_stage{STAGE_NONE};

    /** State information for all stages. */
    Clone_state m_states[NUM_STAGES];

    /** Unique identifier in current instance. */
    uint32_t m_id{};

    /** Current data transfer rate. */
    uint32_t m_data_speed{};

    /** Current network transfer rate. */
    uint32_t m_network_speed{};

    /** Number of active threads. */
    uint32_t m_threads[NUM_STAGES]{};

    /** Stage start time. */
    uint64_t m_start_time[NUM_STAGES]{};

    /** Stage end time. */
    uint64_t m_end_time[NUM_STAGES]{};

    /** Estimated bytes for all stages. */
    uint64_t m_estimate[NUM_STAGES]{};

    /** Completed bytes for all stages. */
    uint64_t m_complete[NUM_STAGES]{};

    /** Completed network bytes for all stages. */
    uint64_t m_network[NUM_STAGES]{};
  };

 private:
  /** Current progress data. */
  Data m_data;
};
}  // namespace myclone

#endif /* CLONE_STATUS_H */