1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408
|
/* Copyright (c) 2018, 2025, Oracle and/or its affiliates.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
This program is designed to work with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have either included with
the program or referenced in the documentation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License, version 2.0, for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "sql/rpl_trx_tracking.h"
#include <algorithm>
#include <utility>
#include <vector>
#include "libbinlogevents/include/binlog_event.h"
#include "my_inttypes.h"
#include "my_sqlcommand.h"
#include "sql/binlog.h"
#include "sql/current_thd.h"
#include "sql/mysqld.h"
#include "sql/rpl_context.h"
#include "sql/rpl_transaction_write_set_ctx.h"
#include "sql/sql_alter.h"
#include "sql/sql_class.h"
#include "sql/sql_lex.h"
#include "sql/system_variables.h"
#include "sql/transaction_info.h"
Logical_clock::Logical_clock() : state(SEQ_UNINIT), offset(0) {}
/**
Atomically fetch the current state.
@return not subtracted "absolute" value.
*/
inline int64 Logical_clock::get_timestamp() {
int64 retval = 0;
DBUG_TRACE;
retval = state.load();
return retval;
}
/**
Steps the absolute value of the clock (state) to return
an updated value.
The caller must be sure to call the method in no concurrent
execution context so either offset and state can't change.
@return incremented "absolute" value
*/
inline int64 Logical_clock::step() {
static_assert(SEQ_UNINIT == 0, "");
DBUG_EXECUTE_IF("logical_clock_step_2", ++state;);
return ++state;
}
/**
To try setting the clock *forward*.
The clock does not change when the new value is in the past
which is reflected by the new value and by offset.
In other words the function main effects is described as
state= max(state, new_value).
Offset that exceeds the new value indicates the binary log rotation
to render such new value useless.
@param new_val a new value (offset included)
@return a (new) value of state member regardless whether it's changed or not.
*/
inline int64 Logical_clock::set_if_greater(int64 new_val) {
int64 old_val = new_val - 1;
bool cas_rc;
DBUG_TRACE;
assert(new_val > 0);
if (new_val <= offset) {
/*
This function's invocation can be separated from the
transaction's flushing by few rotations. A late to log
transaction does not change the clock, similarly to how
its timestamps are handled at flushing.
*/
return SEQ_UNINIT;
}
assert(new_val > 0);
while (
!(cas_rc = atomic_compare_exchange_strong(&state, &old_val, new_val)) &&
old_val < new_val) {
}
assert(state >= new_val); // setting can't be done to past
assert(cas_rc || old_val >= new_val);
return cas_rc ? new_val : old_val;
}
/*
Admin statements release metadata lock too earlier. It breaks the rule of lock
based logical clock. This function recognizes the statements.
*/
static bool is_trx_unsafe_for_parallel_slave(const THD *thd) {
switch (thd->lex->sql_command) {
case SQLCOM_ANALYZE:
case SQLCOM_REPAIR:
case SQLCOM_OPTIMIZE:
case SQLCOM_CREATE_DB:
case SQLCOM_ALTER_DB:
case SQLCOM_DROP_DB:
return true;
case SQLCOM_ALTER_TABLE:
return thd->lex->alter_info->flags & Alter_info::ALTER_ADMIN_PARTITION;
default:
return false;
}
return false;
}
/**
Get the sequence_number for a transaction, and get the last_commit based
on parallel committing transactions.
@param[in] thd Current THD from which to extract trx context.
@param[in] parallelization_barrier The transaction is a
parallelization_barrier and subseqent
transactions should depend on it.
@param[in,out] sequence_number Sequence number of current transaction.
@param[in,out] commit_parent Commit_parent of current transaction,
pre-filled with the commit_parent calculated
by the logical clock logic.
*/
void Commit_order_trx_dependency_tracker::get_dependency(
THD *thd, bool parallelization_barrier, int64 &sequence_number,
int64 &commit_parent) {
Transaction_ctx *trn_ctx = thd->get_transaction();
assert(trn_ctx->sequence_number > m_max_committed_transaction.get_offset());
/*
Prepare sequence_number and commit_parent relative to the current
binlog. This is done by subtracting the binlog's clock offset
from the values.
A transaction that commits after the binlog is rotated, can have a
commit parent in the previous binlog. In this case, subtracting
the offset from the sequence number results in a negative
number. The commit parent dependency gets lost in such
case. Therefore, we log the value SEQ_UNINIT in this case.
*/
sequence_number =
trn_ctx->sequence_number - m_max_committed_transaction.get_offset();
if (trn_ctx->last_committed <= m_max_committed_transaction.get_offset())
commit_parent = SEQ_UNINIT;
else
commit_parent =
std::max(trn_ctx->last_committed, m_last_blocking_transaction) -
m_max_committed_transaction.get_offset();
if (is_trx_unsafe_for_parallel_slave(thd) || parallelization_barrier)
m_last_blocking_transaction = trn_ctx->sequence_number;
}
int64 Commit_order_trx_dependency_tracker::step() {
return m_transaction_counter.step();
}
void Commit_order_trx_dependency_tracker::rotate() {
m_max_committed_transaction.update_offset(
m_transaction_counter.get_timestamp());
m_transaction_counter.update_offset(m_transaction_counter.get_timestamp());
}
void Commit_order_trx_dependency_tracker::update_max_committed(
int64 sequence_number) {
mysql_mutex_assert_owner(&LOCK_replica_trans_dep_tracker);
m_max_committed_transaction.set_if_greater(sequence_number);
}
/**
Get the writeset dependencies of a transaction.
This takes the commit_parent that must be previously set using
Commit_order_trx_dependency_tracker and tries to make the commit_parent as
low as possible, using the writesets of each transaction.
The commit_parent returned depends on how many row hashes are stored in the
writeset_history, which is cleared once it reaches the user-defined maximum.
@param[in] thd Current THD from which to extract trx context.
@param[in,out] sequence_number Sequence number of current transaction.
@param[in,out] commit_parent Commit_parent of current transaction,
pre-filled with the commit_parent calculated by
Commit_order_trx_dependency_tracker to use when
the writeset commit_parent is not valid.
*/
void Writeset_trx_dependency_tracker::get_dependency(THD *thd,
int64 &sequence_number,
int64 &commit_parent) {
Rpl_transaction_write_set_ctx *write_set_ctx =
thd->get_transaction()->get_transaction_write_set_ctx();
std::vector<uint64> *writeset = write_set_ctx->get_write_set();
#ifndef NDEBUG
/* The writeset of an empty transaction must be empty. */
if (is_empty_transaction_in_binlog_cache(thd)) assert(writeset->size() == 0);
#endif
/*
Check if this transaction has a writeset, if the writeset will overflow the
history size, if the transaction_write_set_extraction is consistent
between session and global or if changes in the tables referenced in this
transaction cascade to other tables. If that happens revert to using the
COMMIT_ORDER and clear the history to keep data consistent.
*/
bool can_use_writesets =
// empty writeset implies DDL or similar, except if there are missing keys
(writeset->size() != 0 || write_set_ctx->get_has_missing_keys() ||
/*
The empty transactions do not need to clear the writeset history, since
they can be executed in parallel.
*/
is_empty_transaction_in_binlog_cache(thd)) &&
// hashing algorithm for the session must be the same as used by other
// rows in history
(global_system_variables.transaction_write_set_extraction ==
thd->variables.transaction_write_set_extraction) &&
// must not use foreign keys
!write_set_ctx->get_has_related_foreign_keys() &&
// it did not broke past the capacity already
!write_set_ctx->was_write_set_limit_reached();
bool exceeds_capacity = false;
if (can_use_writesets) {
/*
Check if adding this transaction exceeds the capacity of the writeset
history. If that happens, m_writeset_history will be cleared only after
using its information for current transaction.
*/
exceeds_capacity =
m_writeset_history.size() + writeset->size() > m_opt_max_history_size;
/*
Compute the greatest sequence_number among all conflicts and add the
transaction's row hashes to the history.
*/
int64 last_parent = m_writeset_history_start;
for (std::vector<uint64>::iterator it = writeset->begin();
it != writeset->end(); ++it) {
Writeset_history::iterator hst = m_writeset_history.find(*it);
if (hst != m_writeset_history.end()) {
if (hst->second > last_parent && hst->second < sequence_number)
last_parent = hst->second;
hst->second = sequence_number;
} else {
if (!exceeds_capacity)
m_writeset_history.insert(
std::pair<uint64, int64>(*it, sequence_number));
}
}
/*
If the transaction references tables with missing primary keys revert to
COMMIT_ORDER, update and not reset history, as it is unnecessary because
any transaction that refers this table will also revert to COMMIT_ORDER.
*/
if (!write_set_ctx->get_has_missing_keys()) {
/*
The WRITESET commit_parent then becomes the minimum of largest parent
found using the hashes of the row touched by the transaction and the
commit parent calculated with COMMIT_ORDER.
*/
commit_parent = std::min(last_parent, commit_parent);
}
}
if (exceeds_capacity || !can_use_writesets) {
m_writeset_history_start = sequence_number;
m_writeset_history.clear();
}
}
void Writeset_trx_dependency_tracker::rotate(int64 start) {
m_writeset_history_start = start;
m_writeset_history.clear();
}
/**
Get the writeset commit parent of transactions using the session dependencies.
@param[in] thd Current THD from which to extract trx context.
@param[in,out] sequence_number Sequence number of current transaction.
@param[in,out] commit_parent Commit_parent of current transaction,
pre-filled with the commit_parent calculated
by the Write_set_trx_dependency_tracker as a
fall-back.
*/
void Writeset_session_trx_dependency_tracker::get_dependency(
THD *thd, int64 &sequence_number, int64 &commit_parent) {
int64 session_parent = thd->rpl_thd_ctx.dependency_tracker_ctx()
.get_last_session_sequence_number();
if (session_parent != 0 && session_parent < sequence_number)
commit_parent = std::max(commit_parent, session_parent);
thd->rpl_thd_ctx.dependency_tracker_ctx().set_last_session_sequence_number(
sequence_number);
}
/**
Get the dependencies in a transaction, the main entry point for the
dependency tracking work.
*/
void Transaction_dependency_tracker::get_dependency(
THD *thd, bool parallelization_barrier, int64 &sequence_number,
int64 &commit_parent) {
sequence_number = commit_parent = 0;
switch (m_opt_tracking_mode) {
case DEPENDENCY_TRACKING_COMMIT_ORDER:
m_commit_order.get_dependency(thd, parallelization_barrier,
sequence_number, commit_parent);
break;
case DEPENDENCY_TRACKING_WRITESET:
m_commit_order.get_dependency(thd, parallelization_barrier,
sequence_number, commit_parent);
m_writeset.get_dependency(thd, sequence_number, commit_parent);
break;
case DEPENDENCY_TRACKING_WRITESET_SESSION:
m_commit_order.get_dependency(thd, parallelization_barrier,
sequence_number, commit_parent);
m_writeset.get_dependency(thd, sequence_number, commit_parent);
m_writeset_session.get_dependency(thd, sequence_number, commit_parent);
break;
default:
assert(0); // blow up on debug
/*
Fallback to commit order on production builds.
*/
m_commit_order.get_dependency(thd, parallelization_barrier,
sequence_number, commit_parent);
}
}
void Transaction_dependency_tracker::tracking_mode_changed() {
Logical_clock max_committed_transaction =
m_commit_order.get_max_committed_transaction();
int64 timestamp = max_committed_transaction.get_timestamp() -
max_committed_transaction.get_offset();
m_writeset.rotate(timestamp);
}
/**
The method is to be executed right before committing time.
It must be invoked even if the transaction does not commit
to engine being merely logged into the binary log.
max_committed_transaction is updated with a greater timestamp
value.
As a side effect, the transaction context's sequence_number
is reset.
@param thd a pointer to THD instance
*/
void Transaction_dependency_tracker::update_max_committed(THD *thd) {
Transaction_ctx *trn_ctx = thd->get_transaction();
m_commit_order.update_max_committed(trn_ctx->sequence_number);
/*
sequence_number timestamp isn't needed anymore, so it's cleared off.
*/
trn_ctx->sequence_number = SEQ_UNINIT;
assert(trn_ctx->last_committed == SEQ_UNINIT ||
thd->commit_error == THD::CE_FLUSH_ERROR ||
thd->commit_error == THD::CE_FLUSH_GNO_EXHAUSTED_ERROR);
}
int64 Transaction_dependency_tracker::step() { return m_commit_order.step(); }
void Transaction_dependency_tracker::rotate() {
m_commit_order.rotate();
/*
To make slave appliers be able to execute transactions in parallel
after rotation, set the minimum commit_parent to 1 after rotation.
*/
m_writeset.rotate(1);
if (current_thd) current_thd->get_transaction()->sequence_number = 2;
}
int64 Transaction_dependency_tracker::get_max_committed_timestamp() {
return m_commit_order.get_max_committed_transaction().get_timestamp();
}
|