/* Copyright (c) 2013, 2025, Oracle and/or its affiliates.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
This program is designed to work with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have either included with
the program or referenced in the documentation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License, version 2.0, for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "sql/rpl_mta_submode.h"
#include <limits.h>
#include <string.h>
#include <time.h>
#include <memory>
#include "binlog/decompressing_event_object_istream.h"
#include "lex_string.h"
#include "m_string.h"
#include "my_byteorder.h"
#include "my_compiler.h"
#include "my_dbug.h"
#include "my_inttypes.h"
#include "my_loglevel.h"
#include "my_systime.h"
#include "my_thread.h"
#include "mysql/components/services/bits/psi_stage_bits.h"
#include "mysql/components/services/log_builtins.h"
#include "mysql/psi/mysql_cond.h"
#include "mysql/psi/mysql_mutex.h"
#include "mysqld_error.h"
#include "sql/binlog_reader.h"
#include "sql/debug_sync.h"
#include "sql/log.h"
#include "sql/log_event.h" // Query_log_event
#include "sql/mdl.h"
#include "sql/mysqld.h" // stage_worker_....
#include "sql/query_options.h"
#include "sql/rpl_filter.h"
#include "sql/rpl_replica.h"
#include "sql/rpl_replica_commit_order_manager.h" // Commit_order_manager
#include "sql/rpl_rli.h" // Relay_log_info
#include "sql/rpl_rli_pdb.h" // db_worker_hash_entry
#include "sql/sql_class.h" // THD
#include "sql/system_variables.h"
#include "sql/table.h"
/**
Does the necessary arrangement before scheduling the next event.
@return 1 on error
0 otherwise
*/
int Mts_submode_database::schedule_next_event(Relay_log_info *, Log_event *) {
/*nothing to do here*/
return 0;
}
/**
Logic to attach temporary tables.
*/
void Mts_submode_database::attach_temp_tables(THD *thd, const Relay_log_info *,
Query_log_event *ev) {
int i, parts;
DBUG_TRACE;
if (!is_mts_worker(thd) || (ev->ends_group() || ev->starts_group())) return;
assert(!thd->temporary_tables);
// when the number of accessed databases exceeds the maximum, just one
// special partition is locked
parts = ((ev->mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
? 1
: ev->mts_accessed_dbs);
for (i = 0; i < parts; i++) {
mts_move_temp_tables_to_thd(
thd, ev->mts_assigned_partitions[i]->temporary_tables);
ev->mts_assigned_partitions[i]->temporary_tables = nullptr;
}
}
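/*
  Illustrative sketch (standalone code, not part of the server build) of
  the partition-count rule above: once an event touches more databases
  than the event header can record, everything collapses into a single
  catch-all partition. The constant value and function name below are
  hypothetical stand-ins.

    // hypothetical stand-in for OVER_MAX_DBS_IN_EVENT_MTS
    constexpr int kOverMaxDbs = 254;

    int partitions_to_process(int accessed_dbs) {
      // one special partition represents "all databases" on overflow
      return accessed_dbs == kOverMaxDbs ? 1 : accessed_dbs;
    }
*/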
/**
Called by the Coordinator when it identifies an event
requiring sequential execution.
Creating a sequential context for the event includes waiting
for the tasks assigned to Workers to be completed, and for their
resources, such as temporary tables, to be returned to the
Coordinator's repository.
When all Workers have been waited for, the Coordinator changes its
group status.
@param rli Relay_log_info instance of Coordinator
@param ignore Optional Worker instance pointer if the sequential context
is established for the ignore Worker. Its resources
are to be retained.
@note Resources that are not occupied by Workers, such as
a list of temporary tables held in unused (zero-usage) records
of the APH, are relocated to the Coordinator placeholder.
@return the non-negative number of partitions released by Workers
(one partition released by one Worker can count multiple times),
or -1 to indicate a failure on a not-ignored Worker,
as indicated by its running_status, so synchronization can't succeed.
*/
int Mts_submode_database::wait_for_workers_to_finish(Relay_log_info *rli,
Slave_worker *ignore) {
uint ret = 0;
THD *thd = rli->info_thd;
bool cant_sync = false;
char llbuf[22];
DBUG_TRACE;
llstr(rli->get_event_relay_log_pos(), llbuf);
DBUG_PRINT("info", ("Coordinator and workers enter synchronization "
"procedure when scheduling event relay-log: %s "
"pos: %s",
rli->get_event_relay_log_name(), llbuf));
mysql_mutex_lock(&rli->slave_worker_hash_lock);
for (const auto &key_and_value : rli->mapping_db_to_worker) {
db_worker_hash_entry *entry = key_and_value.second.get();
assert(entry);
// the ignore Worker retains its active resources
if (ignore && entry->worker == ignore && entry->usage > 0) {
continue;
}
if (entry->usage > 0 && !thd->killed) {
PSI_stage_info old_stage;
Slave_worker *w_entry = entry->worker;
entry->worker = nullptr; // mark Worker to signal when usage drops to 0
thd->ENTER_COND(
&rli->slave_worker_hash_cond, &rli->slave_worker_hash_lock,
&stage_replica_waiting_worker_to_release_partition, &old_stage);
do {
mysql_cond_wait(&rli->slave_worker_hash_cond,
&rli->slave_worker_hash_lock);
DBUG_PRINT("info", ("Either got awakened of notified: "
"entry %p, usage %lu, worker %lu",
entry, entry->usage, w_entry->id));
} while (entry->usage != 0 && !thd->killed);
entry->worker =
w_entry; // restoring last association, needed only for assert
mysql_mutex_unlock(&rli->slave_worker_hash_lock);
thd->EXIT_COND(&old_stage);
ret++;
} else {
mysql_mutex_unlock(&rli->slave_worker_hash_lock);
}
// resources relocation
mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
entry->temporary_tables = nullptr;
if (entry->worker->running_status != Slave_worker::RUNNING)
cant_sync = true;
mysql_mutex_lock(&rli->slave_worker_hash_lock);
}
mysql_mutex_unlock(&rli->slave_worker_hash_lock);
if (!ignore) {
DBUG_PRINT("info", ("Coordinator synchronized with workers, "
"waited entries: %d, cant_sync: %d",
ret, cant_sync));
rli->mts_group_status = Relay_log_info::MTS_NOT_IN_GROUP;
}
return !cant_sync ? ret : -1;
}
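/*
  A minimal, self-contained sketch of the synchronization pattern above,
  with std::condition_variable standing in for the mysql_cond and THD
  stage primitives (the names usage, lock and cv are hypothetical): the
  Coordinator sleeps until a Worker drops the usage count to zero,
  re-checking the predicate on every wakeup to tolerate spurious
  signals.

    #include <condition_variable>
    #include <mutex>

    std::mutex lock;             // plays the role of slave_worker_hash_lock
    std::condition_variable cv;  // plays the role of slave_worker_hash_cond
    unsigned long usage = 1;     // analogue of entry->usage

    void coordinator_wait() {
      std::unique_lock<std::mutex> guard(lock);
      cv.wait(guard, [] { return usage == 0; });  // loop on the predicate
    }

    void worker_release() {
      {
        std::lock_guard<std::mutex> guard(lock);
        --usage;
      }
      cv.notify_all();  // wake the Coordinator once usage may be zero
    }
*/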
bool Mts_submode_database::set_multi_threaded_applier_context(
const Relay_log_info &rli, Log_event &ev) {
// if this is a transaction payload event, we need to set the proper
// databases that its internal events update
if (ev.get_type_code() == binary_log::TRANSACTION_PAYLOAD_EVENT) {
Mts_db_names toset;
bool max_mts_dbs_in_event = false;
std::set<std::string> dbs;
auto &tple = *dynamic_cast<Transaction_payload_log_event *>(&ev);
binlog::Decompressing_event_object_istream istream(
tple, *rli.get_rli_description_event());
std::shared_ptr<Log_event> inner;
while (istream >> inner) {
Mts_db_names mts_dbs;
// This transaction payload event is already marked to run in
// isolation or the event being handled does not contain partition
// information
if (max_mts_dbs_in_event || !inner->contains_partition_info(true))
continue;
// The following queries should run in isolation, hence setting
// OVER_MAX_DBS_IN_EVENT_MTS
if ((inner->get_type_code() == binary_log::QUERY_EVENT)) {
auto *qev = dynamic_cast<Query_log_event *>(inner.get());
if (qev->is_query_prefix_match(STRING_WITH_LEN("XA COMMIT")) ||
qev->is_query_prefix_match(STRING_WITH_LEN("XA ROLLBACK"))) {
max_mts_dbs_in_event = true;
continue;
}
}
// OK, now that we have ruled out the exceptions, let's handle the
// databases in the inner event.
inner->get_mts_dbs(&mts_dbs, rli.rpl_filter);
// the inner event is marked to run in isolation
if (mts_dbs.num == OVER_MAX_DBS_IN_EVENT_MTS) {
max_mts_dbs_in_event = true;
continue;
}
// iterate over the databases and add them to the set
for (int i = 0; i < mts_dbs.num; i++) {
dbs.insert(mts_dbs.name[i]);
if (dbs.size() == MAX_DBS_IN_EVENT_MTS) {
max_mts_dbs_in_event = true;
break;
}
}
}
if (istream.has_error()) {
LogErr(ERROR_LEVEL, ER_RPL_REPLICA_ERROR_READING_RELAY_LOG_EVENTS,
rli.get_for_channel_str(), istream.get_error_str().c_str());
return true;
}
// now set the database information in the event
if (max_mts_dbs_in_event) {
toset.name[0] = "\0";
toset.num = OVER_MAX_DBS_IN_EVENT_MTS;
} else {
int i = 0;
// set the databases
for (auto &db : dbs) toset.name[i++] = db.c_str();
// set the number of databases
toset.num = dbs.size();
}
// save the mts_dbs to the payload event
tple.set_mts_dbs(toset);
}
return false;
}
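/*
  Illustrative sketch (standalone code) of the aggregation rule above:
  databases from all inner events accumulate into a set until the cap
  is reached, at which point the whole payload is marked to run in
  isolation. kMaxDbs and aggregate_dbs are hypothetical names; kMaxDbs
  stands in for MAX_DBS_IN_EVENT_MTS.

    #include <cstddef>
    #include <set>
    #include <string>
    #include <vector>

    constexpr std::size_t kMaxDbs = 16;  // stand-in for MAX_DBS_IN_EVENT_MTS

    // Returns the distinct databases, or an empty vector meaning
    // "over the cap: run in isolation".
    std::vector<std::string> aggregate_dbs(
        const std::vector<std::vector<std::string>> &per_event_dbs) {
      std::set<std::string> dbs;
      for (const auto &event_dbs : per_event_dbs)
        for (const auto &db : event_dbs) {
          dbs.insert(db);
          if (dbs.size() == kMaxDbs) return {};  // isolation marker
        }
      return {dbs.begin(), dbs.end()};
    }
*/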
/**
Logic to detach the temporary tables from the worker threads upon
event execution.
@param thd THD instance
@param rli Relay_log_info pointer
@param ev Query_log_event that is being applied
*/
void Mts_submode_database::detach_temp_tables(THD *thd,
const Relay_log_info *rli,
Query_log_event *ev) {
DBUG_TRACE;
if (!is_mts_worker(thd)) return;
int parts = ((ev->mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
? 1
: ev->mts_accessed_dbs);
/*
todo: optimize for the cases of
a. one db:
only detaching temporary_tables from thd to the entry would be
required, instead of the double loop below;
b. unchanged thd->temporary_tables:
in this case the involved entries would continue to hold the
unmodified lists, provided that the attach_ method does not
destroy references to them.
*/
for (int i = 0; i < parts; i++) {
ev->mts_assigned_partitions[i]->temporary_tables = nullptr;
}
Rpl_filter *rpl_filter = rli->rpl_filter;
for (TABLE *table = thd->temporary_tables; table;) {
int i;
const char *db_name = nullptr;
// find which entry this table belongs to
for (i = 0; i < parts; i++) {
db_name = ev->mts_accessed_db_names[i];
if (!strlen(db_name)) break;
// Only default database is rewritten.
if (!rpl_filter->is_rewrite_empty() && !strcmp(ev->get_db(), db_name)) {
size_t dummy_len;
const char *db_filtered =
rpl_filter->get_rewrite_db(db_name, &dummy_len);
// db_name != db_filtered means that db_name is rewritten.
if (strcmp(db_name, db_filtered)) db_name = db_filtered;
}
if (strcmp(table->s->db.str, db_name) < 0)
continue;
else {
// When rewrite db rules are used we cannot rely on the
// order of mts_accessed_db_names elements.
if (!rpl_filter->is_rewrite_empty() &&
strcmp(table->s->db.str, db_name))
continue;
else
break;
}
}
assert(db_name && (!strcmp(table->s->db.str, db_name) || !strlen(db_name)));
assert(i < ev->mts_accessed_dbs);
// table pointer is shifted inside the function
table = mts_move_temp_table_to_entry(table, thd,
ev->mts_assigned_partitions[i]);
}
assert(!thd->temporary_tables);
#ifndef NDEBUG
for (int i = 0; i < parts; i++) {
assert(!ev->mts_assigned_partitions[i]->temporary_tables ||
!ev->mts_assigned_partitions[i]->temporary_tables->prev);
}
#endif
}
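/*
  A simplified sketch (standalone code) of the routing loop above,
  ignoring the sorted-name shortcut and the rewrite rules: each
  temporary table is sent to the entry whose database name matches the
  table's database, with an empty name acting as the catch-all
  partition. route_table is a hypothetical name.

    #include <cstring>

    // Returns the index of the accessed-db slot the table belongs to,
    // or -1 when nothing matches.
    int route_table(const char *table_db, const char *const *accessed,
                    int parts) {
      for (int i = 0; i < parts; i++) {
        if (accessed[i][0] == '\0') return i;  // catch-all partition
        if (std::strcmp(table_db, accessed[i]) == 0) return i;
      }
      return -1;
    }
*/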
/**
Logic to get the least occupied worker when mts_submode = database
@param ws array of worker threads
@return slave worker thread
*/
Slave_worker *Mts_submode_database::get_least_occupied_worker(
Relay_log_info *, Slave_worker_array *ws, Log_event *) {
long usage = LONG_MAX;
Slave_worker **ptr_current_worker = nullptr, *worker = nullptr;
DBUG_TRACE;
#ifndef NDEBUG
if (DBUG_EVALUATE_IF("mta_distribute_round_robin", 1, 0)) {
worker = ws->at(w_rr % ws->size());
LogErr(INFORMATION_LEVEL, ER_RPL_WORKER_ID_IS, worker->id,
static_cast<ulong>(w_rr % ws->size()));
assert(worker != nullptr);
return worker;
}
#endif
for (Slave_worker **it = ws->begin(); it != ws->end(); ++it) {
ptr_current_worker = it;
if ((*ptr_current_worker)->usage_partition <= usage) {
worker = *ptr_current_worker;
usage = (*ptr_current_worker)->usage_partition;
}
}
assert(worker != nullptr);
return worker;
}
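/*
  The scan above is, in effect, a std::min_element over the workers'
  usage_partition counters; a minimal sketch with a hypothetical Worker
  type. Note one subtlety: the loop above compares with <=, so ties go
  to the last worker scanned, whereas min_element with < returns the
  first. Both assume a non-empty worker array.

    #include <algorithm>
    #include <vector>

    struct Worker { long usage_partition; };

    Worker *least_occupied(std::vector<Worker *> &ws) {
      return *std::min_element(
          ws.begin(), ws.end(), [](const Worker *a, const Worker *b) {
            return a->usage_partition < b->usage_partition;
          });
    }
*/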
/* Mts_submode_logical_clock default constructor */
Mts_submode_logical_clock::Mts_submode_logical_clock() {
type = MTS_PARALLEL_TYPE_LOGICAL_CLOCK;
first_event = true;
force_new_group = false;
is_new_group = true;
delegated_jobs = 0;
jobs_done = 0;
last_lwm_timestamp = SEQ_UNINIT;
last_lwm_index = INDEX_UNDEF;
is_error = false;
min_waited_timestamp = SEQ_UNINIT;
last_committed = SEQ_UNINIT;
sequence_number = SEQ_UNINIT;
}
/**
The method finds the minimum logical timestamp (low-water-mark) of
committed transactions.
A successful search yields a pair of a logical timestamp value and the
GAQ index that contains it. last_lwm_timestamp may still be raised even
when the search does not find any satisfying running index. The search
is implemented as a headway scan of the GAQ from the stop position of
the previous search (last_lwm_index). The cached (memorized) index
value is considered stale when its timestamp gets less than the current
"stable" LWM:
last_lwm_timestamp <= GAQ.lwm.sequence_number (*)
Staleness is caused by GAQ garbage collection, which increments the rhs
of (*); see @c move_queue_head(). When that is diagnosed, the search in
the GAQ needs restarting from the queue tail.
Formally, the undefined cached value of last_lwm_timestamp is also stale.
@verbatim
the last time index containing lwm
+------+
| LWM |
| | |
V V V
GAQ: xoooooxxxxxXXXXX...X
^ ^
| | LWM+1
|
+- the new current_lwm
<---- logical (commit) time ----
@endverbatim
here `x' stands for committed, `X' for committed and discarded from
the running range of the queue, `o' for not committed.
@param rli Relay_log_info pointer
@param need_lock when true, the caller already holds mts_gaq_LOCK;
otherwise the function acquires and releases it itself,
to avoid a race with concurrent GAQ updates.
@return possibly updated current_lwm
*/
longlong Mts_submode_logical_clock::get_lwm_timestamp(Relay_log_info *rli,
bool need_lock) {
longlong lwm_estim;
Slave_job_group *ptr_g = nullptr;
bool is_stale = false;
if (!need_lock) mysql_mutex_lock(&rli->mts_gaq_LOCK);
/*
Make the "stable" LWM-based estimate which will be compared
against the cached "instant" value.
*/
lwm_estim = rli->gaq->lwm.sequence_number;
/*
timestamp continuity invariant: if the queue has any item,
its timestamp is greater by one than the estimate.
*/
assert(lwm_estim == SEQ_UNINIT || rli->gaq->empty() ||
lwm_estim + 1 ==
rli->gaq->get_job_group(rli->gaq->entry)->sequence_number);
last_lwm_index = rli->gaq->find_lwm(
&ptr_g,
/*
The undefined "stable" forces the scan's restart
as the stale value does.
*/
lwm_estim == SEQ_UNINIT ||
(is_stale = clock_leq(last_lwm_timestamp, lwm_estim))
? rli->gaq->entry
: last_lwm_index);
/*
if the returned index is sane, update the timestamp.
*/
if (last_lwm_index != rli->gaq->capacity) {
// non-decreasing lwm invariant
assert(clock_leq(last_lwm_timestamp, ptr_g->sequence_number));
last_lwm_timestamp = ptr_g->sequence_number;
} else if (is_stale) {
last_lwm_timestamp.store(lwm_estim);
}
if (!need_lock) mysql_mutex_unlock(&rli->mts_gaq_LOCK);
return last_lwm_timestamp;
}
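/*
  A worked sketch (standalone code, invented values) of the
  low-water-mark semantics above: the lwm is the timestamp of the last
  committed transaction not preceded by a still-running one. Group and
  find_lwm_simple are hypothetical; the real find_lwm() returns a GAQ
  index and uses capacity as its not-found marker.

    #include <vector>

    struct Group { long long sequence_number; bool committed; };

    // Returns the lwm timestamp, or -1 when the first item is running.
    long long find_lwm_simple(const std::vector<Group> &gaq) {
      long long lwm = -1;
      for (const auto &g : gaq) {
        if (!g.committed) break;  // scan stops at the first open trx
        lwm = g.sequence_number;
      }
      return lwm;
    }

    // find_lwm_simple({{3,true},{4,true},{5,false},{6,true}}) == 4:
    // 6 is committed but does not count while 5 is still running.
*/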
/**
The method implements logical timestamp conflict detection
and resolution through waiting by the calling thread.
The conflict or waiting condition is like the following
lwm < last_committed,
where lwm is a minimum logical timestamp of committed transactions.
Since the lwm's exact value is not always available, its pessimistic
estimate (an old version) is improved (get_lwm_timestamp()) as the
first step, before the actual wait begins.
Special cases include:
When @c last_committed_arg is uninitialized the calling thread must
proceed without waiting for anyone. Any possible dependency on an
unknown commit parent transaction shall be sorted out by the parent;
When the gaq index immediately follows the last lwm index, the current
transaction does not depend on any other, regardless of the lwm
timestamp, even when it is SEQ_UNINIT.
Consequently, when the GAQ consists of just one item there is nothing
to wait for. That latter case is left to the caller to handle.
@note The caller must make sure the current transaction won't be waiting
for itself. That is, the method must not be called by a Worker
whose group assignment is in the GAQ front item.
@param rli relay log info of coordinator
@param last_committed_arg logical timestamp of a parent transaction
@return false on success,
true when the error flag is raised or
the calling thread is found killed.
*/
bool Mts_submode_logical_clock::wait_for_last_committed_trx(
Relay_log_info *rli, longlong last_committed_arg) {
THD *thd = rli->info_thd;
DBUG_TRACE;
if (last_committed_arg == SEQ_UNINIT) return false;
mysql_mutex_lock(&rli->mts_gaq_LOCK);
assert(min_waited_timestamp == SEQ_UNINIT);
min_waited_timestamp.store(last_committed_arg);
/*
This transaction is a candidate for insertion into the waiting list.
That fact is described by incrementing waited_timestamp_cnt.
When the candidate does not make it, the counter is decremented at
once while the mutex is held.
*/
if ((!rli->info_thd->killed && !is_error) &&
!clock_leq(last_committed_arg, get_lwm_timestamp(rli, true))) {
PSI_stage_info old_stage;
struct timespec ts[2];
set_timespec_nsec(&ts[0], 0);
assert(rli->gaq->get_length() >= 2); // there's someone to wait
thd->ENTER_COND(&rli->logical_clock_cond, &rli->mts_gaq_LOCK,
&stage_worker_waiting_for_commit_parent, &old_stage);
do {
mysql_cond_wait(&rli->logical_clock_cond, &rli->mts_gaq_LOCK);
} while ((!rli->info_thd->killed && !is_error) &&
!clock_leq(last_committed_arg, estimate_lwm_timestamp()));
min_waited_timestamp.store(SEQ_UNINIT); // reset waiting flag
mysql_mutex_unlock(&rli->mts_gaq_LOCK);
thd->EXIT_COND(&old_stage);
set_timespec_nsec(&ts[1], 0);
rli->mts_total_wait_overlap += diff_timespec(&ts[1], &ts[0]);
} else {
min_waited_timestamp.store(SEQ_UNINIT);
mysql_mutex_unlock(&rli->mts_gaq_LOCK);
}
return rli->info_thd->killed || is_error;
}
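/*
  A numeric sketch of the waiting condition above (invented values,
  SEQ_UNINIT handling omitted): a transaction whose commit parent has
  logical timestamp last_committed may start only once the lwm has
  reached that timestamp. may_start is a hypothetical name.

    inline bool may_start(long long last_committed, long long lwm) {
      return last_committed <= lwm;  // the commit parent has committed
    }
    // may_start(7, 5) == false -> the worker waits;
    // may_start(7, 7) == true  -> the worker proceeds.
*/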
/**
Does necessary arrangement before scheduling next event.
The method computes the meta-group status of the transaction being
scheduled, represented by the event argument. When the status is found
OUT (of the current meta-group), encoded as is_new_group == true,
the global Scheduler (Coordinator thread) requests full synchronization
with all Workers.
The group descriptor currently being assigned gets associated with
the group's logical timestamp, aka sequence_number.
@return ER_MTA_CANT_PARALLEL, ER_MTA_INCONSISTENT_DATA on error;
0 if there is no error or the slave has been killed gracefully
*/
int Mts_submode_logical_clock::schedule_next_event(Relay_log_info *rli,
Log_event *ev) {
longlong last_sequence_number = sequence_number;
bool gap_successor = false;
static_assert(SEQ_UNINIT == 0, "MTA scheduling code assumes SEQ_UNINIT is 0");
DBUG_TRACE;
// We should check if the SQL thread was already killed before we schedule
// the next transaction
if (sql_slave_killed(rli->info_thd, rli)) return 0;
Slave_job_group *ptr_group =
rli->gaq->get_job_group(rli->gaq->assigned_group_index);
/*
A group id updater must be one of the following:
- a Query_log_event ("BEGIN") or a GTID event;
- a DDL or an implicit DML commit.
*/
switch (ev->get_type_code()) {
case binary_log::GTID_LOG_EVENT:
case binary_log::ANONYMOUS_GTID_LOG_EVENT:
// TODO: control continuity
ptr_group->sequence_number = sequence_number =
static_cast<Gtid_log_event *>(ev)->sequence_number;
ptr_group->last_committed = last_committed =
static_cast<Gtid_log_event *>(ev)->last_committed;
break;
default:
sequence_number = last_committed = SEQ_UNINIT;
break;
}
DBUG_PRINT("info", ("sequence_number %lld, last_committed %lld",
sequence_number, last_committed));
if (first_event) {
first_event = false;
} else {
if (unlikely(clock_leq(sequence_number, last_committed) &&
last_committed != SEQ_UNINIT)) {
/* inconsistent (buggy) timestamps */
LogErr(ERROR_LEVEL, ER_RPL_INCONSISTENT_TIMESTAMPS_IN_TRX,
sequence_number, last_committed);
return ER_MTA_CANT_PARALLEL;
}
if (unlikely(clock_leq(sequence_number, last_sequence_number) &&
sequence_number != SEQ_UNINIT)) {
/* inconsistent (buggy) timestamps */
LogErr(ERROR_LEVEL, ER_RPL_INCONSISTENT_SEQUENCE_NO_IN_TRX,
sequence_number, last_sequence_number);
return ER_MTA_CANT_PARALLEL;
}
}
/*
The transaction sequence as scheduled may have gaps, even in the
relay log. In such a case, a transaction that follows a gap will
wait for all earlier scheduled transactions to finish. It is marked
as a gap successor now.
*/
if (unlikely(sequence_number > last_sequence_number + 1)) {
DBUG_PRINT("info", ("sequence_number gap found, "
"last_sequence_number %lld, sequence_number %lld",
last_sequence_number, sequence_number));
gap_successor = true;
}
/*
The new group flag, when raised, is practically the same as the
force flag: it indicates synchronization with Workers.
*/
is_new_group =
(/* First event after a submode switch; */
first_event ||
/* Require a fresh group to be started; */
// todo: turn `force_new_group' into sequence_number == SEQ_UNINIT
// condition
force_new_group ||
/* Rewritten event without commit point timestamp (todo: find use case)
*/
sequence_number == SEQ_UNINIT ||
/*
undefined parent (e.g. the very first transaction from the master),
or an old master.
*/
last_committed == SEQ_UNINIT ||
/*
When a gap successor depends on a gap before it, the scheduler has
to serialize this transaction's execution with the previously
scheduled ones. For simplicity it is assumed below that such
gap dependency is always the case.
*/
gap_successor ||
/*
the previous group did not have a sequence number assigned.
Its execution must finish before the current group
can be assigned.
The dependency of the current group on the previous one
can't be tracked, so wait until the former is over.
*/
last_sequence_number == SEQ_UNINIT);
/*
The coordinator waits until all transactions that the current one
depends on are applied.
*/
if (!is_new_group) {
longlong lwm_estimate = estimate_lwm_timestamp();
if (!clock_leq(last_committed, lwm_estimate) &&
rli->gaq->assigned_group_index != rli->gaq->entry) {
/*
"Unlikely" branch.
The following block improves possibly stale lwm and when the
waiting condition stays, recompute min_waited_timestamp and go
waiting.
At awakening set min_waited_timestamp to commit_parent in the
subsequent GAQ index (could be NIL).
*/
if (wait_for_last_committed_trx(rli, last_committed)) {
/*
MTS was waiting for a dependent transaction to finish but either it
has failed or the applier was requested to stop. In any case, this
transaction wasn't started yet and should not warn about the
coordinator stopping in the middle of a transaction, to avoid polluting
the server error log.
*/
rli->reported_unsafe_warning = true;
return -1;
}
/*
Making the slave's max last committed (lwm) satisfy this
transaction's scheduling condition.
*/
if (gap_successor) last_lwm_timestamp = sequence_number - 1;
assert(!clock_leq(sequence_number, estimate_lwm_timestamp()));
}
delegated_jobs++;
assert(!force_new_group);
} else {
assert(delegated_jobs >= jobs_done);
assert(is_error ||
(rli->gaq->get_length() + jobs_done == 1 + delegated_jobs));
assert(rli->mts_group_status == Relay_log_info::MTS_IN_GROUP);
/*
The following use cases fall under the new group:
- events from an OLD (sequence_number unaware) master;
- a malformed (missing BEGIN or GTID_NEXT) group, including its
particular form of CREATE..SELECT..from..@user_var (or a rand- or
int- var in place of the @user- var).
A malformed group is handled exceptionally: each event is executed
as a solitary group, yet by the same (zero id) worker.
*/
if (-1 == wait_for_workers_to_finish(rli)) return ER_MTA_INCONSISTENT_DATA;
rli->mts_group_status = Relay_log_info::MTS_IN_GROUP; // wait set it to NOT
assert(min_waited_timestamp == SEQ_UNINIT);
/*
the instant last lwm timestamp must be reset when the force flag is up.
*/
rli->gaq->lwm.sequence_number = last_lwm_timestamp = SEQ_UNINIT;
delegated_jobs = 1;
jobs_done = 0;
force_new_group = false;
/*
An unsequenced event can be followed by a logically related one,
e.g. a User var event followed by CREATE TABLE.
Executing them in a one-by-one fashion is supported.
Todo: remove with the event group parser worklog.
*/
if (sequence_number == SEQ_UNINIT && last_committed == SEQ_UNINIT)
rli->last_assigned_worker = *rli->workers.begin();
}
#ifndef NDEBUG
mysql_mutex_lock(&rli->mts_gaq_LOCK);
assert(is_error || (rli->gaq->get_length() + jobs_done == delegated_jobs));
mysql_mutex_unlock(&rli->mts_gaq_LOCK);
#endif
return 0;
}
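/*
  An end-to-end sketch (standalone code, invented timestamps) of the
  delegation rule implemented above: each transaction carries a
  (last_committed, sequence_number) pair from its GTID event and may be
  delegated as soon as its commit parent is at or below the lwm. Trx
  and can_delegate are hypothetical names.

    struct Trx { long long last_committed, sequence_number; };

    bool can_delegate(const Trx &t, long long lwm) {
      return t.last_committed <= lwm;
    }

    // With lwm == 1 (transaction 1 committed):
    //   Trx{1, 2} runs;
    //   Trx{1, 3} runs concurrently with 2 (same commit parent);
    //   Trx{3, 4} waits until the lwm reaches 3.
*/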
/**
Logic to attach temporary tables to the worker thread upon
event execution.
@param thd THD instance
@param rli Relay_log_info instance
@param ev Query_log_event that is being applied
*/
void Mts_submode_logical_clock::attach_temp_tables(THD *thd,
const Relay_log_info *rli,
Query_log_event *ev) {
bool shifted = false;
TABLE *table, *cur_table;
DBUG_TRACE;
if (!is_mts_worker(thd) || (ev->ends_group() || ev->starts_group())) return;
/* fetch coordinator's rli */
Relay_log_info *c_rli = static_cast<const Slave_worker *>(rli)->c_rli;
assert(!thd->temporary_tables);
mysql_mutex_lock(&c_rli->mts_temp_table_LOCK);
if (!(table = c_rli->info_thd->temporary_tables)) {
mysql_mutex_unlock(&c_rli->mts_temp_table_LOCK);
return;
}
c_rli->info_thd->temporary_tables = nullptr;
do {
/* store the current table */
cur_table = table;
/* move the table pointer to next in list, so that we can isolate the
current table */
table = table->next;
std::pair<uint, my_thread_id> st_id_pair =
get_server_and_thread_id(cur_table);
if (thd->server_id == st_id_pair.first &&
thd->variables.pseudo_thread_id == st_id_pair.second) {
/* shorten the list, singling out the current table */
if (cur_table->prev) // not the first node
cur_table->prev->next = cur_table->next;
if (cur_table->next) // not the last node
cur_table->next->prev = cur_table->prev;
/* isolate the table */
cur_table->prev = nullptr;
cur_table->next = nullptr;
mts_move_temp_tables_to_thd(thd, cur_table);
} else
/* We must shift the C->temp_tables pointer to the first table unused
in this iteration. If all the tables have been used, C->temp_tables
will point to NULL */
if (!shifted) {
c_rli->info_thd->temporary_tables = cur_table;
shifted = true;
}
} while (table);
mysql_mutex_unlock(&c_rli->mts_temp_table_LOCK);
}
/**
Logic to detach the temporary tables from the worker threads upon
event execution.
@param thd THD instance
@param rli Relay_log_info instance
*/
void Mts_submode_logical_clock::detach_temp_tables(THD *thd,
const Relay_log_info *rli,
Query_log_event *) {
DBUG_TRACE;
if (!is_mts_worker(thd)) return;
/*
Here in the detach section we move the tables from the worker to the
coordinator thread. Since the coordinator is shared, we need to make
sure that there are no race conditions which may lead to assert
failures and non-deterministic results.
*/
Relay_log_info *c_rli = static_cast<const Slave_worker *>(rli)->c_rli;
mysql_mutex_lock(&c_rli->mts_temp_table_LOCK);
mts_move_temp_tables_to_thd(c_rli->info_thd, thd->temporary_tables);
mysql_mutex_unlock(&c_rli->mts_temp_table_LOCK);
thd->temporary_tables = nullptr;
}
/**
Logic to get the least occupied worker when mts_submode = logical_clock
@param rli relay log info of coordinator
@param ws array of worker threads
@param ev event for which we are searching for a worker.
@return slave worker thread or NULL when coordinator is killed by any worker.
*/
Slave_worker *Mts_submode_logical_clock::get_least_occupied_worker(
Relay_log_info *rli, Slave_worker_array *ws [[maybe_unused]],
Log_event *ev) {
Slave_worker *worker = nullptr;
PSI_stage_info *old_stage = nullptr;
THD *thd = rli->info_thd;
DBUG_TRACE;
#ifndef NDEBUG
if (DBUG_EVALUATE_IF("mta_distribute_round_robin", 1, 0)) {
worker = ws->at(w_rr % ws->size());
LogErr(INFORMATION_LEVEL, ER_RPL_WORKER_ID_IS, worker->id,
static_cast<ulong>(w_rr % ws->size()));
assert(worker != nullptr);
return worker;
}
Slave_committed_queue *gaq = rli->gaq;
Slave_job_group *ptr_group;
ptr_group = gaq->get_job_group(rli->gaq->assigned_group_index);
#endif
/*
The scheduling works as follows, in this sequence:
- If this is an internal event of a transaction, use the last assigned
worker.
- If the i-th transaction is being scheduled in this group, where i <=
the number of available workers, then schedule the events to the
consecutive workers.
- If the i-th transaction is being scheduled in this group, where i >
the number of available workers, then schedule this to the first
worker that becomes free.
*/
if (rli->last_assigned_worker) {
worker = rli->last_assigned_worker;
assert(ev->get_type_code() != binary_log::USER_VAR_EVENT ||
worker->id == 0 || rli->curr_group_seen_begin ||
rli->curr_group_seen_gtid);
} else {
worker = get_free_worker(rli);
assert(ev->get_type_code() != binary_log::USER_VAR_EVENT ||
rli->curr_group_seen_begin || rli->curr_group_seen_gtid);
if (worker == nullptr) {
struct timespec ts[2];
set_timespec_nsec(&ts[0], 0);
// Update thd info as waiting for workers to finish.
thd->enter_stage(&stage_replica_waiting_for_workers_to_process_queue,
old_stage, __func__, __FILE__, __LINE__);
while (!worker && !thd->killed) {
/*
Busy wait, yielding thread control before the next attempt
to find a free worker. As of now, a worker
can't have more than one assigned group of events in its
queue.
todo: replace this At-Most-One assignment policy with
First Available Worker, as
this method clearly can't be considered optimal.
*/
#if !defined(_WIN32)
sched_yield();
#else
my_sleep(rli->mts_coordinator_basic_nap);
#endif
worker = get_free_worker(rli);
}
THD_STAGE_INFO(thd, *old_stage);
set_timespec_nsec(&ts[1], 0);
rli->mts_total_wait_worker_avail += diff_timespec(&ts[1], &ts[0]);
rli->mts_wq_no_underrun_cnt++;
/*
Even if OPTION_BEGIN is set, the 'BEGIN' event is not dispatched to
any worker thread. So the flag is removed, and the Coordinator thread
will not try to finish the group before aborting.
*/
if (worker == nullptr)
rli->info_thd->variables.option_bits &= ~OPTION_BEGIN;
}
if (rli->get_commit_order_manager() != nullptr && worker != nullptr)
rli->get_commit_order_manager()->register_trx(worker);
}
assert(ptr_group);
// assert that we have a worker thread for this event or the slave has
// stopped.
assert(worker != nullptr || thd->killed);
/* The master may have sent db partition info. Make sure we never use it. */
if (ev->get_type_code() == binary_log::QUERY_EVENT)
static_cast<Query_log_event *>(ev)->mts_accessed_dbs = 0;
return worker;
}
/**
Protected method to fetch a worker having no events assigned.
The method is supposed to be called by the Coordinator, therefore
a comparison like w_i->jobs.len == 0 must (eventually) succeed.
todo: consider optimizing the scan, which gets more expensive with
a larger number of Workers.
@return a pointer to Worker or NULL if none is free.
*/
Slave_worker *Mts_submode_logical_clock::get_free_worker(Relay_log_info *rli) {
for (Slave_worker **it = rli->workers.begin(); it != rli->workers.end();
++it) {
Slave_worker *w_i = *it;
if (w_i->jobs.get_length() == 0) return w_i;
}
return nullptr;
}
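/*
  The linear scan above is equivalent to std::find_if with an
  empty-queue predicate; a minimal sketch with a hypothetical Worker
  type:

    #include <algorithm>
    #include <cstddef>
    #include <vector>

    struct Worker { std::size_t queue_len; };

    Worker *find_free(std::vector<Worker *> &workers) {
      auto it =
          std::find_if(workers.begin(), workers.end(),
                       [](const Worker *w) { return w->queue_len == 0; });
      return it == workers.end() ? nullptr : *it;
    }
*/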
/**
Waits for slave workers to finish off the pending tasks before returning.
Used in this submode to make sure that all assigned jobs have been done.
@param rli coordinator rli.
@param ignore worker to ignore.
@return -1 for error.
0 no error.
*/
int Mts_submode_logical_clock::wait_for_workers_to_finish(
Relay_log_info *rli, [[maybe_unused]] Slave_worker *ignore) {
PSI_stage_info *old_stage = nullptr;
THD *thd = rli->info_thd;
DBUG_TRACE;
DBUG_PRINT("info", ("delegated %d, jobs_done %d", delegated_jobs, jobs_done));
// Update thd info as waiting for workers to finish.
thd->enter_stage(&stage_replica_waiting_for_workers_to_process_queue,
old_stage, __func__, __FILE__, __LINE__);
while (delegated_jobs > jobs_done && !thd->killed && !is_error) {
// Todo: consider replacing this with a. GAQ::get_lwm_timestamp() or
// b. (better) pthread wait+signal, similarly to the DB type.
if (mta_checkpoint_routine(rli, true)) return -1;
}
// Check if there is a failure on a not-ignored Worker
for (Slave_worker **it = rli->workers.begin(); it != rli->workers.end();
++it) {
Slave_worker *w_i = *it;
if (w_i->running_status != Slave_worker::RUNNING) return -1;
}
DBUG_EXECUTE_IF("wait_for_workers_to_finish_after_wait", {
const char act[] = "now WAIT_FOR coordinator_continue";
assert(!debug_sync_set_action(rli->info_thd, STRING_WITH_LEN(act)));
});
// The current commit point sequence may end here (e.g. Rotate to a new log)
rli->gaq->lwm.sequence_number = SEQ_UNINIT;
// Restore previous info.
THD_STAGE_INFO(thd, *old_stage);
DBUG_PRINT("info", ("delegated %d, jobs_done %d, Workers have finished their"
" jobs",
delegated_jobs, jobs_done));
rli->mts_group_status = Relay_log_info::MTS_NOT_IN_GROUP;
return !thd->killed && !is_error ? 0 : -1;
}
/**
Protected method to fetch the server_id and pseudo_thread_id from a
temporary table
@param table instance pointer of TABLE structure.
@return std::pair<uint, my_thread_id>
@note It is the caller's responsibility to make sure we call this
function only for temp tables.
*/
std::pair<uint, my_thread_id>
Mts_submode_logical_clock::get_server_and_thread_id(TABLE *table) {
DBUG_TRACE;
const char *extra_string = table->s->table_cache_key.str;
size_t extra_string_len = table->s->table_cache_key.length;
// the assert will fail when called with non-temporary tables.
assert(table->s->table_cache_key.length > 0);
std::pair<uint, my_thread_id> ret_pair = std::make_pair(
/* the last 8 bytes contain the server_id + pseudo_thread_id */
// fetch the first 4 of those bytes to get the server_id.
uint4korr(extra_string + extra_string_len - 8),
/* the next 4 bytes contain the pseudo_thread_id */
uint4korr(extra_string + extra_string_len - 4));
return ret_pair;
}
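/*
  A standalone sketch of the decode above: the temporary table's cache
  key ends with 8 bytes holding a 4-byte little-endian server_id
  followed by a 4-byte little-endian pseudo_thread_id, which is what
  the two uint4korr() calls read. The portable variant below uses
  memcpy and, unlike uint4korr, assumes a little-endian host;
  decode_key_tail is a hypothetical name.

    #include <cstdint>
    #include <cstring>

    struct Ids { std::uint32_t server_id, pseudo_thread_id; };

    Ids decode_key_tail(const char *key, std::size_t key_len) {
      Ids out;
      // read the last 8 bytes of the key: server_id, then thread id
      std::memcpy(&out.server_id, key + key_len - 8, 4);
      std::memcpy(&out.pseudo_thread_id, key + key_len - 4, 4);
      return out;
    }
*/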