File: row0pread-adapter.h

Package: mysql-8.0 8.0.44-1
/*****************************************************************************

Copyright (c) 2018, 2025, Oracle and/or its affiliates.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License, version 2.0, as published by the
Free Software Foundation.

This program is designed to work with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation.  The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have either included with
the program or referenced in the documentation.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA

*****************************************************************************/

/** @file include/row0pread-adapter.h
Parallel read adapter interface.

Created 2018-02-28 by Darshan M N. */

#ifndef row0pread_adapter_h
#define row0pread_adapter_h

#include "row0pread.h"
#include "ut0counter.h"

#include "handler.h"

/** Traverse an index in the leaf page block list order and send the records
to the adapter. */
class Parallel_reader_adapter {
  /** Size of the buffer used to store InnoDB records before they are sent to
  the adapter. */
  static constexpr size_t ADAPTER_SEND_BUFFER_SIZE = 2 * 1024 * 1024;

  /** Forward declaration. */
  struct Thread_ctx;

 public:
  using Load_fn = handler::Load_cbk;

  using End_fn = handler::Load_end_cbk;

  using Init_fn = handler::Load_init_cbk;

  /** Constructor.
  @param[in]  max_threads       Maximum threads to use for all scan contexts.
  @param[in]  rowlen            Row length.  */
  Parallel_reader_adapter(size_t max_threads, ulint rowlen);

  /** Destructor. */
  ~Parallel_reader_adapter() = default;

  /** Add scan context.
  @param[in]  trx               Transaction used for parallel read.
  @param[in]  config            (Cluster) Index scan configuration.
  @param[in]  f                 Callback function.
  @return DB_SUCCESS or error code. */
  [[nodiscard]] dberr_t add_scan(trx_t *trx,
                                 const Parallel_reader::Config &config,
                                 Parallel_reader::F &&f);

  /** Run the parallel scan.
  @param[in]  thread_contexts   Context for each of the spawned threads.
  @param[in]  init_fn           Callback called by each parallel load thread
                                at the beginning of the parallel load.
  @param[in]  load_fn           Callback called by each parallel load thread
                                when processing of rows is required.
  @param[in]  end_fn            Callback called by each parallel load thread
                                when processing of rows has ended.
  @return DB_SUCCESS or error code. */
  [[nodiscard]] dberr_t run(void **thread_contexts, Init_fn init_fn,
                            Load_fn load_fn, End_fn end_fn);
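  /* A minimal usage sketch (illustrative only): `trx`, `config`, `prebuilt`,
  `thread_contexts` and the three callbacks are assumed to be supplied by the
  caller, and the callback signatures are whatever handler::Load_init_cbk,
  handler::Load_cbk and handler::Load_end_cbk declare.

     Parallel_reader_adapter adapter(max_threads, row_len);
     adapter.set(prebuilt);

     dberr_t err =
         adapter.add_scan(trx, config, [&](const Parallel_reader::Ctx *ctx) {
           return adapter.process_rows(ctx);
         });

     if (err == DB_SUCCESS) {
       err = adapter.run(thread_contexts, init_fn, load_fn, end_fn);
     }
  */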

  /** Convert the records from InnoDB format to MySQL format and send them to
  the adapter.
  @param[in]  reader_ctx  Parallel read context.
  @return error code */
  [[nodiscard]] dberr_t process_rows(const Parallel_reader::Ctx *reader_ctx);
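  /* A plausible flow implied by the declarations in this header (a sketch,
  not a description of the actual implementation): each converted row is
  appended to the calling thread's Thread_ctx::m_buffer and m_n_read is
  incremented; once is_buffer_full() reports true, the pending() rows are
  flushed via send_batch(), which advances m_n_sent. */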

  /** Set up the query processing state cache.
  @param[in]  prebuilt           The prebuilt cache for the query. */
  void set(row_prebuilt_t *prebuilt);

 private:
  /** Each parallel reader thread's init function.
  @param[in]  reader_thread_ctx  context info related to the
  current thread
  @param[in]  prebuilt           prebuilt cache
  @return DB_SUCCESS or error code. */
  [[nodiscard]] dberr_t init(Parallel_reader::Thread_ctx *reader_thread_ctx,
                             row_prebuilt_t *prebuilt);

  /** Each parallel reader thread's end function.
  @param[in]  reader_thread_ctx  context info related to the current thread
  @return DB_SUCCESS or error code. */
  [[nodiscard]] dberr_t end(Parallel_reader::Thread_ctx *reader_thread_ctx);

  /** Send a batch of records.
  @param[in]  reader_thread_ctx The reader thread's context info.
  @param[in]  partition_id      partition ID of the index the record belongs to
  @param[in]  n_recs            Number of records to send.
  @return DB_SUCCESS or error code. */
  [[nodiscard]] dberr_t send_batch(
      Parallel_reader::Thread_ctx *reader_thread_ctx, size_t partition_id,
      uint64_t n_recs);

  /** Get the number of rows buffered but not sent.
  @param[in]  ctx  adapter-related thread context information.
  @return number of buffered items. */
  [[nodiscard]] size_t pending(Thread_ctx *ctx) const {
    return (ctx->m_n_read - ctx->m_n_sent);
  }

  /** Check if the buffer is full.
  @param[in]  ctx  adapter-related thread context information.
  @return true if the buffer is full. */
  [[nodiscard]] bool is_buffer_full(Thread_ctx *ctx) const {
    return ctx->m_n_read > 0 && ctx->m_n_read % m_batch_size == 0;
  }
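  /* Worked example (assuming, as ADAPTER_SEND_BUFFER_SIZE and the rowlen
  constructor argument suggest, that m_batch_size is roughly
  ADAPTER_SEND_BUFFER_SIZE / rowlen): for a 256-byte row,
  m_batch_size == 2 * 1024 * 1024 / 256 == 8192, so is_buffer_full() becomes
  true after every 8192 rows read and pending() never exceeds 8192. */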

 private:
  /** Adapter context for each of the spawned threads. We don't know the
  type of the context; it is passed to us as a void *.  */
  void **m_thread_ctxs{};

  /** Callback called by each parallel load thread at the
  beginning of the parallel load for the scan. */
  Init_fn m_init_fn{};

  /** Callback called by each parallel load thread when
  processing of rows is required for the scan. */
  Load_fn m_load_fn{};

  /** Callback called by each parallel load thread when
  processing of rows has ended for the scan. */
  End_fn m_end_fn{};

  /** Number of records to be sent across to the caller in a batch. */
  uint64_t m_batch_size{};

  /** MySQL row metadata. This is common across partitions. */
  struct MySQL_row {
    using Column_meta_data = std::vector<ulong, ut::allocator<ulong>>;

    /** Column offsets. */
    Column_meta_data m_offsets{};

    /** Column null bit masks. */
    Column_meta_data m_null_bit_mask{};

    /** Column null bit offsets. */
    Column_meta_data m_null_bit_offsets{};

    /** Maximum row length. */
    ulong m_max_len{};
  };
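  /* Illustrative use of the metadata above (a sketch assuming the usual MySQL
  row-buffer convention; `row` and `i` are hypothetical): the value of column
  i starts at row + m_offsets[i], and the column is marked NULL by setting
  the bit m_null_bit_mask[i] in the byte at row + m_null_bit_offsets[i]:

     row[m_null_bit_offsets[i]] |= m_null_bit_mask[i];
  */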

  /** Row metadata per scan context. */
  MySQL_row m_mysql_row{};

  /** Callback thread context for each of the spawned threads. */
  struct Thread_ctx {
    /** Constructor. */
    Thread_ctx();

    /** Destructor. */
    ~Thread_ctx() = default;

    /** Number of records read. */
    size_t m_n_read{};

    /** Number of records sent to the adapter. */
    size_t m_n_sent{};

    /** Partition ID for the records in the buffer. Must be set when adding
    more records to be sent, i.e., while incrementing m_n_read. */
    size_t m_partition_id{std::numeric_limits<size_t>::max()};

    /** Buffer to store records to be sent to the adapter. */
    std::vector<byte, ut::allocator<byte>> m_buffer;
  };

  /** Prebuilt to use for conversion to MySQL row format.
  NOTE: We share this instance because we don't convert BLOBs yet. There are
  data members in row_prebuilt_t that cannot be accessed in multi-threaded
  mode, e.g., blob_heap.

  row_prebuilt_t is designed for single-threaded access, and sharing it among
  threads is not recommended unless you know what you are doing. This is very
  fragile code as it stands.

  To work around the blob heap issue in prebuilt we use per-thread
  m_blob_heaps and pass the thread's blob heap to the InnoDB-to-MySQL row
  format conversion function. */
  row_prebuilt_t *m_prebuilt{};

  /** Parallel reader to use. */
  Parallel_reader m_parallel_reader;
};

#endif /* !row0pread_adapter_h */