File: rpl_commit_stage_manager.h

package info (click to toggle)
mysql-8.0 8.0.43-3
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 1,273,924 kB
  • sloc: cpp: 4,684,605; ansic: 412,450; pascal: 108,398; java: 83,641; perl: 30,221; cs: 27,067; sql: 26,594; sh: 24,181; python: 21,816; yacc: 17,169; php: 11,522; xml: 7,388; javascript: 7,076; makefile: 2,194; lex: 1,075; awk: 670; asm: 520; objc: 183; ruby: 97; lisp: 86
file content (485 lines) | stat: -rw-r--r-- 16,765 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
/* Copyright (c) 2019, 2025, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is designed to work with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have either included with
   the program or referenced in the documentation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */

#ifndef RPL_COMMIT_STAGE_MANAGER
#define RPL_COMMIT_STAGE_MANAGER

#include <atomic>
#include <utility>

#include "my_dbug.h"
#include "mysql/psi/mysql_cond.h"
#include "mysql/psi/mysql_mutex.h"
#include "sql/sql_class.h"
#include "thr_mutex.h"

class THD;

/**
  Class for maintaining the commit stages for binary log group commit.
 */
class Commit_stage_manager {
 public:
  class Mutex_queue {
    friend class Commit_stage_manager;

   public:
    Mutex_queue() : m_first(nullptr), m_last(&m_first), m_size(0) {}

    void init(mysql_mutex_t *lock) { m_lock = lock; }

    bool is_empty() const { return m_first == nullptr; }

    /**
      Append a linked list of threads to the queue.

      @param[in]  first  Linked list of threads to be appended to queue

      @retval true The queue was empty before this operation.
      @retval false The queue was non-empty before this operation.
    */
    bool append(THD *first);

    /**
      Fetch the entire queue for a stage. It is a wrapper over
      fetch_and_empty() and acquires queue lock before fetching
      and emptying the queue threads.

      @return  Pointer to the first session of the queue.
    */
    THD *fetch_and_empty_acquire_lock();

    /**
      Fetch the entire queue for a stage. It is a wrapper over
      fetch_and_empty(). The caller must acquire queue lock before
      calling this function.

      @return  Pointer to the first session of the queue.
    */
    THD *fetch_and_empty_skip_acquire_lock();

    /**
      Remove first member from the queue

      @retval  Returns std::pair<bool, THD *> object.
               The first boolean value of pair if true determines queue
               is not empty, and false determines queue is empty.
               The second value returns the first removed member.
    */
    std::pair<bool, THD *> pop_front();

    /**
      Get number of elements in the queue.

      @retval  Returns number of element in the queue.
    */
    inline int32 get_size() { return m_size.load(); }

    /**
      Fetch the first thread of the queue.

      @return first thread of the queue.
    */
    THD *get_leader() { return m_first; }

    void lock() {
      mysql_mutex_assert_not_owner(m_lock);
      mysql_mutex_lock(m_lock);
    }

    void unlock() { mysql_mutex_unlock(m_lock); }

    void assert_owner() { mysql_mutex_assert_owner(m_lock); }

   private:
    /**
      Fetch the entire queue for a stage.

      @retval  This will fetch the entire queue in one go.
    */
    THD *fetch_and_empty();

    /**
       Pointer to the first thread in the queue, or nullptr if the queue is
       empty.
    */
    THD *m_first;

    /**
       Pointer to the location holding the end of the queue.

       This is either @c &first, or a pointer to the @c next_to_commit of
       the last thread that is enqueued.
    */
    THD **m_last;

    /** size of the queue */
    std::atomic<int32> m_size;

    /** Lock for protecting the queue. */
    mysql_mutex_t *m_lock;

    /*
      This attribute did not have the desired effect, at least not according
      to -fsanitize=undefined with gcc 5.2.1
     */
  };  // MY_ATTRIBUTE((aligned(CPU_LEVEL1_DCACHE_LINESIZE)));

 private:
  Commit_stage_manager() : m_is_initialized(false) {}

  Commit_stage_manager(const Commit_stage_manager &) = delete;

  const Commit_stage_manager &operator=(const Commit_stage_manager &) = delete;

 public:
  /**
    Fetch Commit_stage_manager class instance.

    @return Reference to the Commit_stage_manager class instance.
  */
  static Commit_stage_manager &get_instance();

  /**
     Constants for queues for different stages.
   */
  enum StageID {
    BINLOG_FLUSH_STAGE,
    SYNC_STAGE,
    COMMIT_STAGE,
    AFTER_COMMIT_STAGE,
    COMMIT_ORDER_FLUSH_STAGE,
    STAGE_COUNTER
  };

  /**
    Initializes m_stage_cond_binlog, m_stage_cond_commit_order,
    m_stage_cond_leader condition variables and m_lock_done mutex.

    The binlog follower threads blocks on m_stage_cond_binlog condition
    variable till signalled to wake up from leader thread. And similarly
    commit order follower threads blocks on m_stage_cond_commit_order
    condition variable till signalled to wake up from leader thread.

    The first binlog thread supposed to be leader finds that commit order queue
    is not empty then it blocks on m_stage_cond_leader till commit order leader
    signals it to awake and become new leader.

    m_lock_done mutex is shared by all three stages.

    @param key_LOCK_flush_queue mutex instrumentation key
    @param key_LOCK_sync_queue mutex instrumentation key
    @param key_LOCK_commit_queue mutex instrumentation key
    @param key_LOCK_after_commit_queue mutex instrumentation key
    @param key_LOCK_done mutex instrumentation key
    @param key_LOCK_wait_for_group_turn mutex instrumentation key
    @param key_COND_done cond instrumentation key
    @param key_COND_flush_queue cond instrumentation key
    @param key_COND_wait_for_group_turn cond instrumentation key
  */
  void init(PSI_mutex_key key_LOCK_flush_queue,
            PSI_mutex_key key_LOCK_sync_queue,
            PSI_mutex_key key_LOCK_commit_queue,
            PSI_mutex_key key_LOCK_after_commit_queue,
            PSI_mutex_key key_LOCK_done,
            PSI_mutex_key key_LOCK_wait_for_group_turn,
            PSI_cond_key key_COND_done, PSI_cond_key key_COND_flush_queue,
            PSI_cond_key key_COND_wait_for_group_turn);

  /**
    Deinitializes m_stage_cond_binlog, m_stage_cond_commit_order,
    m_stage_cond_leader condition variables and m_lock_done mutex.
  */
  void deinit();

  /**
    Checks if the THD session parameter BGC ticket is active and
    the BGC back ticket was incremented.

    @param thd The THD session that holds the ticket to check.

    @return True if the THD session parameter BGC ticket is active and
            the BGC back ticket was incremented, false otherwise.
  */
  bool is_ticket_on_its_turn_and_back_ticket_incremented(THD *thd) const;

  /**
    Waits for the THD session parameter underlying BGC ticket to become
    active.

    @param thd The THD session that holds the ticket to wait for.
    @param update_ticket_manager Indicates whether to mark ticket
    as consumed by the session (add session to processed sessions)
    after the ticket is opened for processing.
   */
  void wait_for_ticket_turn(THD *thd, bool update_ticket_manager = true);

  /**
    Appends the given THD session object to the given stage queue. It
    verifies that the given session's ticket is the active ticket, if not,
    waits on `m_cond_wait_for_ticket_turn` condition variable until it is.

    @param stage The stage to add the THD parameter to.
    @param thd   The THD session object to queue.

    @return True if the session is a group leader, false otherwise.
   */
  bool append_to(StageID stage, THD *thd);

  /**
    Enroll a set of sessions for a stage.

    This will queue the session thread for writing and flushing.

    If the thread being queued is assigned as stage leader, it will
    return immediately.

    If wait_if_follower is true the thread is not the stage leader,
    the thread will be wait for the queue to be processed by the
    leader before it returns.
    In DBUG-ON version the follower marks is preempt status as ready.

    The session threads entering this function acquires mutexes, and few of
    them are not released while exiting based on thread and stage type.
    - A binlog leader (returning true when stage!=COMMIT_ORDER_FLUSH_STAGE) will
      acquire the stage mutex in this function and not release it.
    - A commit order leader of the flush stage (returning true when
      stage==COMMIT_ORDER_FLUSH_STAGE) will acquire both the stage mutex and the
      flush queue mutex in this function, and not release them.
    - A follower (returning false) will release any mutexes it takes, before
      returning from the function.

    @param[in] stage Stage identifier for the queue to append to.
    @param[in] first Queue to append.
    @param[in] stage_mutex
                 Pointer to the currently held stage mutex, or nullptr if we're
                 not in a stage, that will be released when changing stage.
    @param[in] enter_mutex
                 Pointer to the mutex that will be taken when changing stage.

    @retval true  Thread is stage leader.
    @retval false Thread was not stage leader and processing has been done.
   */
  bool enroll_for(StageID stage, THD *first, mysql_mutex_t *stage_mutex,
                  mysql_mutex_t *enter_mutex);

  /**
    Remove first member from the queue for given stage

    @retval  Returns std::pair<bool, THD *> object.
             The first boolean value of pair if true determines queue
             is not empty, and false determines queue is empty.
             The second value returns the first removed member.
  */
  std::pair<bool, THD *> pop_front(StageID stage) {
    return m_queue[stage].pop_front();
  }

#ifndef NDEBUG
  /**
     The method ensures the follower's execution path can be preempted
     by the leader's thread.
     Preempt status of @c head follower is checked to engange the leader
     into waiting when set.

     @param head  THD* of a follower thread
  */
  void clear_preempt_status(THD *head);
#endif

  /**
    Fetch the entire queue and empty it. It acquires queue lock before fetching
    and emptying the queue threads.

    @param[in]  stage             Stage identifier for the queue to append to.

    @return Pointer to the first session of the queue.
  */
  THD *fetch_queue_acquire_lock(StageID stage);

  /**
    Fetch the entire queue and empty it. The caller must acquire queue lock
    before calling this function.

    @param[in]  stage             Stage identifier for the queue to append to.

    @return Pointer to the first session of the queue.
  */
  THD *fetch_queue_skip_acquire_lock(StageID stage);

  /**
    Introduces a wait operation on the executing thread.  The
    waiting is done until the timeout elapses or count is
    reached (whichever comes first).

    If count == 0, then the session will wait until the timeout
    elapses. If timeout == 0, then there is no waiting.

    @param usec     the number of microseconds to wait.
    @param count    wait for as many as count to join the queue the
                    session is waiting on
    @param stage    which stage queue size to compare count against.
   */
  void wait_count_or_timeout(ulong count, long usec, StageID stage);

  /**
    The function is called after follower thread are processed by leader,
    to unblock follower threads.

    @param queue   the thread list which needs to ne unblocked
    @param stage   Stage identifier current thread belong to.
  */
  void signal_done(THD *queue, StageID stage = BINLOG_FLUSH_STAGE);

  /**
    Signals threads waiting on their BGC ticket turn.

    @param force Whether or not to force the signaling, despit the state of
                 the ticket manager.
   */
  void signal_end_of_ticket(bool force = false);
  /**
    Updates the THD session object underlying BGC context.

    @param thd The THD object to update the BGC context for.
   */
  void update_session_ticket_state(THD *thd);
  /**
    Adds the given session count to the total of processed sessions in the
    ticket manager active window, ends the active window if possible and
    notifies other threads that are waiting for a given ticket to have an
    active processing window.

    @param sessions_count The number of sessions to add to the ticket
                          manager processed sessions count.
    @param session_ticket The session ticket (used for validations).
   */
  void update_ticket_manager(std::uint64_t sessions_count,
                             const binlog::BgcTicket &session_ticket);
  /**
    Waits for the session's ticket, if needed, and resets the session's
    ticket context.

    @param thd The THD sessions object to finish the ticket's related work.
   */
  void finish_session_ticket(THD *thd);

  /**
    This function gets called after transactions are flushed to the engine
    i.e. after calling ha_flush_logs, to unblock commit order thread list
    which are not needed to wait for other stages.

    @param first     the thread list which needs to ne unblocked
  */
  void process_final_stage_for_ordered_commit_group(THD *first);

  /**
    Wrapper on Mutex_queue lock(), acquires lock on stage queue.

    @param[in]  stage  Stage identifier for the queue to append to.
  */
  void lock_queue(StageID stage) { m_queue[stage].lock(); }

  /**
    Wrapper on Mutex_queue unlock(), releases lock on stage queue.

    @param[in]  stage  Stage identifier for the queue to append to.
  */
  void unlock_queue(StageID stage) { m_queue[stage].unlock(); }

  /**
    Disables the ability for session BGC tickets to be set manually.
   */
  static void disable_manual_session_tickets();
  /**
    Enables the ability for session BGC tickets to be set manually.
   */
  static void enable_manual_session_tickets();

 private:
  /** check if Commit_stage_manager variables already initialized. */
  bool m_is_initialized;

  /**
     Queues for sessions.

     We need five queues:
     - Binlog flush queue: transactions that are going to be flushed to the
                           engine and written to the binary log.
     - Commit order flush queue: transactions that are not going to write the
                                 binlog at all, but participate in the beginning
                                 of the group commit, up to and including the
                                 engine flush.
     - Sync queue: transactions that are going to be synced to disk
     - Commit queue: transactions that are going to to be committed
                     (when binlog_order_commit=1).
     - After commit queue: transactions for which after commit hook is to be
                           executed.
  */
  Mutex_queue m_queue[STAGE_COUNTER];

  /**
     The binlog leader waits on this condition variable till it is indicated
     to wake up. If binlog flush queue gets first thread in the queue but
     by then commit order flush queue has already elected leader. The the
     first thread of binlog queue waits on this condition variable and get
     signalled to wake up from commit order flush queue leader later.
  */
  mysql_cond_t m_stage_cond_leader;

  /**
     Condition variable to indicate that the binlog threads can wake up
     and continue.
  */
  mysql_cond_t m_stage_cond_binlog;

  /**
     Condition variable to indicate that the flush to storage engine
     is done and commit order threads can again wake up and continue.
  */
  mysql_cond_t m_stage_cond_commit_order;

  /** Mutex used for the condition variable above */
  mysql_mutex_t m_lock_done;

  /** Mutex used for the stage level locks */
  mysql_mutex_t m_queue_lock[STAGE_COUNTER - 1];

#ifndef NDEBUG
  /** Save pointer to leader thread which is used later to awake leader */
  THD *leader_thd;

  /** Flag is set by Leader when it starts waiting for follower's all-clear */
  bool leader_await_preempt_status;

  /** Condition variable to indicate a follower started waiting for commit */
  mysql_cond_t m_cond_preempt;
#endif

  /** Condition variable to wait for a given ticket to become active. */
  mysql_cond_t m_cond_wait_for_ticket_turn;
  /** Mutex to protect the wait for a given ticket to become active. */
  mysql_mutex_t m_lock_wait_for_ticket_turn;
};

#endif /*RPL_COMMIT_STAGE_MANAGER*/