1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384
|
/* Copyright (c) 2014, 2025, Oracle and/or its affiliates.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
This program is designed to work with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have either included with
the program or referenced in the documentation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License, version 2.0, for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#ifndef CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
#define CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
#include <cstdint>
#include <vector>
#include "sql/containers/integrals_lockfree_queue.h" // container::Integrals_lockfree_queue
#include "sql/locks/shared_spin_lock.h" // lock::Shared_spin_lock
#include "sql/mdl.h" // MDL_context
#include "sql/memory/aligned_atomic.h" // memory::Aligned_atomic
namespace cs {
namespace apply {
/**
Queue to maintain the ordered sequence of workers waiting for commit.
The queue has a static list of elements, each one representing each worker
commit information.
The management of the order by which each worker will commit is implemented
using:
- A member variable pointing to the first worker to commit, the `head`.
- A member variable pointing to the last worker to commit, the `tail`.
- Each queue element holds a member variable that points to the next worker to
commit, the `next`.
- Pushing a new element will move the `tail`.
- Popping an element will move the `head`.
Atomics are used to make the queue thread-safe without the need for an
explicit lock.
*/
class Commit_order_queue {
public:
using value_type = unsigned long;
using queue_type = container::Integrals_lockfree_queue<value_type>;
using sequence_type = unsigned long long;
static constexpr value_type NO_WORKER{
queue_type::null_value}; // No worker on the queue
/**
Enumeration to represent each worker state
*/
enum class enum_worker_stage {
REGISTERED, // Transaction was handed-over to worker, for applying
FINISHED_APPLYING, // Transaction execution finished
REQUESTED_GRANT, // Request for turn to commit has been placed
WAITED, // Waited for the turn to commit
WAITED_OVERTAKE, // Waited to commit, allowed to commit out of order
FINISHED // Committed and finished processing the transaction
};
/**
Queue element, holding the needed information to manage the commit ordering.
*/
class Node {
public:
friend class Commit_order_queue;
/** The identifier of the worker that maps to a queue index. */
value_type m_worker_id{NO_WORKER};
/** The MDL context to be used to wait on the MDL graph. */
MDL_context *m_mdl_context{nullptr};
/** Which stage is the worker on. */
memory::Aligned_atomic<Commit_order_queue::enum_worker_stage> m_stage{
Commit_order_queue::enum_worker_stage::FINISHED};
/**
Marks the commit request sequence number this node's worker is
processing as frozen iff the sequence number current value is equal
to the `expected` parameter.
Commit request sequence numbers are monotonically ever increasing
numbers that are used by worker threads to ensure ownership of the
worker commit turn unblocking operation:
1) A worker holding a sequence number `N` can only unblock worker
with sequence number `N + 1`.
2) A worker with sequence number `N + 1` can't be assigned a new
sequence number if worker with sequence number `N` is executing
the unblocking operation.
@param expected the commit request sequence number this node must
currently hold in order for it to be frozen
@return true if this nodes commit request sequence number has been
frozen, false otherwise.
*/
bool freeze_commit_sequence_nr(Commit_order_queue::sequence_type expected);
/**
Removes the frozen mark from the commit request sequence number this
node's worker is processing if it was previously frozen.
Commit request sequence numbers are monotonically ever increasing
numbers that are used by worker threads to ensure ownership of the
worker commit turn unblocking operation:
1) A worker holding a sequence number `N` can only unblock worker
with sequence number `N + 1`.
2) A worker with sequence number `N + 1` can't be assigned a new
sequence number if worker with sequence number `N` is executing
the unblocking operation.
@param previously_frozen the sequence number value that was provided
while previously freezing.
@return true if this nodes commit request sequence number was frozen
and is now unfrozen, false otherwise.
*/
bool unfreeze_commit_sequence_nr(
Commit_order_queue::sequence_type previously_frozen);
private:
// No commit request sequence number assigned
static constexpr Commit_order_queue::sequence_type NO_SEQUENCE_NR{0};
// Commit request sequence number is marked as frozen
static constexpr Commit_order_queue::sequence_type SEQUENCE_NR_FROZEN{1};
/** The sequence number for the commit request this node's worker is
processing. */
memory::Aligned_atomic<Commit_order_queue::sequence_type>
m_commit_sequence_nr{NO_SEQUENCE_NR};
/** The sequence number for the commit request of the worker scheduled
after this one. */
memory::Aligned_atomic<Commit_order_queue::sequence_type>
m_next_commit_sequence_nr{NO_SEQUENCE_NR};
/**
Sets the commit request sequence number for this node as unassigned. If
the sequence number is currently frozen, invoking this method will make
the invoking thread to spin until the sequence number is unfrozen.
Commit request sequence numbers are monotonically ever increasing
numbers that are used by worker threads to ensure ownership of the
worker commit turn unblocking operation:
1) A worker holding a sequence number `N` can only unblock worker
with sequence number `N + 1`.
2) A worker with sequence number `N + 1` can't be assigned a new
sequence number if worker with sequence number `N` is executing
the unblocking operation.
@return the sequence number for the commit request sequence number
of the worker scheduled after this one, or NO_SEQUENCE_NR
if there is no next worker.
*/
Commit_order_queue::sequence_type reset_commit_sequence_nr();
};
/**
Iterator helper class to iterate over the Commit_order_queue following the
underlying commit order.
Check C++ documentation on `Iterator named requirements` for more
information on the implementation.
*/
class Iterator {
public:
using difference_type = std::ptrdiff_t;
using value_type = Commit_order_queue::Node *;
using pointer = Commit_order_queue::Node *;
using reference = Commit_order_queue::Node *;
using iterator_category = std::forward_iterator_tag;
using index_type = Commit_order_queue::queue_type::index_type;
explicit Iterator(Commit_order_queue &parent, index_type position);
Iterator(const Iterator &rhs);
Iterator(Iterator &&rhs);
virtual ~Iterator() = default;
// BASIC ITERATOR METHODS //
Iterator &operator=(const Iterator &rhs);
Iterator &operator=(Iterator &&rhs);
Iterator &operator++();
reference operator*();
// END / BASIC ITERATOR METHODS //
// INPUT ITERATOR METHODS //
Iterator operator++(int);
pointer operator->();
bool operator==(Iterator const &rhs) const;
bool operator!=(Iterator const &rhs) const;
// END / INPUT ITERATOR METHODS //
// OUTPUT ITERATOR METHODS //
// reference operator*(); <- already defined
// iterator operator++(int); <- already defined
// END / OUTPUT ITERATOR METHODS //
// FORWARD ITERATOR METHODS //
// Enable support for both input and output iterator <- already enabled
// END / FORWARD ITERATOR METHODS //
private:
/** The target queue that holds the list to be iterated. */
Commit_order_queue *m_target{nullptr};
/** The iterator pointing to the underlying queue position. */
Commit_order_queue::queue_type::Iterator m_current;
};
/**
Constructor for the class, takes the number of workers and initializes the
underlying static list with such size.
@param n_workers The number of workers to include in the commit order
managerment.
*/
Commit_order_queue(size_t n_workers);
/**
Default destructor for the class.
*/
virtual ~Commit_order_queue() = default;
/**
Retrieve the commit order information Node for worker identified by `id`.
@param id The identifier of the worker
@return A reference to the commit order information Node for the given
worker.
*/
Node &operator[](value_type id);
/**
Retrieves the error state for the current thread last executed queue
operation. Values may be:
- SUCCESS is the operation succeeded.
- NO_MORE_ELEMENTS if the last pop tried to access an empty queue.
- NO_SPACE_AVAILABLE if the last push tried to push while the queue was
full.
@return The error state for the thread's last operation on the queue.
*/
Commit_order_queue::queue_type::enum_queue_state get_state();
/**
Whether or not there are more workers to commit.
@return True if there are no more workers, false otherwise.
*/
bool is_empty();
/**
Removes from the queue and returns the identifier of the worker that is
first in-line to commit.
If another thread is accessing the commit order sequence number and has
frozen it's state, this operation will spin until the state is
unfrozen.
@return A tuple holding the identifier of the worker that is first
in-line to commit and the sequence number for the commit
request of the worker scheduled after this one (or NO_SEQUENCE_NR
if there is no next worker).
*/
std::tuple<value_type, sequence_type> pop();
/**
Adds to the end of the commit queue the worker identifier passed as
parameter.
@param id The identifier of the worker to add to the commit queue.
*/
void push(value_type id);
/**
Retrieves the identifier of the worker that is first in-line to commit.
@return The identifier of the worker that is first in-line to commit.
*/
value_type front();
/**
Removes all remaining workers from the queue.
*/
void clear();
/**
Acquires exclusivity over changes (push, pop) on the queue.
*/
void freeze();
/**
Releases exclusivity over changes (push, pop) on the queue.
*/
void unfreeze();
/**
Retrieves an iterator instance that points to the head of the commit queue
and that will iterate over the worker Nodes that are in-line to commit,
following the requested commit order.
@return An instance of `Iterator` pointing to the queue's head, that
iterates over the workers that are in-line to commit and following
the requested commit order.
*/
Iterator begin();
/**
Retrieves an iterator instance that points to the tail of the commit queue.
@return An instance of `Iterator` that points to the tail of the
queue.
*/
Iterator end();
/**
Retrieves the textual representation of this object's underlying commit
queue.
@return The textual representation of this object's underlying commit queue.
*/
std::string to_string();
/**
Friend operator for writing to an `std::ostream` object.
@see `std::ostream::operator<<`
*/
friend inline std::ostream &operator<<(std::ostream &out,
Commit_order_queue &to_output) {
out << to_output.to_string() << std::flush;
return out;
}
/**
Returns the expected next number in the ticket sequence.
@param current_seq_nr The current sequence number, for which the next
should be computed.
@return The expected next number in the ticket sequence.
*/
static sequence_type get_next_sequence_nr(sequence_type current_seq_nr);
/**
Removes from the queue and returns the first identifier of the worker
that is equal to `index`.
@return A tuple holding the identifier of the worker equal to `index`
and the sequence number for the commit request of the worker
scheduled after this one (or NO_SEQUENCE_NR if there is no
next worker).
*/
std::tuple<value_type, sequence_type> remove(value_type index);
private:
/** The commit sequence number counter */
memory::Aligned_atomic<sequence_type> m_commit_sequence_generator{
Node::SEQUENCE_NR_FROZEN + 1};
/** The list of worker Nodes, indexed by worker ID. */
std::vector<Commit_order_queue::Node> m_workers;
/** The queue to hold the sequence of worker IDs waiting to commit. */
queue_type m_commit_queue;
/** The lock to acquire exlusivity over changes on the queue. */
lock::Shared_spin_lock m_push_pop_lock;
/**
Removes from the commit queue the first identifier that is equal to `index`.
@param to_remove The value to remove from the queue.
@return A tuple, where:
- The first component is the removed value, or NO_WORKER if the
value was not found.
- The second component is the value preceding the removed one,
or NO_WORKER if the value was not found, or was found in the
first slot.
*/
std::tuple<value_type, value_type> remove_from_commit_queue(
value_type to_remove);
};
} // namespace apply
} // namespace cs
#endif // CHANGESTREAM_APPLY_COMMIT_ORDER_QUEUE
|