/*
Copyright (c) 2023 Kristian Nielsen <knielsen@knielsen-hq.org>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
*/
#include "gtid_index.h"
#include "sql_const.h"
#include "log.h"
static const uchar GTID_INDEX_MAGIC[4]= {
254, 254, 12, 1,
};
Gtid_index_writer *Gtid_index_writer::hot_index_list= nullptr;
/* gtid_index_mutex is inited in MYSQL_LOG::init_pthread_objects(). */
mysql_mutex_t Gtid_index_writer::gtid_index_mutex;
Gtid_index_writer::Gtid_index_writer(const char *filename, uint32 offset,
rpl_binlog_state_base *binlog_state,
uint32 opt_page_size,
my_off_t opt_span_min)
: offset_min_threshold(opt_span_min),
nodes(nullptr), previous_offset(0),
max_level(0), index_file(-1),
error_state(false), file_header_written(false), in_hot_index_list(false)
{
uint32 count;
rpl_gtid *gtid_list;
page_size= opt_page_size;
pending_state.init();
if (alloc_level_if_missing(0))
{
give_error("Out of memory allocating node list");
return;
}
/*
Lock the index mutex at this point just before we create the new index
file on disk. From this point on, and until the index is fully written,
the reader will find us in the "hot index" list and will be able to read
from the index while it's still being constructed.
*/
lock_gtid_index();
build_index_filename(filename);
int create_flags= O_WRONLY|O_TRUNC|O_BINARY|O_EXCL;
index_file= mysql_file_create(key_file_gtid_index, index_file_name,
CREATE_MODE, create_flags, MYF(0));
if (index_file < 0 && my_errno == EEXIST)
{
/*
An old GTID index file should not normally remain, as we remove them as
part of RESET MASTER and PURGE BINARY LOGS. But if one is left behind,
e.g. because the user copied binlog files externally, delete it first.
*/
sql_print_information("Old GTID index file found '%s', deleting",
index_file_name);
my_errno= 0;
mysql_file_delete(key_file_gtid_index, index_file_name, MYF(0));
index_file= mysql_file_create(key_file_gtid_index, index_file_name,
CREATE_MODE, create_flags, MYF(0));
}
if (index_file < 0)
{
give_error("Failed to open new index file for writing");
goto err;
}
/*
Write out an initial index record, i.e. corresponding to the GTID_LIST
event / binlog state at the start of the binlog file.
*/
count= binlog_state->count_nolock();
gtid_list= gtid_list_buffer(count);
if (count > 0)
{
if (!gtid_list)
goto err;
binlog_state->get_gtid_list_nolock(gtid_list, count);
}
write_record(offset, gtid_list, count);
insert_in_hot_index();
err:
unlock_gtid_index();
}
Gtid_index_writer::~Gtid_index_writer()
{
if (in_hot_index_list)
{
lock_gtid_index();
close();
unlock_gtid_index();
}
if (index_file > 0)
{
/*
Should have been closed by call to Gtid_index_writer::close().
We can at least avoid leaking file descriptor.
*/
mysql_file_close(index_file, MYF(0));
}
if (nodes)
{
for (uint32 i= 0; i <= max_level; ++i)
delete nodes[i];
my_free(nodes);
}
/*
state.free() is not needed here, will be called from rpl_binlog_state_base
destructor.
*/
}
void
Gtid_index_writer::gtid_index_init()
{
mysql_mutex_init(key_gtid_index_lock, &gtid_index_mutex, MY_MUTEX_INIT_SLOW);
}
void
Gtid_index_writer::gtid_index_cleanup()
{
mysql_mutex_destroy(&gtid_index_mutex);
}
const Gtid_index_writer *
Gtid_index_writer::find_hot_index(const char *file_name)
{
mysql_mutex_assert_owner(&gtid_index_mutex);
for (const Gtid_index_writer *p= hot_index_list; p; p= p->next_hot_index)
{
if (0 == strcmp(file_name, p->index_file_name))
return p;
}
return nullptr;
}
void
Gtid_index_writer::insert_in_hot_index()
{
mysql_mutex_assert_owner(&gtid_index_mutex);
next_hot_index= hot_index_list;
hot_index_list= this;
in_hot_index_list= true;
}
void
Gtid_index_writer::remove_from_hot_index()
{
mysql_mutex_assert_owner(&gtid_index_mutex);
Gtid_index_writer **next_ptr_ptr= &hot_index_list;
for (;;)
{
Gtid_index_writer *p= *next_ptr_ptr;
if (!p)
break;
if (p == this)
{
*next_ptr_ptr= p->next_hot_index;
break;
}
next_ptr_ptr= &p->next_hot_index;
}
next_hot_index= nullptr;
in_hot_index_list= false;
}
void
Gtid_index_writer::process_gtid(uint32 offset, const rpl_gtid *gtid)
{
rpl_gtid *gtid_list;
uint32 gtid_count;
if (process_gtid_check_batch(offset, gtid, &gtid_list, &gtid_count))
return; // Error
if (gtid_list)
async_update(offset, gtid_list, gtid_count);
}
int
Gtid_index_writer::process_gtid_check_batch(uint32 offset, const rpl_gtid *gtid,
rpl_gtid **out_gtid_list,
uint32 *out_gtid_count)
{
uint32 count;
rpl_gtid *gtid_list;
mysql_mutex_assert_not_owner(&gtid_index_mutex);
if (unlikely(pending_state.update_nolock(gtid)))
{
give_error("Out of memory processing GTID for binlog GTID index");
return 1;
}
/*
Sparse index; we record only selected GTIDs, and scan the binlog forward
from there to find the exact spot.
*/
if (offset - previous_offset < offset_min_threshold)
{
*out_gtid_list= nullptr;
*out_gtid_count= 0;
return 0;
}
count= pending_state.count_nolock();
DBUG_ASSERT(count > 0 /* Since we just updated with a GTID. */);
gtid_list= (rpl_gtid *)
my_malloc(key_memory_binlog_gtid_index, count*sizeof(*gtid_list), MYF(0));
if (unlikely(!gtid_list))
{
give_error("Out of memory allocating GTID list for binlog GTID index");
return 1;
}
if (unlikely(pending_state.get_gtid_list_nolock(gtid_list, count)))
{
/* Shouldn't happen as we allocated the list with the correct length. */
DBUG_ASSERT(false);
give_error("Internal error allocating GTID list for binlog GTID index");
my_free(gtid_list);
return 1;
}
pending_state.reset_nolock();
previous_offset= offset;
*out_gtid_list= gtid_list;
*out_gtid_count= count;
return 0;
}
int
Gtid_index_writer::async_update(uint32 event_offset,
rpl_gtid *gtid_list,
uint32 gtid_count)
{
lock_gtid_index();
int res= write_record(event_offset, gtid_list, gtid_count);
unlock_gtid_index();
my_free(gtid_list);
return res;
}
void
Gtid_index_writer::close()
{
lock_gtid_index();
if (!error_state)
{
/*
Write out the remaining pending pages, and insert the final child pointer
in interior nodes.
*/
for (uint32 level= 0; ; ++level)
{
uint32 node_ptr= write_current_node(level, level==max_level);
nodes[level]->reset();
if (!node_ptr || level >= max_level)
break;
add_child_ptr(level+1, node_ptr);
}
}
remove_from_hot_index();
unlock_gtid_index();
if (!error_state)
{
if (mysql_file_sync(index_file, MYF(0)))
give_error("Error syncing index file to disk");
}
mysql_file_close(index_file, MYF(0));
index_file= (File)-1;
}
Gtid_index_base::Index_node_base::Index_node_base()
: first_page(nullptr), current_page(nullptr), current_ptr(nullptr)
{
}
Gtid_index_base::Index_node_base::~Index_node_base()
{
free_pages();
}
void
Gtid_index_base::Index_node_base::free_pages()
{
for (Node_page *p= first_page; p; )
{
Node_page *q= p->next;
my_free(p);
p= q;
}
}
void
Gtid_index_base::Index_node_base::reset()
{
free_pages();
first_page= current_page= nullptr;
}
Gtid_index_base::Gtid_index_base()
: gtid_buffer(nullptr), gtid_buffer_alloc(0)
{
}
Gtid_index_base::~Gtid_index_base()
{
if (gtid_buffer_alloc > 0)
my_free(gtid_buffer);
}
void
Gtid_index_base::make_gtid_index_file_name(char *out_name, size_t bufsize,
const char *base_filename)
{
char *p= strmake(out_name, base_filename, bufsize-1);
size_t remain= bufsize - (p - out_name);
strmake(p, ".idx", remain-1);
}
void
Gtid_index_base::build_index_filename(const char *filename)
{
make_gtid_index_file_name(index_file_name, sizeof(index_file_name), filename);
}
rpl_gtid *
Gtid_index_base::gtid_list_buffer(uint32 count)
{
if (gtid_buffer_alloc >= count)
return gtid_buffer;
rpl_gtid *new_buffer= (rpl_gtid *)
my_malloc(key_memory_binlog_gtid_index, count*sizeof(*new_buffer), MYF(0));
if (!new_buffer)
{
give_error("Out of memory allocating buffer for GTID list");
return NULL;
}
my_free(gtid_buffer);
gtid_buffer= new_buffer;
gtid_buffer_alloc= count;
return new_buffer;
}
Gtid_index_writer::Index_node::Index_node(uint32 level_)
: num_records(0), level(level_), force_spill_page(false)
{
state.init();
}
Gtid_index_writer::Index_node::~Index_node()
{
free_pages();
}
uint32
Gtid_index_writer::write_current_node(uint32 level, bool is_root)
{
Index_node *n= nodes[level];
uint32 node_pos= (uint32)mysql_file_tell(index_file, MYF(0));
for (Node_page *p= n->first_page; p ; p= p->next)
{
if (unlikely(is_root))
*(p->flag_ptr) |= PAGE_FLAG_ROOT;
if (likely(!p->next))
*(p->flag_ptr) |= PAGE_FLAG_LAST;
int4store(p->page + page_size - CHECKSUM_LEN,
my_checksum(0, p->page, page_size - CHECKSUM_LEN));
if (mysql_file_write(index_file, p->page, page_size, MYF(MY_NABP)))
{
give_error("Error writing index page");
return 0;
}
}
DBUG_ASSERT(node_pos % page_size == 0);
/* Page numbers are stored +1, so that zero can denote an invalid page pointer. */
return 1 + (node_pos / (uint32)page_size);
}
void
Gtid_index_writer::Index_node::reset()
{
Index_node_base::reset();
state.reset_nolock();
num_records= 0;
force_spill_page= false;
}
/*
Make sure there is requested space in the current page, by allocating a
new spill page if necessary.
*/
int
Gtid_index_writer::reserve_space(Index_node *n, size_t bytes)
{
DBUG_ASSERT(bytes <= page_size);
if (likely(n->current_page) &&
likely(n->current_ptr - n->current_page->page + bytes <=
(page_size - CHECKSUM_LEN)))
return 0;
/* Not enough room, allocate a spill page. */
Node_page *page= alloc_page();
n->force_spill_page= false;
if (!page)
return 1;
n->current_ptr=
init_header(page, n->level==0, !n->current_page);
if (n->current_page)
n->current_page->next= page;
else
n->first_page= page;
n->current_page= page;
return 0;
}
int
Gtid_index_writer::do_write_record(uint32 level,
uint32 event_offset,
const rpl_gtid *gtid_list,
uint32 gtid_count)
{
DBUG_ASSERT(level <= max_level);
Index_node *n= nodes[level];
if (reserve_space(n, 8))
return 1;
/* Store the count as +1, so that 0 can mean "no more records". */
int4store(n->current_ptr, gtid_count+1);
int4store(n->current_ptr+4, event_offset);
n->current_ptr+= 8;
for (uint32 i= 0; i < gtid_count; ++i)
{
if (reserve_space(n, 16))
return 1;
int4store(n->current_ptr, gtid_list[i].domain_id);
int4store(n->current_ptr+4, gtid_list[i].server_id);
int8store(n->current_ptr+8, gtid_list[i].seq_no);
n->current_ptr+= 16;
}
++n->num_records;
return 0;
}
/*
Add a child pointer to the current node on LEVEL.
The first page has node_ptr=1, so that a zero node_ptr can be used as a
no/invalid value (effectively node_ptr points to the end of the target
page, in units of pages).
Adding a child pointer must not spill to a new page; the code must ensure
that there is always room for the final child pointer in the current
non-leaf node.
*/
int
Gtid_index_writer::add_child_ptr(uint32 level, my_off_t node_offset)
{
DBUG_ASSERT(level <= max_level);
DBUG_ASSERT(node_offset > 0);
Index_node *n= nodes[level];
if (reserve_space(n, 4))
return 1;
DBUG_ASSERT(n->current_page);
DBUG_ASSERT((size_t)(n->current_ptr - n->current_page->page + 4) <=
page_size - CHECKSUM_LEN);
int4store(n->current_ptr, node_offset);
n->current_ptr+= 4;
return 0;
}
/*
Write one index record to the GTID index, flushing nodes and allocating
new nodes as necessary.
*/
int
Gtid_index_writer::write_record(uint32 event_offset,
const rpl_gtid *gtid_list,
uint32 gtid_count)
{
if (error_state)
return 1; /* Avoid continuing on a possibly corrupt state. */
uint32 level= 0;
/*
The most frequent case is when there is room in the current page for the
current position to be written, in which case we exit early in the first
iteration of the following loop.
In the general case, we move up through the path to the root, writing
lower-level node page to disk and adding child pointers in higher-level
nodes, until we reach a node that has room. This final node may be a
freshly allocated new root node in the few times when the height of the
tree increases.
*/
for (;;)
{
Index_node *n= nodes[level];
if (update_gtid_state(&n->state, gtid_list, gtid_count))
return give_error("Out of memory updating the local GTID state");
if (check_room(level, gtid_count))
{
/* There is room in the node, just add the index record. */
return do_write_record(level, event_offset, gtid_list, gtid_count);
}
/*
This node is full:
- First, write out this node to disk.
- Add a child pointer in the parent node (allocating one if needed).
- On level 0, allocate a new leaf node and add the index record there.
- On levels >0, skip the last index record when the node gets full
(B+-Tree has (k-1) keys for k child pointers).
- Loop to the parent node to add an index record there.
*/
uint32 node_ptr= write_current_node(level, false);
if (!node_ptr)
return 1;
if (alloc_level_if_missing(level+1) ||
add_child_ptr(level+1, node_ptr))
return 1;
uint32 new_count= n->state.count_nolock();
rpl_gtid *new_gtid_list= gtid_list_buffer(new_count);
if (new_count > 0 && !new_gtid_list)
return 1;
if (n->state.get_gtid_list_nolock(new_gtid_list, new_count))
return give_error("Internal error processing GTID state");
n->reset();
if (level == 0)
{
if (do_write_record(level, event_offset, new_gtid_list, new_count))
return 1;
}
else
{
/*
Allocate a page for the node. This is mostly so that a reader of the hot
index will not see NULL pointers, and we will need the page later anyway
to hold at least one child pointer to the level below.
*/
if (reserve_space(n, 4))
return 1;
}
gtid_list= new_gtid_list;
gtid_count= new_count;
++level;
}
// NotReached.
}
bool
Gtid_index_writer::check_room(uint32 level, uint32 gtid_count)
{
Index_node *n= nodes[level];
/* There's always room in an empty (to-be-allocated) page. */
if (!n->current_page || n->num_records == 0)
return true;
/*
Make sure we use at least 1/2 a page of room after the initial record,
setting a flag to allocate a spill page later if needed.
*/
size_t avail= page_size - CHECKSUM_LEN - (n->current_ptr - n->current_page->page);
if (n->num_records==1 && avail < page_size/2)
{
n->force_spill_page= true;
return true;
}
if (n->force_spill_page)
return true;
size_t needed= 8 + 16*gtid_count;
/* Non-leaf pages need extra 4 bytes for a child pointer. */
if (level > 0)
needed+= 4;
return needed <= avail;
}
int
Gtid_index_writer::alloc_level_if_missing(uint32 level)
{
if (likely(nodes))
{
if (likely(max_level >= level))
return 0;
DBUG_ASSERT(level == max_level+1); // Alloc one at a time
}
Index_node *node= new Index_node(level);
if (!node)
return give_error("Out of memory allocating new node");
Index_node **new_nodes= (Index_node **)
my_realloc(key_memory_binlog_gtid_index, nodes, (level+1)*sizeof(*nodes),
MYF(MY_ALLOW_ZERO_PTR|MY_ZEROFILL));
if (!new_nodes)
{
delete node;
return give_error("Out of memory allocating larger node list");
}
new_nodes[level]= node;
nodes= new_nodes;
max_level= level;
return 0;
}
/*
Initialize the start of a data page.
This is at the start of a page, except for the very first page where it
comes after the global file header.
Format:
0 flags.
1-3 unused padding/reserved.
The argument IS_FIRST denotes whether this is the first page of the node
(if false, it is a continuation page).
*/
uchar *
Gtid_index_writer::init_header(Node_page *page, bool is_leaf, bool is_first)
{
uchar *p= page->page;
bool is_file_header= !file_header_written;
if (unlikely(is_file_header))
{
memcpy(p, GTID_INDEX_MAGIC, sizeof(GTID_INDEX_MAGIC));
p+= sizeof(GTID_INDEX_MAGIC);
*p++= GTID_INDEX_VERSION_MAJOR;
*p++= GTID_INDEX_VERSION_MINOR;
/* Flags/padding currently unused. */
*p++= 0;
*p++= 0;
int4store(p, page_size);
p+= 4;
DBUG_ASSERT(p == page->page + GTID_INDEX_FILE_HEADER_SIZE);
file_header_written= true;
}
uchar flags= 0;
if (is_leaf)
flags|= PAGE_FLAG_IS_LEAF;
if (unlikely(!is_first))
flags|= PAGE_FLAG_IS_CONT;
page->flag_ptr= p;
*p++= flags;
/* Padding/reserved. */
p+= 3;
DBUG_ASSERT(p == page->page +
(is_file_header ? GTID_INDEX_FILE_HEADER_SIZE : 0) +
GTID_INDEX_PAGE_HEADER_SIZE);
DBUG_ASSERT((size_t)(p - page->page) < page_size - CHECKSUM_LEN);
return p;
}
int
Gtid_index_base::update_gtid_state(rpl_binlog_state_base *state,
const rpl_gtid *gtid_list, uint32 gtid_count)
{
for (uint32 i= 0; i < gtid_count; ++i)
if (state->update_nolock(&gtid_list[i]))
return 1;
return 0;
}
Gtid_index_base::Node_page *Gtid_index_base::alloc_page()
{
Node_page *new_node= (Node_page *)
my_malloc(key_memory_binlog_gtid_index,
sizeof(Node_page) + page_size,
MYF(MY_ZEROFILL));
if (!new_node)
give_error("Out of memory for allocating index page");
return new_node;
}
int Gtid_index_writer::give_error(const char *msg)
{
if (!error_state)
{
sql_print_information("Error during binlog GTID index creation, will "
"fall back to a slower sequential binlog scan. "
"Error is: %s", msg);
error_state= true;
}
return 1;
}
Gtid_index_reader::Gtid_index_reader()
: n(nullptr), index_file(-1),
file_open(false), index_valid(false), has_root_node(false),
version_major(0), version_minor(0)
{
current_state.init();
compare_state.init();
}
Gtid_index_reader::~Gtid_index_reader()
{
if (file_open)
mysql_file_close(index_file, MYF(0));
}
int
Gtid_index_reader::search_offset(uint32 in_offset,
uint32 *out_offset, uint32 *out_gtid_count)
{
in_search_offset= in_offset;
search_cmp_function= &Gtid_index_reader::search_cmp_offset;
return do_index_search(out_offset, out_gtid_count);
}
int
Gtid_index_reader::search_gtid_pos(slave_connection_state *in_gtid_pos,
uint32 *out_offset, uint32 *out_gtid_count)
{
in_search_gtid_pos= in_gtid_pos;
search_cmp_function= &Gtid_index_reader::search_cmp_gtid_pos;
int res= do_index_search(out_offset, out_gtid_count);
/* Let's not leave a dangling pointer to the caller's memory. */
in_search_gtid_pos= nullptr;
return res;
}
rpl_gtid *
Gtid_index_reader::search_gtid_list()
{
return gtid_buffer;
}
int
Gtid_index_reader::search_cmp_offset(uint32 offset,
rpl_binlog_state_base *state)
{
if (offset <= in_search_offset)
return 0;
else
return -1;
}
int
Gtid_index_reader::search_cmp_gtid_pos(uint32 offset,
rpl_binlog_state_base *state)
{
if (state->is_before_pos(in_search_gtid_pos))
return 0;
else
return -1;
}
int
Gtid_index_reader::next_page()
{
if (!read_page->next)
return 1;
read_page= read_page->next;
read_ptr= read_page->flag_ptr + 4;
return 0;
}
int
Gtid_index_reader::find_bytes(uint32 num_bytes)
{
if ((my_ptrdiff_t)(read_ptr - read_page->page + num_bytes) <=
(my_ptrdiff_t)(page_size - CHECKSUM_LEN))
return 0;
return next_page();
}
int
Gtid_index_reader::get_child_ptr(uint32 *out_child_ptr)
{
if (find_bytes(4))
return give_error("Corrupt index, short index node");
*out_child_ptr= (uint32)uint4korr(read_ptr);
read_ptr+= 4;
return 0;
}
/*
Read the start of an index record (count of GTIDs in the differential state
and offset).
Returns:
0 ok
1 EOF, no more data in this node
*/
int
Gtid_index_reader::get_offset_count(uint32 *out_offset, uint32 *out_gtid_count)
{
if (find_bytes(8))
return 1;
uint32 gtid_count= uint4korr(read_ptr);
if (gtid_count == 0)
{
/* 0 means invalid/no record (we store N+1 for N GTIDs in record). */
return 1;
}
*out_gtid_count= gtid_count - 1;
*out_offset= uint4korr(read_ptr + 4);
read_ptr+= 8;
return 0;
}
int
Gtid_index_reader::get_gtid_list(rpl_gtid *out_gtid_list, uint32 count)
{
for (uint32 i= 0; i < count; ++i)
{
if (find_bytes(16))
return give_error("Corrupt index, short index node");
out_gtid_list[i].domain_id= uint4korr(read_ptr);
out_gtid_list[i].server_id= uint4korr(read_ptr + 4);
out_gtid_list[i].seq_no= uint8korr(read_ptr + 8);
read_ptr+= 16;
}
return 0;
}
int
Gtid_index_reader::open_index_file(const char *binlog_filename)
{
close_index_file();
build_index_filename(binlog_filename);
if ((index_file= mysql_file_open(key_file_gtid_index, index_file_name,
O_RDONLY|O_BINARY, MYF(0))) < 0)
return 1; // No error for a missing index (e.g. after upgrade)
file_open= true;
if (read_file_header())
return 1;
return 0;
}
void
Gtid_index_reader::close_index_file()
{
if (!file_open)
return;
mysql_file_close(index_file, MYF(0));
file_open= false;
index_valid= false;
}
int
Gtid_index_reader::do_index_search(uint32 *out_offset, uint32 *out_gtid_count)
{
/* In cold index, we require a complete index with a valid root node. */
if (!has_root_node)
return -1;
return do_index_search_root(out_offset, out_gtid_count);
}
int
Gtid_index_reader::do_index_search_root(uint32 *out_offset,
uint32 *out_gtid_count)
{
current_state.reset_nolock();
compare_state.reset_nolock();
/*
These states will be initialized to the full state stored at the start of
the root node and then incrementally updated.
*/
bool current_state_updated= false;
if (read_root_node())
return -1;
for (;;)
{
if (*n->first_page->flag_ptr & PAGE_FLAG_IS_LEAF)
break;
if (compare_state.load_nolock(&current_state))
{
give_error("Out of memory allocating GTID list");
return -1;
}
uint32 child_ptr;
if (get_child_ptr(&child_ptr))
return -1;
/* Scan over the keys in the node to find the child pointer to follow */
for (;;)
{
uint32 offset, gtid_count;
int res= get_offset_count(&offset, &gtid_count);
if (res == 1) // EOF?
{
/* Follow the right-most child pointer. */
if (read_node(child_ptr))
return -1;
break;
}
rpl_gtid *gtid_list= gtid_list_buffer(gtid_count);
uint32 child2_ptr;
if ((gtid_count > 0 && !gtid_list) ||
get_gtid_list(gtid_list, gtid_count) ||
get_child_ptr(&child2_ptr))
return -1;
if (update_gtid_state(&compare_state, gtid_list, gtid_count))
return -1;
int cmp= (this->*search_cmp_function)(offset, &compare_state);
if (cmp < 0)
{
/* Follow the left child of this key. */
if (read_node(child_ptr))
return -1;
break;
}
/* Continue to scan the next key. */
update_gtid_state(&current_state, gtid_list, gtid_count);
current_state_updated= true;
current_offset= offset;
child_ptr= child2_ptr;
}
}
return do_index_search_leaf(current_state_updated,
out_offset, out_gtid_count);
}
int Gtid_index_reader::do_index_search_leaf(bool current_state_updated,
uint32 *out_offset,
uint32 *out_gtid_count)
{
uint32 offset, gtid_count;
int res= get_offset_count(&offset, &gtid_count);
if (res == 1)
{
DBUG_ASSERT(0);
give_error("Corrupt index; empty leaf node");
return -1;
}
rpl_gtid *gtid_list= gtid_list_buffer(gtid_count);
if ((gtid_count > 0 && !gtid_list) ||
get_gtid_list(gtid_list, gtid_count))
return -1;
/*
The first key is ignored (already included in the current state), unless
it is the very first state in the index.
*/
if (!current_state_updated)
update_gtid_state(&current_state, gtid_list, gtid_count);
current_offset= offset;
if (compare_state.load_nolock(&current_state))
{
give_error("Out of memory allocating GTID state");
return -1;
}
int cmp= (this->*search_cmp_function)(offset, &compare_state);
if (cmp < 0)
return 0; // Search position is before start of index.
/* Scan over the keys in the leaf node. */
for (;;)
{
uint32 offset, gtid_count;
int res= get_offset_count(&offset, &gtid_count);
if (res == 1) // EOF?
{
/* Reached end of leaf, last key is the one searched for. */
break;
}
gtid_list= gtid_list_buffer(gtid_count);
if ((gtid_count > 0 && !gtid_list) ||
get_gtid_list(gtid_list, gtid_count))
return -1;
if (update_gtid_state(&compare_state, gtid_list, gtid_count))
return -1;
cmp= (this->*search_cmp_function)(offset, &compare_state);
if (cmp < 0)
{
/* Next key is larger, so current state is the one searched for. */
break;
}
update_gtid_state(&current_state, gtid_list, gtid_count);
current_offset= offset;
}
*out_offset= current_offset;
*out_gtid_count= current_state.count_nolock();
/* Save the result in the shared gtid list buffer. */
if ((!(gtid_list= gtid_list_buffer(*out_gtid_count)) && *out_gtid_count > 0) ||
current_state.get_gtid_list_nolock(gtid_list, *out_gtid_count))
return -1;
return 1;
}
/*
Read the file header and check that it's valid and that the format is not
too new a version for us to be able to read it.
*/
int
Gtid_index_reader::read_file_header()
{
if (!file_open)
return 1;
uchar buf[GTID_INDEX_FILE_HEADER_SIZE + GTID_INDEX_PAGE_HEADER_SIZE];
if (MY_FILEPOS_ERROR == mysql_file_seek(index_file, 0, MY_SEEK_SET, MYF(0)) ||
mysql_file_read(index_file, buf,
GTID_INDEX_FILE_HEADER_SIZE + GTID_INDEX_PAGE_HEADER_SIZE,
MYF(MY_NABP)))
return give_error("Error reading page from index file");
if (memcmp(&buf[0], GTID_INDEX_MAGIC, sizeof(GTID_INDEX_MAGIC)))
return give_error("Corrupt index file, magic not found in header");
version_major= buf[4];
version_minor= buf[5];
/* We cannot safely read a major version we don't know about. */
if (version_major > GTID_INDEX_VERSION_MAJOR)
return give_error("Incompatible index file, version too high");
page_size= uint4korr(&buf[8]);
/* Verify checksum integrity of page_size and major/minor version. */
uint32 crc= my_checksum(0, buf, sizeof(buf));
uchar *buf3= (uchar *)
my_malloc(key_memory_binlog_gtid_index, page_size - sizeof(buf), MYF(0));
if (!buf3)
return give_error("Error allocating memory for index page");
int res= 0;
if (mysql_file_read(index_file, buf3, page_size - sizeof(buf), MYF(MY_NABP)))
res= give_error("Error reading page from index file");
else
{
crc= my_checksum(crc, buf3, page_size - sizeof(buf) - CHECKSUM_LEN);
if (crc != uint4korr(buf3 + page_size - sizeof(buf) - CHECKSUM_LEN))
res= give_error("Corrupt page, invalid checksum");
}
my_free(buf3);
if (res)
return res;
/*
Check that there is a valid root node at the end of the file.
If there is not, the index may be a "hot index" that is currently being
constructed. Or it was only partially written before a server crash and
was not recovered for some reason.
*/
uchar flags= buf[GTID_INDEX_PAGE_HEADER_SIZE];
constexpr uchar needed_flags= PAGE_FLAG_ROOT|PAGE_FLAG_LAST;
if ((flags & needed_flags) == needed_flags)
{
/* Special case: the index is a single page, which is the root node. */
has_root_node= true;
}
else
{
uchar buf2[GTID_INDEX_PAGE_HEADER_SIZE];
if (MY_FILEPOS_ERROR == mysql_file_seek(index_file, -(int32)page_size,
MY_SEEK_END, MYF(0)) ||
mysql_file_read(index_file, buf2, GTID_INDEX_PAGE_HEADER_SIZE,
MYF(MY_NABP)))
return give_error("Error reading root page from index file");
flags= buf2[0];
has_root_node= ((flags & needed_flags) == needed_flags);
/* No need to verify checksum here, will be done by read_root_node(). */
}
index_valid= true;
return 0;
}
int
Gtid_index_reader::verify_checksum(Gtid_index_base::Node_page *page)
{
  uint32 calc_checksum= my_checksum(0, page->page, page_size - CHECKSUM_LEN);
  uint32 read_checksum= uint4korr(page->page + page_size - CHECKSUM_LEN);
  if (calc_checksum != read_checksum)
    return give_error("Corrupt page, invalid checksum");
  return 0;
}

Gtid_index_base::Node_page *
Gtid_index_reader::alloc_and_read_page()
{
  Node_page *page= alloc_page();
  if (!page)
  {
    give_error("Error allocating memory for index page");
    return nullptr;
  }
  if (mysql_file_read(index_file, page->page, page_size, MYF(MY_NABP)))
  {
    my_free(page);
    give_error("Error reading page from index file");
    return nullptr;
  }
  if (verify_checksum(page))
  {
    my_free(page);
    return nullptr;
  }
  return page;
}

int
Gtid_index_reader::read_root_node()
{
  if (!index_valid || !has_root_node)
    return 1;

  cold_node.reset();
  n= &cold_node;

  /*
    Read pages one by one from the back of the file until we have a complete
    root node.
  */
  if (MY_FILEPOS_ERROR == mysql_file_seek(index_file, -(int32)page_size,
                                          MY_SEEK_END, MYF(0)))
    return give_error("Error seeking index file");
  for (;;)
  {
    Node_page *page= alloc_and_read_page();
    if (!page)
      return 1;
    if (mysql_file_tell(index_file, MYF(0)) == page_size)
      page->flag_ptr= &page->page[GTID_INDEX_FILE_HEADER_SIZE];
    else
      page->flag_ptr= &page->page[0];
    page->next= n->first_page;
    n->first_page= page;
    uchar flags= *page->flag_ptr;
    if (unlikely(!(flags & PAGE_FLAG_ROOT)))
      return give_error("Corrupt or truncated index, no root node found");
    if (!(flags & PAGE_FLAG_IS_CONT))
      break;                                  // Found start of root node
    if (MY_FILEPOS_ERROR == mysql_file_seek(index_file, -(int32)(2*page_size),
                                            MY_SEEK_CUR, MYF(0)))
      return give_error("Error seeking index file for multi-page root node");
  }
  read_page= n->first_page;
  read_ptr= read_page->flag_ptr + GTID_INDEX_PAGE_HEADER_SIZE;
  return 0;
}

int
Gtid_index_reader::read_node(uint32 page_ptr)
{
  DBUG_ASSERT(page_ptr != 0 /* No zero child pointers in on-disk pages. */);
  if (!index_valid || !page_ptr)
    return 1;
  return read_node_cold(page_ptr);
}

int
Gtid_index_reader::read_node_cold(uint32 page_ptr)
{
  if (MY_FILEPOS_ERROR == mysql_file_seek(index_file, (page_ptr-1)*page_size,
                                          MY_SEEK_SET, MYF(0)))
    return give_error("Error seeking index file");
  bool file_header= (page_ptr == 1);
  cold_node.reset();
  n= &cold_node;
  Node_page **next_ptr_ptr= &n->first_page;
  for (;;)
  {
    Node_page *page= alloc_and_read_page();
    if (!page)
      return 1;
    page->flag_ptr= &page->page[file_header ? GTID_INDEX_FILE_HEADER_SIZE : 0];
    file_header= false;
    /* Insert the page at the end of the list. */
    page->next= nullptr;
    *next_ptr_ptr= page;
    next_ptr_ptr= &page->next;
    uchar flags= *(page->flag_ptr);
    if (flags & PAGE_FLAG_LAST)
      break;
  }
  read_page= n->first_page;
  read_ptr= read_page->flag_ptr + GTID_INDEX_PAGE_HEADER_SIZE;
  return 0;
}

int Gtid_index_reader::give_error(const char *msg)
{
  sql_print_information("Error reading binlog GTID index, will "
                        "fall back to slower sequential binlog scan. "
                        "Error is: %s", msg);
  return 1;
}

Gtid_index_reader_hot::Gtid_index_reader_hot()
  : hot_writer(nullptr)
{
}

int
Gtid_index_reader_hot::get_child_ptr(uint32 *out_child_ptr)
{
  if (find_bytes(4))
  {
    /*
      If reading a hot index, EOF or a zero child ptr means the child pointer
      has not yet been written. A zero out_child_ptr makes read_node() read
      the hot node for the child.
    */
    if (hot_writer)
    {
      *out_child_ptr= 0;
      return 0;
    }
    return give_error("Corrupt index, short index node");
  }
  *out_child_ptr= (uint32)uint4korr(read_ptr);
  read_ptr+= 4;
  return 0;
}

int
Gtid_index_reader_hot::do_index_search(uint32 *out_offset,
                                       uint32 *out_gtid_count)
{
  /* Check for a "hot" index. */
  Gtid_index_writer::lock_gtid_index();
  hot_writer= Gtid_index_writer::find_hot_index(index_file_name);
  if (!hot_writer)
  {
    Gtid_index_writer::unlock_gtid_index();
    /*
      Check the index file header (and index end) again, in case it was
      hot when open_index_file() was called, but became cold in the meantime.
    */
    if (!has_root_node && Gtid_index_reader::read_file_header())
      return -1;
  }

  int res= do_index_search_root(out_offset, out_gtid_count);

  if (hot_writer)
  {
    hot_writer= nullptr;
    Gtid_index_writer::unlock_gtid_index();
  }
  return res;
}

int
Gtid_index_reader_hot::read_file_header()
{
  if (!file_open)
    return 1;

  Gtid_index_writer::lock_gtid_index();
  hot_writer= Gtid_index_writer::find_hot_index(index_file_name);
  if (!hot_writer)
    Gtid_index_writer::unlock_gtid_index();

  int res;
  if (hot_writer && hot_writer->max_level == 0)
  {
    /*
      No pages from the hot index have been written to disk yet; there is
      just a single incomplete node at level 0.
      We have to read the file header from the in-memory page.
    */
    uchar *p= hot_writer->nodes[0]->first_page->page;
    page_size= uint4korr(p + 8);
    has_root_node= false;
    index_valid= true;
    res= 0;
  }
  else
    res= Gtid_index_reader::read_file_header();

  if (hot_writer)
  {
    hot_writer= nullptr;
    Gtid_index_writer::unlock_gtid_index();
  }
  return res;
}

int
Gtid_index_reader_hot::read_root_node()
{
  if (!index_valid)
    return 1;
  if (hot_writer)
  {
    hot_level= hot_writer->max_level;
    return read_node_hot();
  }
  if (has_root_node)
  {
    return Gtid_index_reader::read_root_node();
  }
  return 1;
}

int
Gtid_index_reader_hot::read_node(uint32 page_ptr)
{
  if (!index_valid || (!page_ptr && !hot_writer))
    return 1;
  if (hot_writer)
  {
    if (!page_ptr)
    {
      /*
        The "hot" index is only partially written. Child pages that are not
        yet written are indicated by zero child pointers. Such child pages
        are found from the list of active nodes in the writer.
      */
      if (hot_level <= 0)
      {
        DBUG_ASSERT(0 /* Should be no child pointer to follow on leaf page. */);
        return give_error("Corrupt hot index (child pointer on leaf page)");
      }
      DBUG_ASSERT(n == hot_writer->nodes[hot_level]);
      --hot_level;
      return read_node_hot();
    }
    /*
      We started searching the "hot" index, but now we have reached a "cold"
      part of the index that is already fully written. So leave "hot index"
      mode and continue reading pages from the on-disk index from here.
    */
    hot_writer= nullptr;
    Gtid_index_writer::unlock_gtid_index();
  }
  return read_node_cold(page_ptr);
}

int
Gtid_index_reader_hot::read_node_hot()
{
  if (hot_writer->error_state)
    return give_error("Cannot access hot index");
  n= hot_writer->nodes[hot_level];
  read_page= n->first_page;
  /* The writer should allocate pages for all nodes. */
  DBUG_ASSERT(read_page != nullptr);
  if (!read_page)
    return give_error("Page not available in hot index");
  read_ptr= read_page->flag_ptr + GTID_INDEX_PAGE_HEADER_SIZE;
  return 0;
}