/****************************************************************
* *
* Copyright (c) 2006-2024 Fidelity National Information *
* Services, Inc. and/or its subsidiaries. All rights reserved. *
* *
* This source code contains the intellectual property *
* of its copyright holder(s), and is made available *
* under a license. If you do not know the terms of *
* the license, please stop and do not read further. *
* *
****************************************************************/
#if defined(__MVS__) && !defined(_ISOC99_SOURCE)
#define _ISOC99_SOURCE
#endif
#include "mdef.h"
#include "gtm_stdio.h" /* for FILE * in repl_comm.h */
#include "gtm_socket.h"
#include "gtm_netdb.h"
#include "gtm_inet.h"
#include <sys/time.h>
#include <errno.h>
#include "gtm_fcntl.h"
#include "gtm_unistd.h"
#include "gtm_stat.h"
#include "gtm_string.h"
#include "gdsroot.h"
#include "gdsblk.h"
#include "gtm_facility.h"
#include "fileinfo.h"
#include "gdsbt.h"
#include "gdsfhead.h"
#include "filestruct.h"
#include "repl_msg.h"
#include "gtmsource.h"
#include "repl_comm.h"
#include "repl_shutdcode.h"
#include "jnl.h"
#include "hashtab_mname.h" /* needed for muprec.h */
#include "hashtab_int4.h" /* needed for muprec.h */
#include "hashtab_int8.h" /* needed for muprec.h */
#include "buddy_list.h"
#include "muprec.h"
#include "repl_ctl.h"
#include "repl_errno.h"
#include "gtmio.h" /* for REPL_DPRINT* macros */
#include "repl_dbg.h"
#include "iosp.h"
#include "gt_timer.h"
#include "eintr_wrappers.h"
#include "repl_sp.h"
#include "repl_filter.h"
#include "repl_log.h"
#include "sgtm_putmsg.h"
#include "min_max.h"
#include "error.h"
#include "repl_instance.h"
#include "ftok_sems.h"
#include "gtmmsg.h"
#include "wbox_test_init.h"
#include "have_crit.h" /* needed for ZLIB_COMPRESS */
#include "deferred_signal_handler.h" /* needed for ZLIB_COMPRESS */
#include "gtm_zlib.h"
#include "replgbl.h"
#include "repl_inst_dump.h" /* for "repl_dump_histinfo" prototype */
#include "gtmdbgflags.h"
#include "lockconst.h"
#ifdef GTM_TLS
#include "gtm_repl.h"
#endif
#define SEND_REPL_LOGFILE_INFO(LOGFILE, LOGFILE_MSG) \
{ \
int len; \
\
len = repl_logfileinfo_get(LOGFILE, &LOGFILE_MSG, FALSE, gtmsource_log_fp); \
REPL_SEND_LOOP(gtmsource_sock_fd, &LOGFILE_MSG, len, REPL_POLL_NOWAIT) \
{ \
gtmsource_poll_actions(FALSE); \
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) \
|| (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)) \
return SS_NORMAL; \
} \
}
#define PROC_OPS_PRINT_MSG_LEN 1024
char print_msg_src[PROC_OPS_PRINT_MSG_LEN];
char print_msg_t[PROC_OPS_PRINT_MSG_LEN];
GBLREF boolean_t gtmsource_logstats;
GBLREF boolean_t gtmsource_pool2file_transition;
GBLREF boolean_t gtmsource_received_cmp2uncmp_msg;
GBLREF boolean_t secondary_side_std_null_coll;
GBLREF FILE *gtmsource_log_fp;
GBLREF gd_addr *gd_header;
GBLREF gtmsource_state_t gtmsource_state;
GBLREF int4 strm_index;
GBLREF int gtmsource_cmpmsgbufsiz;
GBLREF int gtmsource_filter;
GBLREF int gtmsource_log_fd;
GBLREF int gtmsource_msgbufsiz;
GBLREF int gtmsource_sock_fd;
GBLREF int repl_filter_bufsiz;
GBLREF int repl_max_send_buffsize, repl_max_recv_buffsize;
GBLREF jnlpool_addrs_ptr_t jnlpool;
GBLREF repl_conn_info_t *this_side, *remote_side;
GBLREF repl_ctl_element *repl_ctl_list;
GBLREF repl_msg_ptr_t gtmsource_cmpmsgp;
GBLREF repl_msg_ptr_t gtmsource_msgp;
GBLREF seq_num gtmsource_save_read_jnl_seqno;
GBLREF seq_num seq_num_zero;
GBLREF uchar_ptr_t repl_filter_buff;
GBLREF uint4 process_id;
GBLREF unsigned char *gtmsource_tcombuffp;
GBLREF unsigned char *gtmsource_tcombuff_start;
GBLREF gtmsource_options_t gtmsource_options;
error_def(ERR_REPLALERT);
error_def(ERR_REPL2OLD);
error_def(ERR_REPLCOMM);
error_def(ERR_REPLFTOKSEM);
error_def(ERR_REPLINSTNOHIST);
error_def(ERR_REPLINSTSECMTCH);
error_def(ERR_REPLNOXENDIAN);
error_def(ERR_SECNOTSUPPLEMENTARY);
error_def(ERR_STRMNUMMISMTCH1);
error_def(ERR_STRMNUMMISMTCH2);
error_def(ERR_TEXT);
error_def(ERR_TLSCONVSOCK);
error_def(ERR_TLSHANDSHAKE);
int gtmsource_est_conn()
{
char print_msg[PROC_OPS_PRINT_MSG_LEN], msg_str[1024], *errmsg;
int connection_attempts, max_heartbeat_wait, save_errno, comminit_retval, status;
int logging_period, logging_interval; /* logging period = soft_tries_period*logging_interval */
int alert_period, alert_period_ms, hardtries_count, hardtries_period;
int max_shutdown_wait, max_sleep, soft_tries_period, soft_tries_period_ms;
int secondary_addrlen;
time_t alert_time_start = 0, noconnection_time = 0; /* For logging the REPLALERT message */
struct timeval cur_time;
int cur_time_ms, diff_time_ms;
boolean_t throw_errors = TRUE;
sockaddr_ptr secondary_sa;
gtmsource_local_ptr_t gtmsource_local;
gtmsource_local = jnlpool->gtmsource_local;
# ifdef GTM_TLS
assert(!repl_tls.enabled); /* Set after REPL_NEED_TLS_INFO/REPL_TLS_INFO messages are exchanged. */
assert(REPLTLS_RENEG_STATE_NONE == repl_tls.renegotiate_state);
/* We either did not create a TLS/SSL aware socket or the SSL object off of the TLS/SSL aware socket should be NULL since
* we haven't yet connected to the receiver server. Assert that.
*/
assert((NULL == repl_tls.sock) || (NULL == repl_tls.sock->ssl));
/* Since this is a new connection, reset next renegotiation time. */
gtmsource_local->next_renegotiate_time = 0; /* Set to correct value after TLS/SSL handshake is established. */
# endif
assert(remote_side == &gtmsource_local->remote_side);
remote_side->proto_ver = REPL_PROTO_VER_UNINITIALIZED;
remote_side->endianness_known = FALSE;
/* Connect to the secondary - use hard tries, soft tries ... */
connection_attempts = 0;
comminit_retval = gtmsource_comm_init(throw_errors); /* set up gtmsource_local.secondary_ai */
max_shutdown_wait = GTMSOURCE_MAX_SHUTDOWN_WAITLOOP(gd_header);
soft_tries_period = gtmsource_local->connect_parms[GTMSOURCE_CONN_SOFT_TRIES_PERIOD];
hardtries_period = gtmsource_local->connect_parms[GTMSOURCE_CONN_HARD_TRIES_PERIOD];
hardtries_count = gtmsource_local->connect_parms[GTMSOURCE_CONN_HARD_TRIES_COUNT];
alert_period = gtmsource_local->connect_parms[GTMSOURCE_CONN_ALERT_PERIOD];
max_heartbeat_wait = gtmsource_local->connect_parms[GTMSOURCE_CONN_HEARTBEAT_MAX_WAIT];
max_sleep = DIVIDE_ROUND_UP(max_shutdown_wait, 2);
if (hardtries_period > (max_sleep * 1000))
{
hardtries_period = max_sleep * 1000;
repl_log(gtmsource_log_fp, TRUE, TRUE,
"Hard tries period cannot be more than half of maximum shutdown wait time of %d seconds. "
"Reducing hard tries period to %d milliseconds \n", max_shutdown_wait,
hardtries_period);
}
repl_log(gtmsource_log_fp, TRUE, TRUE, "Connect hard tries count = %d, Connect hard tries period = %d milliseconds\n",
hardtries_count, hardtries_period);
while (++connection_attempts < (hardtries_count + 1))
{
if (0 == noconnection_time)
time(&noconnection_time);
if ((FD_INVALID == gtmsource_sock_fd) || (0 != comminit_retval))
{ /* gtmsource_comm_init failed to initialize the socket. Report error and retry gtmsource_comm_init. */
repl_log(gtmsource_log_fp, TRUE, TRUE,
"%d hard connection attempt failed : Error with source server connection initiation\n",
connection_attempts);
} else
{
secondary_sa = (sockaddr_ptr)(&gtmsource_local->secondary_inet_addr);
secondary_addrlen = gtmsource_local->secondary_addrlen;
CONNECT_SOCKET(gtmsource_sock_fd, secondary_sa, secondary_addrlen, status);
if (0 == status)
{
noconnection_time = 0;
break;
}
repl_log(gtmsource_log_fp, TRUE, TRUE, "%d hard connection attempt failed : %s\n", connection_attempts,
STRERROR(errno));
repl_close(&gtmsource_sock_fd);
}
if (MILLISECS_IN_SEC > hardtries_period)
{
SHORT_SLEEP(hardtries_period);
}
else
LONG_SLEEP_MSEC(hardtries_period);
gtmsource_poll_actions(FALSE);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state))
return (SS_NORMAL);
/* Check for network resolution issues if the socket is invalid */
if (FD_INVALID == gtmsource_sock_fd)
comminit_retval = gtmsource_comm_init(throw_errors);
} /* end of while*/
gtmsource_poll_actions(FALSE);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state))
return (SS_NORMAL);
if (hardtries_count <= connection_attempts)
{ /* Initialize variables used to gradually reduce how often soft connection attempt failures are logged */
logging_period = soft_tries_period;
logging_interval = 1;
if (soft_tries_period > max_sleep)
{
soft_tries_period = max_sleep;
repl_log(gtmsource_log_fp, TRUE, TRUE,
"Soft tries period cannot be more than half of the maximum shutdown wait time of %d seconds. "
"Reducing soft tries period to %d seconds \n",
max_shutdown_wait, soft_tries_period);
}
repl_log(gtmsource_log_fp, TRUE, TRUE, "Soft tries period = %d seconds, REPLALERT message period = %d seconds\n",
soft_tries_period, alert_period);
connection_attempts = 0;
do
{
if ((FD_INVALID == gtmsource_sock_fd) || (0 != comminit_retval))
{ /* gtmsource_comm_init failed to initialize the socket. Report error and retry gtmsource_comm_init. */
if (0 == alert_time_start)
time(&alert_time_start);
if (0 == noconnection_time)
noconnection_time = alert_time_start;
if (0 == (connection_attempts + 1) % logging_interval)
{
repl_log(gtmsource_log_fp, TRUE, TRUE,
"%d soft connection attempt failed : Error with source server connection initiation\n",
connection_attempts + 1);
throw_errors = TRUE;
} else /* Decrease the frequency of showing the connection failure error messages */
throw_errors = FALSE;
} else
{
secondary_sa = (sockaddr_ptr)(&gtmsource_local->secondary_inet_addr);
secondary_addrlen = gtmsource_local->secondary_addrlen;
CONNECT_SOCKET(gtmsource_sock_fd, secondary_sa, secondary_addrlen, status);
if (0 == status)
{
noconnection_time = 0;
break;
}
if (0 == alert_time_start)
time(&alert_time_start);
if (0 == noconnection_time)
noconnection_time = alert_time_start;
save_errno = errno;
repl_close(&gtmsource_sock_fd);
if (0 == (connection_attempts + 1) % logging_interval)
{
repl_log(gtmsource_log_fp, TRUE, TRUE, "%d soft connection attempt failed : %s\n",
connection_attempts + 1, STRERROR(save_errno));
throw_errors = TRUE;
} else /* Decrease the frequency of showing the connection failure error messages */
throw_errors = FALSE;
}
/* Sometimes we undersleep on AIX. We adopt the solution from op_hang.c: add an additional 2ms to make
* sure we get into the next second */
soft_tries_period_ms = soft_tries_period * 1000;
#ifdef AIX
soft_tries_period_ms += 2;
#endif
LONG_SLEEP_MSEC(soft_tries_period_ms);
gtmsource_poll_actions(FALSE);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state))
return (SS_NORMAL);
/* Check for network resolution issues if the socket is invalid */
if (FD_INVALID == gtmsource_sock_fd)
comminit_retval = gtmsource_comm_init(throw_errors);
connection_attempts++;
alert_period_ms = alert_period * 1000;
gettimeofday(&cur_time, NULL);
cur_time_ms = (cur_time.tv_sec * 1000) + (cur_time.tv_usec / 1000);
diff_time_ms = cur_time_ms - (alert_time_start * 1000);
if ((0 != alert_period) && (alert_period_ms - ((5 <= soft_tries_period) ? 500 : 0) <= diff_time_ms))
{ /* Log the REPLALERT message if we are close to, at, or after our fire time */
sgtm_putmsg(print_msg, PROC_OPS_PRINT_MSG_LEN, VARLSTCNT(4) ERR_REPLALERT, 2,
jnlpool->gtmsource_local->secondary_instname,
(UINTPTR_T)difftime(time(NULL), noconnection_time));
repl_log(gtmsource_log_fp, TRUE, TRUE, print_msg);
alert_time_start = 0;
}
if (logging_period <= REPL_MAX_LOG_PERIOD)
{ /* logging_period can reach at most 2*REPL_MAX_LOG_PERIOD */
if (0 == connection_attempts % logging_interval)
{ /* Double the logging period after every logging attempt*/
logging_interval = logging_interval << 1;
logging_period = logging_period << 1;
}
}
} while (TRUE);
}
if (0 != (status = get_send_sock_buff_size(gtmsource_sock_fd, &repl_max_send_buffsize)))
{
SNPRINTF(msg_str, SIZEOF(msg_str), "Error getting socket send buffsize : %s", STRERROR(status));
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2, LEN_AND_STR(msg_str));
}
if (0 != (status = get_recv_sock_buff_size(gtmsource_sock_fd, &repl_max_recv_buffsize)))
{
SNPRINTF(msg_str, SIZEOF(msg_str), "Error getting socket recv buffsize : %s", STRERROR(status));
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2, LEN_AND_STR(msg_str));
}
repl_log(gtmsource_log_fp, TRUE, TRUE, "Connected to secondary, using TCP send buffer size %d receive buffer size %d\n",
repl_max_send_buffsize, repl_max_recv_buffsize);
repl_log_conn_info(gtmsource_sock_fd, gtmsource_log_fp, FALSE);
/* re-determine compression level on the replication pipe after every connection establishment */
gtmsource_local->repl_zlib_cmp_level = repl_zlib_cmp_level = ZLIB_CMPLVL_NONE;
/* reset any CMP2UNCMP messages received in prior connections. Once a connection encounters a REPL_CMP2UNCMP message
* all further replication on that connection will be uncompressed.
*/
gtmsource_received_cmp2uncmp_msg = FALSE;
return (SS_NORMAL);
}
int gtmsource_alloc_tcombuff(void)
{ /* Allocate buffer for TCOM, ZTCOM records */
if (NULL == gtmsource_tcombuff_start)
{
assert(NULL == gtmsource_tcombuff_start);
gtmsource_tcombuff_start = (unsigned char *)malloc(gd_header->n_regions * TCOM_RECLEN);
}
return (SS_NORMAL);
}
void gtmsource_free_tcombuff(void)
{
if (NULL != gtmsource_tcombuff_start)
{
free(gtmsource_tcombuff_start);
gtmsource_tcombuff_start = NULL;
}
return;
}
int gtmsource_alloc_filter_buff(int bufsiz)
{
unsigned char *old_filter_buff;
if ((NO_FILTER != gtmsource_filter) && (bufsiz > repl_filter_bufsiz))
{
REPL_DPRINT3("Expanding filter buff from %d to %d\n", repl_filter_bufsiz, bufsiz);
old_filter_buff = repl_filter_buff;
repl_filter_buff = (unsigned char *)malloc(bufsiz);
if (NULL != old_filter_buff)
{
assert(NULL != old_filter_buff);
memcpy(repl_filter_buff, old_filter_buff, repl_filter_bufsiz);
free(old_filter_buff);
}
repl_filter_bufsiz = bufsiz;
}
return (SS_NORMAL);
}
void gtmsource_free_filter_buff(void)
{
if (NULL != repl_filter_buff)
{
assert(NULL != repl_filter_buff);
free(repl_filter_buff);
repl_filter_buff = NULL;
repl_filter_bufsiz = 0;
}
}
int gtmsource_alloc_msgbuff(int maxbuffsize, boolean_t discard_oldbuff)
{ /* Allocate message buffer */
repl_msg_ptr_t oldmsgp;
assert(MIN_REPL_MSGLEN < maxbuffsize);
if ((maxbuffsize > gtmsource_msgbufsiz) || (NULL == gtmsource_msgp))
{
REPL_DPRINT3("Expanding message buff from %d to %d\n", gtmsource_msgbufsiz, maxbuffsize);
oldmsgp = gtmsource_msgp;
gtmsource_msgp = (repl_msg_ptr_t)malloc(maxbuffsize);
if (NULL != oldmsgp)
{ /* Copy existing data */
if (!discard_oldbuff)
memcpy((unsigned char *)gtmsource_msgp, (unsigned char *)oldmsgp, gtmsource_msgbufsiz);
free(oldmsgp);
}
gtmsource_msgbufsiz = maxbuffsize;
if (ZLIB_CMPLVL_NONE != gtm_zlib_cmp_level)
{ /* Compression is enabled. Allocate parallel buffers to hold compressed journal records.
* Allocate extra space just in case compression actually expands the data (needed only in rare cases).
*/
oldmsgp = gtmsource_cmpmsgp;
gtmsource_cmpmsgp = (repl_msg_ptr_t)malloc(maxbuffsize * MAX_CMP_EXPAND_FACTOR);
if (NULL != oldmsgp)
free(oldmsgp);
gtmsource_cmpmsgbufsiz = (maxbuffsize * MAX_CMP_EXPAND_FACTOR);
}
gtmsource_alloc_filter_buff(gtmsource_msgbufsiz);
}
return (SS_NORMAL);
}
void gtmsource_free_msgbuff(void)
{
if (NULL != gtmsource_msgp)
{
free(gtmsource_msgp);
gtmsource_msgp = NULL;
gtmsource_msgbufsiz = 0;
if (ZLIB_CMPLVL_NONE != gtm_zlib_cmp_level)
{ /* Compression is enabled. Free up compression buffer as well. */
assert(NULL != gtmsource_cmpmsgp);
free(gtmsource_cmpmsgp);
gtmsource_cmpmsgp = NULL;
gtmsource_cmpmsgbufsiz = 0;
}
}
}
/* Receive starting jnl_seqno for (re)starting transmission */
int gtmsource_recv_restart(seq_num *recvd_jnl_seqno, int *msg_type, int *start_flags)
{
boolean_t msg_is_cross_endian, log_waitmsg;
fd_set input_fds;
repl_msg_t msg;
repl_logfile_info_msg_t logfile_msg;
unsigned char *msg_ptr; /* needed for REPL_{SEND,RECV}_LOOP */
int tosend_len, sent_len, sent_this_iter; /* needed for REPL_SEND_LOOP */
int torecv_len, recvd_len, recvd_this_iter; /* needed for REPL_RECV_LOOP */
int status, poll_dir; /* needed for REPL_{SEND,RECV}_LOOP */
uint4 remaining_len, len;
unsigned char seq_num_str[32], *seq_num_ptr, *buffp;
repl_msg_t xoff_ack;
# ifdef DEBUG
boolean_t remote_side_endianness_known;
# endif
DCL_THREADGBL_ACCESS;
SETUP_THREADGBL_ACCESS;
status = SS_NORMAL;
assert(remote_side == &jnlpool->gtmsource_local->remote_side);
DEBUG_ONLY(*msg_type = -1);
for (log_waitmsg = TRUE; SS_NORMAL == status; )
{
if (log_waitmsg)
repl_log(gtmsource_log_fp, TRUE, TRUE, "Waiting for REPL_START_JNL_SEQNO or REPL_FETCH_RESYNC message\n");
REPL_RECV_LOOP(gtmsource_sock_fd, &msg, MIN_REPL_MSGLEN, REPL_POLL_WAIT)
{
gtmsource_poll_actions(FALSE);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state))
return (SS_NORMAL);
}
DEBUG_ONLY(remote_side_endianness_known = remote_side->endianness_known);
if (SS_NORMAL != status)
break;
if (REPL_LOGFILE_INFO == msg.type) /* No need to endian convert as the receiver converts this to our native fmt */
{ /* We received a REPL_START_JNL_SEQNO/REPL_FETCH_RESYNC and are coming through the loop again
* to receive REPL_LOGFILE_INFO. At this point, we should have already established the endianness
* of the remote side and even if the remote side is of different endianness, we are going to interpret the
* message without endian conversion because the Receiver Server, from REPL_PROTO_VER_SUPPLEMENTARY
* onwards, always endian converts the message intended for the Source Server
*/
assert(remote_side->endianness_known);
assert(REPL_PROTO_VER_REMOTE_LOGPATH <= remote_side->proto_ver);
assert(-1 != *msg_type);
buffp = (unsigned char *)&logfile_msg;
/* First copy what we already received */
memcpy(buffp, &msg, MIN_REPL_MSGLEN);
assert((logfile_msg.fullpath_len > 0) && (logfile_msg.fullpath_len < REPL_LOGFILE_PATH_MAX));
/* Now receive the rest of the message */
buffp += MIN_REPL_MSGLEN;
remaining_len = msg.len - MIN_REPL_MSGLEN;
REPL_RECV_LOOP(gtmsource_sock_fd, buffp, remaining_len, REPL_POLL_WAIT)
{
gtmsource_poll_actions(FALSE);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state))
return SS_NORMAL;
}
if (SS_NORMAL != status)
return status;
assert(REPL_PROTO_VER_REMOTE_LOGPATH <= logfile_msg.proto_ver);
assert(logfile_msg.proto_ver == remote_side->proto_ver);
assert('\0' == logfile_msg.fullpath[logfile_msg.fullpath_len - 1]);
if (REPL_FETCH_RESYNC == *msg_type)
{
repl_log(gtmsource_log_fp, TRUE, TRUE, "Remote side rollback path is %s; Rollback PID = %d\n",
logfile_msg.fullpath, logfile_msg.pid);
}
else
{
assert(REPL_START_JNL_SEQNO == *msg_type);
repl_log(gtmsource_log_fp, TRUE, TRUE, "Remote side receiver log file path is %s; "
"Receiver Server PID = %d\n", logfile_msg.fullpath, logfile_msg.pid);
}
/* Now that we've received REPL_LOGFILE_INFO message from the other side, handshake is complete. */
return SS_NORMAL;
}
/* If endianness of other side is not yet known, determine that now by seeing if the msg.len is
* greater than expected. If it is, convert it and see if it is now what we expect. If it is,
* then the other system is of opposite endianness. Note: We would normally use msg.type since
* it is effectively an enum and we control by adding new messages. But, REPL_START_JNL_SEQNO
* is lucky number zero which means it is identical on systems of either endianness.
*
* If endianness of other side is not yet known, determine that from the message length as we
* expect it to be MIN_REPL_MSGLEN. There is one exception though. If a pre-V55000 receiver sends
* a REPL_XOFF_ACK_ME message, it could send it in the receiver's native-endian or cross-endian
* format (depending on how its global variable "src_node_same_endianness" is initialized). This
* bug in the receiver server logic is fixed from V55000 onwards (proto_ver is REPL_PROTO_VER_SUPPLEMENTARY).
* Therefore, in this case, we cannot use the endianness of the REPL_XOFF_ACK_ME message to determine
* the endianness of the connection. In this case, wait for the next non-REPL_XOFF_ACK_ME message
* to determine the connection endianness. Handle this exception case correctly.
*
* If on the other hand, we know the endianness of the other side, we still cannot guarantee which
* endianness a REPL_XOFF_ACK_ME message is sent in (in pre-V55000 versions for example in V53004A where
* it is sent in receiver native endian format whereas in V54002B it is sent in source native
* endian format). So better be safe on the source side and handle those cases like we did when
* we did not know the endianness of the remote side.
*
* The below check works as all messages we expect at this point have a fixed length of MIN_REPL_MSGLEN.
*/
msg_is_cross_endian = (((unsigned)MIN_REPL_MSGLEN < (unsigned)msg.len)
&& ((unsigned)MIN_REPL_MSGLEN == GTM_BYTESWAP_32((unsigned)msg.len)));
if (msg_is_cross_endian)
{
msg.type = GTM_BYTESWAP_32(msg.type);
msg.len = GTM_BYTESWAP_32(msg.len);
}
assert(msg.type == REPL_START_JNL_SEQNO || msg.type == REPL_FETCH_RESYNC || msg.type == REPL_XOFF_ACK_ME);
assert(MIN_REPL_MSGLEN == msg.len);
/* If we don't yet know the endianness of the other side and the input message is not a REPL_XOFF_ACK_ME
* we can decide the endianness of the receiver side by the endianness of the input message.
* REPL_XOFF_ACK_ME is an exception due to its handling by pre-V55000 versions (described in comments above).
*/
if (!remote_side->endianness_known && (REPL_XOFF_ACK_ME != msg.type))
{
remote_side->endianness_known = TRUE;
remote_side->cross_endian = msg_is_cross_endian;
if (remote_side->cross_endian)
repl_log(gtmsource_log_fp, TRUE, TRUE, "Source and Receiver sides have opposite "
"endianness\n");
else
repl_log(gtmsource_log_fp, TRUE, TRUE, "Source and Receiver sides have same endianness\n");
}
/* We only expect REPL_START_JNL_SEQNO, REPL_LOGFILE_INFO and REPL_XOFF_ACK_ME messages to be sent once the
* endianness of the remote side has been determined. We don't expect the REPL_FETCH_RESYNC message to be
* ever sent in the middle of a handshake (i.e. after the remote side endianness has been determined).
* Assert that. The logic that sets "msg_is_cross_endian" relies on this. If this assert fails, the logic
* has to change.
*/
assert((REPL_FETCH_RESYNC != msg.type) || !remote_side_endianness_known);
*msg_type = msg.type;
memcpy((uchar_ptr_t)recvd_jnl_seqno, (uchar_ptr_t)&msg.msg[0], SIZEOF(seq_num));
if (REPL_START_JNL_SEQNO == msg.type)
{
int lcl_start_flags;
if (msg_is_cross_endian)
*recvd_jnl_seqno = GTM_BYTESWAP_64(*recvd_jnl_seqno);
repl_log(gtmsource_log_fp, TRUE, TRUE, "Received REPL_START_JNL_SEQNO message with SEQNO "
"%llu [0x%llx]\n", INT8_PRINT(*recvd_jnl_seqno), INT8_PRINT(*recvd_jnl_seqno));
lcl_start_flags = msg_is_cross_endian ? GTM_BYTESWAP_32(((repl_start_msg_ptr_t)&msg)->start_flags)
: ((repl_start_msg_ptr_t)&msg)->start_flags;
assert(!msg_is_cross_endian || (NODE_ENDIANNESS != ((repl_start_msg_ptr_t)&msg)->node_endianness));
if (lcl_start_flags & START_FLAG_STOPSRCFILTER)
{
repl_log(gtmsource_log_fp, TRUE, TRUE,
"Start JNL_SEQNO msg tagged with STOP SOURCE FILTER\n");
if (gtmsource_filter & EXTERNAL_FILTER)
{
repl_stop_filter();
gtmsource_filter &= ~EXTERNAL_FILTER;
} else
repl_log(gtmsource_log_fp, TRUE, TRUE,
"Filter is not active, ignoring STOP SOURCE FILTER msg\n");
}
/* Determine the protocol version of the receiver side. That information is encoded in the
* "proto_ver" field of the message from V51 onwards but to differentiate V50 vs V51 we need
* to check if the START_FLAG_VERSION_INFO bit is set in start_flags. If not the protocol is V50.
* V51 implies support for multi-site while V50 implies dual-site configuration only.
*/
if (lcl_start_flags & START_FLAG_VERSION_INFO)
{
assert(REPL_PROTO_VER_DUALSITE != ((repl_start_msg_ptr_t)&msg)->proto_ver);
remote_side->is_supplementary = ((repl_start_msg_ptr_t)&msg)->is_supplementary;
remote_side->proto_ver = ((repl_start_msg_ptr_t)&msg)->proto_ver;
} else
{ /* Issue REPL2OLD error because receiver is dual-site */
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_REPL2OLD, 4, LEN_AND_STR(UNKNOWN_INSTNAME),
LEN_AND_STR(jnlpool->repl_inst_filehdr->inst_info.this_instname));
}
assert(lcl_start_flags & START_FLAG_HASINFO); /* V4.2+ versions have jnl ver in the start msg */
remote_side->jnl_ver = ((repl_start_msg_ptr_t)&msg)->jnl_ver;
REPL_DPRINT3("Local jnl ver is octal %o, remote jnl ver is octal %o\n",
this_side->jnl_ver, remote_side->jnl_ver);
repl_check_jnlver_compat(!remote_side->cross_endian);
assert(remote_side->jnl_ver > V15_JNL_VER || 0 == (lcl_start_flags & START_FLAG_COLL_M));
if (remote_side->jnl_ver <= V15_JNL_VER)
lcl_start_flags &= ~START_FLAG_COLL_M; /* zap it for pro, just in case */
remote_side->is_std_null_coll = (lcl_start_flags & START_FLAG_COLL_M) ? TRUE : FALSE;
assert((remote_side->jnl_ver >= V19_JNL_VER) || (0 == (lcl_start_flags & START_FLAG_TRIGGER_SUPPORT)));
if (remote_side->jnl_ver < V19_JNL_VER)
lcl_start_flags &= ~START_FLAG_TRIGGER_SUPPORT; /* zap it for pro, just in case */
remote_side->trigger_supported = (lcl_start_flags & START_FLAG_TRIGGER_SUPPORT) ? TRUE : FALSE;
# ifdef GTM_TRIGGER
if (!remote_side->trigger_supported)
repl_log(gtmsource_log_fp, TRUE, TRUE, "Warning : Secondary does not support GT.M "
"database triggers. #t updates on primary will not be replicated\n");
# endif
/* remote_side->null_subs_xform is initialized later in function "gtmsource_process" */
(TREF(replgbl)).trig_replic_warning_issued = FALSE;
remote_side->tls_requested = (lcl_start_flags & START_FLAG_ENABLE_TLS) ? TRUE : FALSE;
*start_flags = lcl_start_flags;
if (REPL_PROTO_VER_REMOTE_LOGPATH > remote_side->proto_ver)
return SS_NORMAL; /* Remote side doesn't support REPL_LOGFILE_INFO message */
SEND_REPL_LOGFILE_INFO(jnlpool->gtmsource_local->log_file, logfile_msg);
log_waitmsg = FALSE;
} else if (REPL_FETCH_RESYNC == msg.type)
{ /* Determine the protocol version of the receiver side.
* Take care to set the flush parameter in repl_log calls below to FALSE until at least the
* first message gets sent back. This is so the fetchresync rollback on the other side does
* not timeout before receiving a response. */
remote_side->proto_ver = (RECAST(repl_resync_msg_ptr_t)&msg)->proto_ver;
remote_side->is_supplementary = (RECAST(repl_resync_msg_ptr_t)&msg)->is_supplementary;
/* The following fields don't need to be initialized since they are needed (for internal filter
* transformations) only if we send journal records across. REPL_FETCH_RESYNC causes only
* protocol messages to be exchanged (no journal records).
* remote_side->jnl_ver = ...
* remote_side->is_std_null_coll = ...
* remote_side->trigger_supported = ...
* remote_side->null_subs_xform = ...
*/
if (msg_is_cross_endian)
*recvd_jnl_seqno = GTM_BYTESWAP_64(*recvd_jnl_seqno);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Received REPL_FETCH_RESYNC message with SEQNO "
"%llu [0x%llx]\n", INT8_PRINT(*recvd_jnl_seqno), INT8_PRINT(*recvd_jnl_seqno));
if (REPL_PROTO_VER_REMOTE_LOGPATH > remote_side->proto_ver)
return SS_NORMAL; /* Remote side doesn't support REPL_LOGFILE_INFO message */
SEND_REPL_LOGFILE_INFO(jnlpool->gtmsource_local->log_file, logfile_msg);
log_waitmsg = FALSE;
} else if (REPL_XOFF_ACK_ME == msg.type)
{
repl_log(gtmsource_log_fp, TRUE, FALSE, "Received REPL_XOFF_ACK_ME message. "
"Possible crash/shutdown of update process\n");
/* Send XOFF_ACK */
xoff_ack.type = REPL_XOFF_ACK;
if (msg_is_cross_endian)
*recvd_jnl_seqno = GTM_BYTESWAP_64(*recvd_jnl_seqno);
memcpy((uchar_ptr_t)&xoff_ack.msg[0], (uchar_ptr_t)recvd_jnl_seqno, SIZEOF(seq_num));
xoff_ack.len = MIN_REPL_MSGLEN;
repl_log(gtmsource_log_fp, TRUE, TRUE, "Sending REPL_XOFF_ACK message\n");
REPL_SEND_LOOP(gtmsource_sock_fd, &xoff_ack, xoff_ack.len, REPL_POLL_NOWAIT)
{
gtmsource_poll_actions(FALSE);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state))
return (SS_NORMAL);
}
log_waitmsg = TRUE; /* Wait for REPL_START_JNL_SEQNO or REPL_FETCH_RESYNC */
} else
{ /* If unknown message is received, close connection. Caller will reopen the same. */
repl_log(gtmsource_log_fp, TRUE, TRUE, "Received UNKNOWN message (type = %d). "
"Closing connection.\n", msg.type);
assert(FALSE);
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = jnlpool->gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
return (SS_NORMAL);
}
}
return (status);
}
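/* Illustrative sketch only, not used by the replication code: the cross-endian test described in the function
 * above works because every message expected at that point has one fixed length. If the received length field
 * is larger than that fixed length but equals it once byte-swapped, the sender must have the opposite endianness.
 * The names "sketch_bswap32" and "sketch_is_cross_endian" are hypothetical and exist only for this illustration;
 * the real code uses GTM_BYTESWAP_32 and MIN_REPL_MSGLEN.
 */
#include <stdint.h>

static uint32_t sketch_bswap32(uint32_t v)
{	/* Reverse the order of the four bytes in "v" */
	return ((v & 0x000000FFu) << 24) | ((v & 0x0000FF00u) << 8)
		| ((v & 0x00FF0000u) >> 8) | ((v & 0xFF000000u) >> 24);
}

static int sketch_is_cross_endian(uint32_t fixed_len, uint32_t wire_len)
{	/* A same-endian sender yields wire_len == fixed_len, so the first comparison fails and 0 is returned */
	return (fixed_len < wire_len) && (fixed_len == sketch_bswap32(wire_len));
}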
int gtmsource_srch_restart(seq_num recvd_jnl_seqno, int recvd_start_flags)
{
seq_num cur_read_jnl_seqno;
qw_off_t cur_read_addr;
gtm_uint64_t cur_read, prev_read, prev_tr_size, jnlpool_size;
int save_lastwrite_len;
unsigned char seq_num_str[32], *seq_num_ptr;
jnlpool_ctl_ptr_t jctl;
gtmsource_local_ptr_t gtmsource_local;
gd_region *reg, *region_top;
sgmnt_addrs *csa, *repl_csa;
jctl = jnlpool->jnlpool_ctl;
jnlpool_size = jctl->jnlpool_size;
gtmsource_local = jnlpool->gtmsource_local;
assert(recvd_jnl_seqno <= jctl->jnl_seqno);
cur_read_jnl_seqno = gtmsource_local->read_jnl_seqno;
if (recvd_jnl_seqno > cur_read_jnl_seqno)
{ /* The secondary is requesting a seqno higher than what we last remember having sent, yet it is in sync with us
* up to seqno "recvd_jnl_seqno", as otherwise the caller would have determined it is out of sync and not even called
* us. To illustrate how this could happen, consider an INSTA->INSTB replication and an INSTA->INSTC
* replication going on. Let's say INSTA's journal sequence number is at 100, INSTB is at 60 and INSTC is at 30.
* This means the last sequence number sent by INSTA to INSTB is 60 and to INSTC is 30. Now, let's say INSTA is shut down
* and INSTB comes up as the primary to INSTC and starts replicating the 30 updates, thereby bringing both INSTB and
* INSTC sequence numbers to 60. Now, if INSTA comes back up again as primary against INSTC, we will have a case where
* gtmsource_local->read_jnl_seqno is 30, but recvd_jnl_seqno is 60. This means that we are going to bump
* "gtmsource_local->read_jnl_seqno" up to the received seqno (in the later call to "gtmsource_flush_fh") without
* knowing how many bytes of transaction data that meant (to correspondingly bump up "gtmsource_local->read_addr").
* The only case where we don't care to maintain "read_addr" is if we are in READ_FILE mode AND the current
* read_jnl_seqno and the received seqno are both less than or equal to "gtmsource_save_read_jnl_seqno". Except
* for that case, we need to reset "gtmsource_save_read_jnl_seqno" to correspond to the current jnl seqno.
*/
if ((READ_FILE != gtmsource_local->read_state) || (recvd_jnl_seqno > gtmsource_save_read_jnl_seqno))
{
grab_lock(jnlpool->jnlpool_dummy_reg, TRUE, ASSERT_NO_ONLINE_ROLLBACK);
gtmsource_local->read_state = READ_FILE;
gtmsource_save_read_jnl_seqno = jctl->jnl_seqno;
GTMSOURCE_SET_READ_ADDR(gtmsource_local, jnlpool);
rel_lock(jnlpool->jnlpool_dummy_reg);
}
} else if (READ_POOL == gtmsource_local->read_state)
{ /* Follow the back-chain in the Journal Pool to find whether or not recvd_jnl_seqno is in the Pool */
/* The implementation for searching through the back chain has several inefficiencies. We are deferring addressing
* them to keep code changes for V4.4-003 to a minimum. We should address these in an upcoming release.
* Vinaya 2003, Oct 02 */
QWASSIGN(cur_read_addr, gtmsource_local->read_addr);
cur_read = gtmsource_local->read;
if (jnlpool_hasnt_overflowed(jctl, jnlpool_size, cur_read_addr) &&
QWGT(cur_read_jnl_seqno, recvd_jnl_seqno) &&
QWGT(cur_read_jnl_seqno, jctl->start_jnl_seqno))
{
if (QWGE(jctl->rsrv_write_addr, cur_read_addr))
{ /* If there is no more input to be read, the previous transaction size should not be read from the
* journal pool since the read pointers point to the next read. In such a case, we can find the
* size of the transaction cur_read_jnl_seqno from jctl->lastwrite_len. We should access
* lastwrite_len after a memory barrier to avoid reading a stale value. We rely on the memory
* barrier done in jnlpool_hasnt_overflowed */
save_lastwrite_len = jctl->lastwrite_len;
if (QWEQ(jctl->rsrv_write_addr, cur_read_addr))
{ /* GT.M is not writing any transaction, safe to rely on jctl->lastwrite_len. Note,
* GT.M could not have been writing transaction cur_read_jnl_seqno if we are here. Also,
* lastwrite_len cannot be in the future w.r.t rsrv_write_addr because of the memory
* barriers we do in t{p}_{t}end.c. It can be behind by at most one transaction
* (cur_read_jnl_seqno). Well, we want the length of transaction cur_read_jnl_seqno,
* not cur_read_jnl_seqno + 1.
*/
QWDECRBYDW(cur_read_addr, save_lastwrite_len);
QWDECRBYDW(cur_read_jnl_seqno, 1);
prev_read = cur_read;
if (cur_read > save_lastwrite_len)
cur_read -= save_lastwrite_len;
else
{
cur_read = cur_read - (save_lastwrite_len % jnlpool_size);
if (cur_read >= prev_read)
cur_read += jnlpool_size;
}
assert(cur_read == QWMODDW(cur_read_addr, jnlpool_size));
REPL_DPRINT2("Srch restart : No more input in jnlpool, backing off to read_jnl_seqno : "
INT8_FMT, INT8_PRINT(cur_read_jnl_seqno));
REPL_DPRINT3(" read_addr : "INT8_FMT" read : %ld\n", INT8_PRINT(cur_read_addr), cur_read);
}
}
if (QWEQ(cur_read_addr, jctl->write_addr))
{ /* Check if there are any pending phase2 commits that can be cleaned up.
* That will bring jctl->write_addr more up to date. And then redo the read_addr/write_addr check.
*/
if (jctl->write_addr != jctl->rsrv_write_addr)
repl_phase2_cleanup(jnlpool);
if (QWEQ(cur_read_addr, jctl->write_addr))
{ /* We caught a GTM process writing cur_read_jnl_seqno + 1,
* we cannot rely on lastwrite_len as it may or may not have changed.
* Wait until the GTM process finishes writing this transaction.
*/
repl_log(gtmsource_log_fp, TRUE, FALSE, "SEARCHING RESYNC POINT IN POOL : "
"Waiting for GTM process to finish writing journal records to the pool\n");
while (QWEQ(cur_read_addr, jctl->write_addr))
{
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_JNL_RECS);
gtmsource_poll_actions(FALSE);
/* Check if there are any pending phase2 commits that can be cleaned up.
* That will bring jctl->write_addr more up to date.
*/
if (jctl->write_addr != jctl->rsrv_write_addr)
repl_phase2_cleanup(jnlpool);
}
repl_log(gtmsource_log_fp, TRUE, FALSE, "SEARCHING RESYNC POINT IN POOL : "
"GTM process finished writing journal records to the pool\n");
}
}
}
while (jnlpool_hasnt_overflowed(jctl, jnlpool_size, cur_read_addr) &&
QWGT(cur_read_jnl_seqno, recvd_jnl_seqno) &&
QWGT(cur_read_jnl_seqno, jctl->start_jnl_seqno))
{
assert(cur_read + SIZEOF(jnldata_hdr_struct) <= jnlpool_size);
prev_tr_size = ((jnldata_hdr_ptr_t)(jnlpool->jnldata_base + cur_read))->prev_jnldata_len;
if ((prev_tr_size <= cur_read_addr) &&
jnlpool_hasnt_overflowed(jctl, jnlpool_size, cur_read_addr - prev_tr_size))
{
QWDECRBYDW(cur_read_addr, prev_tr_size);
prev_read = cur_read;
cur_read -= prev_tr_size;
if (cur_read >= prev_read)
cur_read += jnlpool_size;
assert(cur_read == QWMODDW(cur_read_addr, jnlpool_size));
QWDECRBYDW(cur_read_jnl_seqno, 1);
REPL_DPRINT2("Srch restart : No overflow yet, backing off to read_jnl_seqno : "INT8_FMT,
INT8_PRINT(cur_read_jnl_seqno));
REPL_DPRINT3(" read_addr : "INT8_FMT" read : %ld\n", INT8_PRINT(cur_read_addr), cur_read);
continue;
}
break;
}
QWASSIGN(gtmsource_local->read_addr, cur_read_addr);
gtmsource_local->read = cur_read;
if (jnlpool_hasnt_overflowed(jctl, jnlpool_size, cur_read_addr) &&
QWEQ(cur_read_jnl_seqno, recvd_jnl_seqno) &&
QWGE(cur_read_jnl_seqno, jctl->start_jnl_seqno))
{
REPL_DPRINT2("Srch restart : Now in READ_POOL state read_jnl_seqno : "INT8_FMT,
INT8_PRINT(cur_read_jnl_seqno));
REPL_DPRINT3(" read_addr : "INT8_FMT" read : %ld\n",INT8_PRINT(cur_read_addr), cur_read);
} else
{
/* Overflow, or requested seqno too far back to be in pool */
REPL_DPRINT2("Srch restart : Now in READ_FILE state. Changing sync point to read_jnl_seqno : "INT8_FMT,
INT8_PRINT(cur_read_jnl_seqno));
REPL_DPRINT3(" read_addr : "INT8_FMT" read : %ld ", INT8_PRINT(cur_read_addr), cur_read);
REPL_DPRINT2("save_read_jnl_seqno : "INT8_FMT"\n", INT8_PRINT(gtmsource_save_read_jnl_seqno));
QWASSIGN(gtmsource_save_read_jnl_seqno, cur_read_jnl_seqno);
if (QWLT(gtmsource_save_read_jnl_seqno, jctl->start_jnl_seqno))
{
QWASSIGN(gtmsource_save_read_jnl_seqno, jctl->start_jnl_seqno);
assert(QWEQ(gtmsource_local->read_addr, seq_num_zero));
assert(gtmsource_local->read == 0);
/* For pro version, force zero assignment */
QWASSIGN(gtmsource_local->read_addr, seq_num_zero);
gtmsource_local->read = 0;
REPL_DPRINT2("Srch restart : Sync point "INT8_FMT, INT8_PRINT(gtmsource_save_read_jnl_seqno));
REPL_DPRINT2(" beyond start_seqno : "INT8_FMT, INT8_PRINT(jctl->start_jnl_seqno));
REPL_DPRINT3(", sync point set to read_addr : "INT8_FMT" read : %d\n",
INT8_PRINT(gtmsource_local->read_addr), gtmsource_local->read);
}
gtmsource_local->read_state = READ_FILE;
repl_log(gtmsource_log_fp, TRUE, FALSE, "Source server now reading from journal files; journal pool "
"does not contain transaction %llu [0x%llx]\n", recvd_jnl_seqno, recvd_jnl_seqno);
gtmsource_pool2file_transition = TRUE;
}
} else /* read_state is READ_FILE and requesting a sequence number less than or equal to read_jnl_seqno */
{
assert(cur_read_jnl_seqno == gtmsource_local->read_jnl_seqno);
if (cur_read_jnl_seqno > gtmsource_save_read_jnl_seqno)
gtmsource_save_read_jnl_seqno = cur_read_jnl_seqno;
REPL_DPRINT2("Srch restart : Continuing in READ_FILE state. Retaining sync point for read_jnl_seqno : "INT8_FMT,
INT8_PRINT(cur_read_jnl_seqno));
REPL_DPRINT2(" at read_addr : "INT8_FMT, INT8_PRINT(gtmsource_local->read_addr));
REPL_DPRINT3(" read : %d corresponding to save_read_jnl_seqno : "INT8_FMT"\n", gtmsource_local->read,
INT8_PRINT(gtmsource_save_read_jnl_seqno));
repl_log(gtmsource_log_fp, TRUE, FALSE, "Source server continuing to read from journal files at seqno "
"%llu [0x%llx]\n", recvd_jnl_seqno, recvd_jnl_seqno);
}
REPL_DPRINT2("Setting resync_seqno to "INT8_FMT"\n", INT8_PRINT(recvd_jnl_seqno));
repl_log(gtmsource_log_fp, TRUE, FALSE, "Source server last sent seqno %llu [0x%llx]\n",
cur_read_jnl_seqno, cur_read_jnl_seqno);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Source server will start sending from seqno %llu [0x%llx]\n",
recvd_jnl_seqno, recvd_jnl_seqno);
if (READ_POOL == gtmsource_local->read_state)
repl_log(gtmsource_log_fp, TRUE, TRUE, "Source server now reading from journal pool\n");
else
repl_log(gtmsource_log_fp, TRUE, TRUE, "Source server now reading from journal files\n");
/* Finally set "gtmsource_local->read_jnl_seqno" to be "recvd_jnl_seqno" and flush changes to instance file on disk */
gtmsource_flush_fh(recvd_jnl_seqno, FALSE);
assert(GTMSOURCE_HANDLE_ONLN_RLBK != gtmsource_state);
return (SS_NORMAL);
}
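/* Illustrative sketch only, not used by the replication code: the back-chain search in "gtmsource_srch_restart"
 * above steps a circular-buffer offset backwards by one transaction length and relies on unsigned wraparound to
 * detect that the subtraction crossed the start of the pool. The name "sketch_pool_backoff" is hypothetical.
 * The sketch assumes cur_read < pool_size and 0 < tr_len <= pool_size.
 */
#include <stdint.h>

static uint64_t sketch_pool_backoff(uint64_t cur_read, uint64_t tr_len, uint64_t pool_size)
{
	uint64_t	prev_read = cur_read;

	cur_read -= tr_len;		/* wraps to a very large value if tr_len > prev_read */
	if (cur_read >= prev_read)	/* wrapped: the previous transaction starts near the end of the pool */
		cur_read += pool_size;	/* unsigned addition wraps back into the range [0, pool_size) */
	return cur_read;
}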
int gtmsource_get_jnlrecs(uchar_ptr_t buff, int *data_len, int maxbufflen, boolean_t read_multiple)
{
int total_tr_len;
unsigned char seq_num_str[32], *seq_num_ptr;
jnlpool_ctl_ptr_t jctl;
gtmsource_local_ptr_t gtmsource_local;
seq_num jnl_seqno, read_jnl_seqno;
qw_num write_addr, read_addr;
gtmsource_state_t gtmsource_state_sav;
int index1;
# ifdef DEBUG
DCL_THREADGBL_ACCESS;
SETUP_THREADGBL_ACCESS;
# endif
jctl = jnlpool->jnlpool_ctl;
gtmsource_local = jnlpool->gtmsource_local;
jnl_seqno = jctl->jnl_seqno;
read_jnl_seqno = gtmsource_local->read_jnl_seqno;
read_addr = gtmsource_local->read_addr;
GTMDBGFLAGS_NOFREQ_ONLY(GTMSOURCE_FORCE_READ_FILE_MODE, gtmsource_local->read_state = READ_FILE);
switch(gtmsource_local->read_state)
{
case READ_POOL:
/* Check if there are any pending phase2 commits that can be cleaned up.
* That will bring jctl->write_addr more up to date.
*/
index1 = jctl->phase2_commit_index1;
assert((0 <= index1) && (JPL_PHASE2_COMMIT_ARRAY_SIZE > index1));
if ((index1 != jctl->phase2_commit_index2) && jctl->phase2_commit_array[index1].write_complete
&& (LOCK_AVAILABLE == jctl->phase2_commit_latch.u.parts.latch_pid))
repl_phase2_cleanup(jnlpool);
/* Now that write_addr is up to date, go ahead and read jnl records */
write_addr = jctl->write_addr;
assert((0 != write_addr) || (read_jnl_seqno <= jctl->start_jnl_seqno));
assert(read_addr <= write_addr);
if (read_addr == write_addr)
{ /* Nothing to read. While reading pool, the comparison of read_addr against write_addr is
* the only reliable indicator if there are any transactions to be read. This is due to
* the placement of memory barriers in t_end/tp_tend.c. Also, since we do not issue memory
* barrier here, we may be reading a stale value of write_addr in which case we may conclude
* that there is nothing to read. But, this will not continue forever as the source server
* eventually (decided by architecture's implementation) will see the change to write_addr.
*/
*data_len = 0;
return (0);
}
total_tr_len = gtmsource_readpool(buff, data_len, maxbufflen, read_multiple, write_addr);
if (GTMSOURCE_SEND_NEW_HISTINFO == gtmsource_state)
return (0); /* need to send REPL_HISTREC message before sending any more seqnos */
if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
return (0); /* Connection got reset in call to "gtmsource_readpool" */
if (0 < total_tr_len)
{ /* Found the entire seqno in the jnlpool. Check if the first journal record of this seqno
* is of type JRT_BAD. This indicates an ERR_JNLPOOLRECOVERY situation (see
* JPL_PHASE2_WRITE_COMPLETE macro for details). If so, switch to reading from files.
*/
assert(total_tr_len >= (SIZEOF(jnldata_hdr_struct) + SIZEOF(jrec_prefix)));
if (JRT_BAD != ((jrec_prefix *)buff)->jrec_type)
return (total_tr_len);
assert((0 != TREF(gtm_test_jnlpool_sync)) && (0 == (read_jnl_seqno % TREF(gtm_test_jnlpool_sync))));
*data_len = -1; /* so we do fall through to READ_FILE code below */
}
if (0 < *data_len)
return (-1);
/* Overflow, switch to READ_FILE */
gtmsource_local->read_state = READ_FILE;
QWASSIGN(gtmsource_save_read_jnl_seqno, read_jnl_seqno);
gtmsource_pool2file_transition = TRUE; /* so that we read the latest gener jnl files */
repl_log(gtmsource_log_fp, TRUE, TRUE, "Source server now reading from journal files; journal pool "
"overflow detected at seqno %llu [0x%llx]\n",
gtmsource_save_read_jnl_seqno, gtmsource_save_read_jnl_seqno);
/* CAUTION : FALL THROUGH */
case READ_FILE:
/* Note that while reading from journal files, it is possible the source server sees the journal records
* for a transaction in the journal files BEFORE the transaction is marked as complete in the journal
* pool (because FINISH_JNL_PHASE2_IN_JNLPOOL_IF_NEEDED is called AFTER FINISH_JNL_PHASE2_IN_JNLBUFF
* in NONTP_FINISH_JNL_PHASE2_IN_JNLBUFF_AND_JNLPOOL and TP_FINISH_JNL_PHASE2_IN_JNLBUFF_AND_JNLPOOL).
* Therefore it is possible read_addr (which is derived from the journal file records) could be greater
* than write_addr (which is derived from the journal pool). And so "assert(read_addr <= write_addr)"
* cannot be done here like is done for the READ_POOL case.
*/
if (read_jnl_seqno >= jnl_seqno)
{ /* Nothing to read. While reading from files, source server does not use write_addr to decide
* how much to read. Also, it is possible that read_addr and write_addr are the same if the
* source server came up after a crash and syncs with the latest state in jnlpool (see
* gtmsource()). These reasons preclude the comparison of read_addr and write_addr (as we did for
* READ_POOL case) to decide whether there are any unsent transactions. We use jnl_seqno instead.
* Note though, the source server's view of jnl_seqno may be stale, and we may conclude that
* we don't have anything to read as we do not do memory barrier here to fetch the latest
* value of jnl_seqno. But, this will not continue forever as the source server eventually
* (decided by architecture's implementation) will see the change to jnl_seqno.
*
* On systems that allow unordered memory access, it is possible that the value of jnl_seqno
* as seen by source server is in the past compared to read_jnl_seqno - source server in
* keeping up with updaters reads (from pool) and sends up to write_addr, the last transaction
* of which could be jnl_seqno + 1. To cover the case, we use >= in the above comparison.
* Given this, we may return with "nothing to read" even though we fell through from the
* READ_POOL case.
*/
*data_len = 0;
return 0;
}
if (gtmsource_pool2file_transition /* read_pool -> read_file transition */
|| NULL == repl_ctl_list) /* files not opened */
{
/* Close all the file read related structures and start afresh. The idea here is that most of the
* file read info might be stale 'cos there is usually a long gap between pool to file read
* transitions (in production environments). So, start afresh with the latest generation journal
* files. This will also prevent opening previous generations that may not be required.
*/
REPL_DPRINT1("Pool to File read transition. Closing all the stale file read info and starting "
"afresh\n");
gtmsource_ctl_close();
gtmsource_ctl_init();
gtmsource_pool2file_transition = FALSE;
}
GTMSOURCE_SAVE_STATE(gtmsource_state_sav);
total_tr_len = gtmsource_readfiles(buff, data_len, maxbufflen, read_multiple);
if (GTMSOURCE_NOW_TRANSITIONAL(gtmsource_state_sav))
return (0); /* Control message triggered mode change while reading files */
if (GTMSOURCE_SEND_NEW_HISTINFO == gtmsource_state)
return (0); /* need to send REPL_HISTREC message before sending any more seqnos */
if (0 < total_tr_len)
return (total_tr_len);
assertpro(0 < *data_len);
return (-1);
case READ_BUFF:
assert(FALSE);
default:
assert(FALSE);
}
return (-1); /* This should never get executed, added to make compiler happy */
}
/* This function can be used to send only fixed-size message types across the replication pipe.
* This in turn uses REPL_SEND* macros but also does error checks and sets the global variable "gtmsource_state" accordingly.
*
* msg = Pointer to the message buffer to send
* msgtypestr = Message name as a string to display meaningful error messages
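* optional_seqno = Optional seqno that needs to be printed along with the message name (MAX_SEQNO if none)
* optional_strm_num = Optional supplementary stream number printed along with the seqno (INVALID_SUPPL_STRM if none)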
*/
void gtmsource_repl_send(repl_msg_ptr_t msg, char *msgtypestr, seq_num optional_seqno, int4 optional_strm_num)
{
unsigned char *msg_ptr; /* needed for REPL_SEND_LOOP */
int tosend_len, sent_len, sent_this_iter; /* needed for REPL_SEND_LOOP */
int status, poll_dir; /* needed for REPL_{SEND,RECV}_LOOP */
char err_string[1024];
boolean_t close_retry = FALSE;
assert((REPL_MULTISITE_MSG_START > msg->type) || (REPL_PROTO_VER_MULTISITE <= remote_side->proto_ver));
if (MAX_SEQNO != optional_seqno)
{
if (INVALID_SUPPL_STRM == optional_strm_num)
{
if (REPL_ROLLBACK_FIRST != msg->type)
{
repl_log(gtmsource_log_fp, TRUE, FALSE,
"Sending %s message with seqno %llu [0x%llx]\n",msgtypestr, optional_seqno, optional_seqno);
} else if (REPL_ROLLBACK_FIRST == msg->type)
{
REPL_DPRINT1("Received REPL_ROLLBACK_FIRST message");
}
} else
repl_log(gtmsource_log_fp, TRUE, FALSE, "Sending %s message with seqno %llu [0x%llx] for Stream # %2d\n",
msgtypestr, optional_seqno, optional_seqno, optional_strm_num);
} else
repl_log(gtmsource_log_fp, TRUE, FALSE, "Sending %s message\n", msgtypestr);
REPL_SEND_LOOP(gtmsource_sock_fd, msg, msg->len, REPL_POLL_NOWAIT)
{
gtmsource_poll_actions(FALSE);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return;
}
/* Check for error status from the REPL_SEND */
if (SS_NORMAL != status)
{
if (EREPL_SEND == repl_errno)
{
if (REPL_CONN_RESET(status))
{
repl_log(gtmsource_log_fp, TRUE, TRUE, "Connection reset while sending %s. Status = %d ; %s\n",
msgtypestr, status, STRERROR(status));
repl_log_conn_info(gtmsource_sock_fd, gtmsource_log_fp, TRUE);
#ifdef DEBUG
if (WBTEST_ENABLED(WBTEST_REPLCOMM_SEND_SRC))
gtm_wbox_input_test_case_count = 6; /* Do not go into white box case again */
#endif
close_retry = TRUE;
}
if (close_retry)
{
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = jnlpool->gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
return;
} else
{
SNPRINTF(err_string, SIZEOF(err_string), "Error sending %s message. "
"Error in send : %s", msgtypestr, STRERROR(status));
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2,
LEN_AND_STR(err_string));
}
}
if (EREPL_SELECT == repl_errno)
{
SNPRINTF(err_string, SIZEOF(err_string), "Error sending %s message. "
"Error in select : %s", msgtypestr, STRERROR(status));
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2, LEN_AND_STR(err_string));
}
}
}
/* This function can be used to receive only fixed-size message types across the replication pipe.
* This in turn uses REPL_RECV* macros but also does error checks and sets the global variable "gtmsource_state" accordingly.
* While waiting for the input message type, this function also recognizes
* a) a REPL_XOFF_ACK_ME message in which case invokes "gtmsource_repl_send" to send a REPL_XOFF_ACK message type.
* b) a REPL_XOFF or REPL_XON message in which case ignores them as we are still in the initial handshake stage
* and these messages from the receiver are not relevant.
*
* msg = Pointer to the message buffer where received message will be stored
* msglen = Length of the message to receive
* msgtype = Expected type of the message (if received message is not of this type, the connection is closed)
* msgtypestr = Message name as a string to display meaningful error messages
*/
static boolean_t gtmsource_repl_recv(repl_msg_ptr_t msg, int4 msglen, int4 msgtype, char *msgtypestr)
{
unsigned char *msg_ptr; /* needed for REPL_RECV_LOOP */
int torecv_len, recvd_len, recvd_this_iter; /* needed for REPL_RECV_LOOP */
int status, poll_dir; /* needed for REPL_{SEND,RECV}_LOOP */
repl_msg_t xoff_ack;
char err_string[1024];
unsigned char *buff;
int4 bufflen;
repl_log(gtmsource_log_fp, TRUE, FALSE, "Waiting for %s message\n", msgtypestr);
assert((REPL_XOFF != msgtype) && (REPL_XON != msgtype) && (REPL_XOFF_ACK_ME != msgtype));
buff = (unsigned char *)msg;
bufflen = msglen;
do
{ /* Note that "bufflen" could potentially be > 32-byte (MIN_REPL_MSGLEN) the length of most replication
* messages, in case we are expecting to receive a REPL_CMP_SOLVE message. In that case, it is possible
* that while waiting for the 512 byte or so REPL_CMP_SOLVE message, we get a 32-byte REPL_XOFF_ACK_ME
* message. In this case, we should break out of the REPL_RECV_LOOP as otherwise we would be eternally
* waiting for a never-to-come REPL_CMP_SOLVE message. This is in fact a general issue with any REPL_RECV_LOOP
* code that passes a 3rd parameter > MIN_REPL_MSGLEN. All such usages need a check inside the body of the
* loop to account for a REPL_XOFF_ACK_ME and if so break.
*/
REPL_RECV_LOOP(gtmsource_sock_fd, buff, bufflen, REPL_POLL_WAIT)
{
gtmsource_poll_actions(TRUE);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE;
/* Check if we received an XOFF_ACK_ME message completely. If yes, we can safely break out of the
* loop without receiving the originally intended message (as the receiver is going to drain away all
* the stuff in the replication pipe anyways and reinitiate a fresh handshake). This way we don't hang
* eternally waiting for a never-arriving originally intended message.
*/
if ((MIN_REPL_MSGLEN <= recvd_len) && (REPL_XOFF_ACK_ME == msg->type))
break;
}
if (SS_NORMAL != status)
{
if (EREPL_RECV == repl_errno)
{
if (REPL_CONN_RESET(status))
{ /* Connection reset */
repl_log(gtmsource_log_fp, TRUE, TRUE,
"Connection reset while attempting to receive %s message. Status = %d ; %s\n",
msgtypestr, status, STRERROR(status));
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = jnlpool->gtmsource_local->gtmsource_state =
GTMSOURCE_WAITING_FOR_CONNECTION;
return FALSE;
} else
{
SNPRINTF(err_string, SIZEOF(err_string),
"Error receiving %s message from Receiver. Error in recv : %s",
msgtypestr, STRERROR(status));
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2,
LEN_AND_STR(err_string));
}
} else if (EREPL_SELECT == repl_errno)
{
SNPRINTF(err_string, SIZEOF(err_string),
"Error receiving %s message from Receiver. Error in select : %s",
msgtypestr, STRERROR(status));
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_REPLCOMM, 0, ERR_TEXT, 2,
LEN_AND_STR(err_string));
}
}
assert(SS_NORMAL == status);
if ((REPL_XON != msg->type) && (REPL_XOFF != msg->type))
break;
/* Strip the "msg" buffer of the XON or XOFF message and redo the loop until "msglen" bytes of the expected
* msgtype are obtained. If "msglen" matches the XON or XOFF message length, then we can discard the ENTIRE message
* and redo the loop without having to strip a "partial" msg buffer. If "msglen" is LESS than a complete XON or
* XOFF message, then we have to finish reading the entire XON/XOFF message and then redo the loop. But we
* expect all callers to set "msglen" to at least the XON/XOFF message length. Assert accordingly.
* Note that the below logic works even if more than one XON/XOFF messages get sent before the expected message.
* For every XON/XOFF message, we will do one memmove and go back in the loop to read MIN_REPL_MSGLEN-more bytes.
*/
assert(MIN_REPL_MSGLEN == msg->len);
assert(MIN_REPL_MSGLEN <= msglen);
bufflen = msg->len;
if (msglen != bufflen)
{
memmove(msg, (uchar_ptr_t)msg + bufflen, msglen - bufflen);
buff = (uchar_ptr_t)msg + msglen - bufflen;
}
} while (TRUE);
/* Check if message received is indeed of expected type */
assert(remote_side->endianness_known); /* so we ensure msg->type we read below is accurate and not cross-endian format */
if (REPL_XOFF_ACK_ME == msg->type)
{ /* Send XOFF_ACK. Anything sent before this in the replication pipe will be drained and therefore
* return to the caller so it can reissue the message exchange sequence right from the beginning.
*/
repl_log(gtmsource_log_fp, TRUE, FALSE, "Received REPL_XOFF_ACK_ME message\n");
xoff_ack.type = REPL_XOFF_ACK;
memcpy((uchar_ptr_t)&xoff_ack.msg[0], (uchar_ptr_t)&gtmsource_msgp->msg[0], SIZEOF(seq_num));
xoff_ack.len = MIN_REPL_MSGLEN;
gtmsource_repl_send((repl_msg_ptr_t)&xoff_ack, "REPL_XOFF_ACK", MAX_SEQNO, INVALID_SUPPL_STRM);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* "gtmsource_repl_send" did not complete */
gtmsource_state = jnlpool->gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_RESTART;
return FALSE;
} else if (msgtype != msg->type)
{
repl_log(gtmsource_log_fp, TRUE, FALSE, "UNKNOWN msg (type = %d) received when waiting for msg (type = %d)"
". Closing connection.\n", msg->type, msgtype);
repl_close(&gtmsource_sock_fd);
assert(FALSE);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = jnlpool->gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
return FALSE;
} else
{
repl_log(gtmsource_log_fp, TRUE, FALSE, "Received %s message\n", msgtypestr);
return TRUE;
}
}
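/* Illustrative sketch only, not part of the GT.M sources: the XON/XOFF stripping step in "gtmsource_repl_recv" above
* amounts to sliding the bytes that follow the flow-control message to the front of the buffer and resuming the read
* right after the bytes that were kept. The names CTRL_MSGLEN and strip_ctrl_msg are hypothetical, and the 32-byte
* length is an assumed stand-in for MIN_REPL_MSGLEN. The block sits under "#if 0" so it never takes part in the build.
*/
#if 0
```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Assumed fixed length of one flow-control (XON/XOFF) message; stands in for MIN_REPL_MSGLEN */
#define CTRL_MSGLEN 32

/* Drop one CTRL_MSGLEN-byte control message from the front of "buf", which currently holds "msglen" bytes.
 * The trailing (msglen - CTRL_MSGLEN) bytes slide to the front of the buffer. Returns the offset at which
 * the caller should resume reading in order to refill the buffer back up to "msglen" bytes.
 */
static size_t strip_ctrl_msg(unsigned char *buf, size_t msglen)
{
	size_t	keep;

	assert(CTRL_MSGLEN <= msglen);	/* callers must ask for at least one control message worth of bytes */
	keep = msglen - CTRL_MSGLEN;
	if (0 != keep)
		memmove(buf, buf + CTRL_MSGLEN, keep);	/* source and target may overlap, so memmove and not memcpy */
	return keep;
}
```
#endif
/* With a 40-byte request, one call keeps the last 8 bytes and returns 8, so the next read appends 32 bytes at offset 8.
* With a 32-byte request nothing is kept and the whole message is read again from offset 0.
*/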
/* Given that the source server was started with compression enabled, this function checks if the receiver server was
* also started with decompression enabled and if so sends a compressed test message. The receiver server responds with
* whether or not it was able to decompress that message successfully. If yes, compression is enabled on the replication
* pipe and the output parameter "*repl_zlib_cmp_level_ptr" is set to the compression level used. Otherwise it is set
* to ZLIB_CMPLVL_NONE.
*/
boolean_t gtmsource_get_cmp_info(int4 *repl_zlib_cmp_level_ptr)
{
repl_cmpinfo_msg_t test_msg, solve_msg;
char inputdata[REPL_MSG_CMPDATALEN], cmpbuf[REPL_MSG_CMPEXPDATALEN];
int index, cmpret, start;
boolean_t cmpfail;
uLongf cmplen;
DCL_THREADGBL_ACCESS;
SETUP_THREADGBL_ACCESS;
assert(gtm_zlib_cmp_level);
/*************** Send REPL_CMP_TEST message ***************/
memset(&test_msg, 0, SIZEOF(test_msg));
test_msg.type = REPL_CMP_TEST;
test_msg.len = REPL_MSG_CMPINFOLEN;
test_msg.proto_ver = REPL_PROTO_VER_THIS;
/* Fill in test data with random data. The data will be a sequence of bytes from 0 to 255. The start point though
* is randomly chosen using the process_id. If it is 253, the resulting sequence would be 253, 254, 255, 0, 1, 2, ...
*/
for (start = (process_id & REPL_MSG_CMPDATAMASK), index = 0; index < REPL_MSG_CMPDATALEN; index++)
inputdata[index] = (start + index) % REPL_MSG_CMPDATALEN;
/* Compress the data */
cmplen = SIZEOF(cmpbuf); /* initialize it to the available compressed buffer space */
ZLIB_COMPRESS(&cmpbuf[0], cmplen, inputdata, REPL_MSG_CMPDATALEN, gtm_zlib_cmp_level, cmpret);
switch(cmpret)
{
case Z_MEM_ERROR:
assert(FALSE);
repl_log(gtmsource_log_fp, TRUE, FALSE,
"Out-of-memory; Error from zlib compress function before sending REPL_CMP_TEST message\n");
break;
case Z_BUF_ERROR:
assert(FALSE);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Insufficient output buffer; Error from zlib compress "
"function before sending REPL_CMP_TEST message\n");
break;
case Z_STREAM_ERROR:
/* level was incorrectly specified. Default to NO compression. */
repl_log(gtmsource_log_fp, TRUE, FALSE, "Compression level %d invalid; Error from compress function"
" before sending REPL_CMP_TEST message\n", gtm_zlib_cmp_level);
break;
}
if (Z_OK != cmpret)
{
repl_log(gtmsource_log_fp, TRUE, FALSE, "Defaulting to NO compression\n");
*repl_zlib_cmp_level_ptr = ZLIB_CMPLVL_NONE; /* no compression */
return TRUE;
}
if (REPL_MSG_CMPEXPDATALEN < cmplen)
{ /* The zlib compression library expanded data more than we had allocated for. Not expected (hence the assert) but handle it in pro */
assert(FALSE);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Compression of %d bytes of test data resulted in %d bytes which is"
" more than allocated space of %d bytes\n", REPL_MSG_CMPDATALEN, cmplen, REPL_MSG_CMPEXPDATALEN);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Defaulting to NO compression\n");
*repl_zlib_cmp_level_ptr = ZLIB_CMPLVL_NONE; /* no compression */
return TRUE;
}
test_msg.datalen = (int4)cmplen;
memcpy(test_msg.data, cmpbuf, test_msg.datalen);
gtmsource_repl_send((repl_msg_ptr_t)&test_msg, "REPL_CMP_TEST", MAX_SEQNO, INVALID_SUPPL_STRM);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* send did not succeed */
/* for in-house testing, set up a timer that would cause assert failure if REPL_CMP_SOLVE is not received
* within 1 or 15 minutes, depending on whether this is a white-box test or not */
# ifdef REPL_CMP_SOLVE_TESTING
if (TREF(gtm_environment_init))
start_timer((TID)repl_cmp_solve_src_timeout, 15 * 60 * 1000, repl_cmp_solve_src_timeout, 0, NULL);
# endif
/*************** Receive REPL_CMP_SOLVE message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&solve_msg, REPL_MSG_CMPINFOLEN, REPL_CMP_SOLVE, "REPL_CMP_SOLVE"))
{
# ifdef REPL_CMP_SOLVE_TESTING
if (TREF(gtm_environment_init))
cancel_timer((TID)repl_cmp_solve_src_timeout);
# endif
return FALSE; /* recv did not succeed */
}
# ifdef REPL_CMP_SOLVE_TESTING
if (TREF(gtm_environment_init))
cancel_timer((TID)repl_cmp_solve_src_timeout);
# endif
assert(REPL_CMP_SOLVE == solve_msg.type);
cmpfail = FALSE;
if (REPL_MSG_CMPDATALEN != solve_msg.datalen)
{
assert(REPL_RCVR_CMP_TEST_FAIL == solve_msg.datalen);
cmpfail = TRUE;
} else
{ /* Check that receiver side decompression was correct */
for (index = 0; index < REPL_MSG_CMPDATALEN; index++)
{
if (inputdata[index] != solve_msg.data[index])
{
cmpfail = TRUE; /* decompressed data differs from what was sent */
break;
}
}
}
if (!cmpfail)
{
assert(solve_msg.proto_ver == remote_side->proto_ver);
assert(REPL_PROTO_VER_MULTISITE_CMP <= solve_msg.proto_ver);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Receiver server was able to decompress successfully\n");
*repl_zlib_cmp_level_ptr = gtm_zlib_cmp_level;
repl_log(gtmsource_log_fp, TRUE, FALSE, "Using zlib compression level %d for replication\n", gtm_zlib_cmp_level);
} else
{
repl_log(gtmsource_log_fp, TRUE, FALSE, "Receiver server could not decompress successfully\n");
repl_log(gtmsource_log_fp, TRUE, FALSE, "Defaulting to NO compression\n");
*repl_zlib_cmp_level_ptr = ZLIB_CMPLVL_NONE;
}
return TRUE;
}
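/* Illustrative sketch only, not part of the GT.M sources: the test data used by "gtmsource_get_cmp_info" above is a
* run of consecutive byte values that starts at a process-dependent offset and wraps from 255 back to 0, and the
* reply is accepted only if every echoed byte matches. The names PATTERN_LEN, fill_test_pattern and pattern_mismatch
* are hypothetical, and 256 is an assumed stand-in for REPL_MSG_CMPDATALEN. The zlib step is deliberately left out.
* The block sits under "#if 0" so it never takes part in the build.
*/
#if 0
```c
#include <assert.h>
#include <stddef.h>

#define PATTERN_LEN 256	/* assumed length of the test data; must be a power of two for the mask below */

/* Fill "buf" with consecutive byte values starting at (seed mod PATTERN_LEN), wrapping from 255 back to 0 */
static void fill_test_pattern(unsigned char *buf, unsigned int seed)
{
	unsigned int	start = seed & (PATTERN_LEN - 1);
	size_t		index;

	assert(NULL != buf);
	for (index = 0; index < PATTERN_LEN; index++)
		buf[index] = (unsigned char)((start + index) % PATTERN_LEN);
}

/* Return 1 if the echoed data differs anywhere from what was sent (the test failed), else 0 */
static int pattern_mismatch(const unsigned char *sent, const unsigned char *echoed)
{
	size_t	index;

	for (index = 0; index < PATTERN_LEN; index++)
	{
		if (sent[index] != echoed[index])
			return 1;	/* a single differing byte is enough to reject compression */
	}
	return 0;
}
```
#endif
/* A seed of 253 yields 253, 254, 255, 0, 1, 2, ... which matches the example in the comment on the fill loop above. */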
void repl_cmp_solve_src_timeout(void)
{
assertpro(FALSE);
}
#ifdef GTM_TLS
boolean_t gtmsource_exchange_tls_info(void)
{
int errlen, poll_dir, status, flags;
char *errp;
repl_tlsinfo_msg_t reply, response;
assert(NULL != tls_ctx);
assert(REPL_TLS_REQUESTED);
assert(!repl_tls.enabled);
/* Send REPL_NEED_TLS_INFO message. We pass our GT.M TLS API version as part of this message. */
reply.type = REPL_NEED_TLS_INFO;
reply.len = MIN_REPL_MSGLEN;
reply.API_version = GTM_TLS_API_VERSION;
reply.library_version = (uint4)tls_ctx->runtime_version;
gtmsource_repl_send((repl_msg_ptr_t)&reply, "REPL_NEED_TLS_INFO", MAX_SEQNO, INVALID_SUPPL_STRM);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* send did not succeed */
/* Receive REPL_TLS_INFO message. The GT.M TLS API version of the receiver is sent as part of the message. */
if (!gtmsource_repl_recv((repl_msg_ptr_t)&response, MIN_REPL_MSGLEN, REPL_TLS_INFO, "REPL_TLS_INFO"))
return FALSE; /* recv did not succeed */
repl_log(gtmsource_log_fp, TRUE, TRUE, " Remote side API version: 0x%08x\n", response.API_version);
repl_log(gtmsource_log_fp, TRUE, TRUE, " Remote side Library version: 0x%08x\n", response.library_version);
flags = GTMTLS_OP_FORCE_VERIFY_PEER | GTMTLS_OP_CLIENT_MODE;
/* At this point, both sides are ready for a TLS/SSL handshake. Create a TLS/SSL aware socket. */
if (NULL == (repl_tls.sock = gtm_tls_socket(tls_ctx, repl_tls.sock, gtmsource_sock_fd, repl_tls.id, flags)))
{
if (!PLAINTEXT_FALLBACK)
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_TLSCONVSOCK, 0, ERR_TEXT, 2, LEN_AND_STR(gtm_tls_get_error(NULL)));
gtm_putmsg_csa(CSA_ARG(NULL) VARLSTCNT(6) MAKE_MSG_WARNING(ERR_TLSCONVSOCK), 0,
ERR_TEXT, 2, LEN_AND_STR(gtm_tls_get_error(NULL)));
} else
{
/* Do the actual handshake. */
poll_dir = REPL_INVALID_POLL_DIRECTION;
do
{
status = repl_do_tls_handshake(gtmsource_log_fp, gtmsource_sock_fd, FALSE, &poll_dir);
gtmsource_poll_actions(FALSE);
if (GTMSOURCE_CHANGING_MODE == gtmsource_state)
return FALSE;
} while ((GTMTLS_WANT_READ == status) || (GTMTLS_WANT_WRITE == status));
if (SS_NORMAL == status)
{
repl_log(gtmsource_log_fp, TRUE, TRUE, "Secure communication enabled using TLS/SSL protocol\n");
return TRUE;
} else if (REPL_CONN_RESET(status))
{
repl_log(gtmsource_log_fp, TRUE, TRUE, "Attempt to connect() with TLS/SSL protocol failed. "
"Status = %d; %s\n", status, STRERROR(status));
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = jnlpool->gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
return FALSE;
}
errp = (-1 == status) ? (char *)gtm_tls_get_error(NULL) : STRERROR(status);
if (2048 > (errlen = strlen(errp))) /* Append version information in one message */
snprintf(errp + errlen, 2048 - errlen, "; Local API:0x%08x, LIB:0x%08x; Remote API:0x%08x, LIB:0x%08x",
reply.API_version, reply.library_version, response.API_version, response.library_version);
if (!PLAINTEXT_FALLBACK)
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_TLSHANDSHAKE, 0, ERR_TEXT, 2, LEN_AND_STR(errp));
gtm_putmsg_csa(CSA_ARG(NULL) VARLSTCNT(6) MAKE_MSG_WARNING(ERR_TLSHANDSHAKE), 0, ERR_TEXT, 2, LEN_AND_STR(errp));
}
repl_log(gtmsource_log_fp, TRUE, TRUE, "Plaintext fallback enabled. Closing and reconnecting without TLS/SSL.\n");
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = jnlpool->gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
CLEAR_REPL_TLS_REQUESTED; /* As if -tlsid qualifier was never specified. */
return FALSE;
}
#endif
/* Note: Do NOT reset input parameter "strm_jnl_seqno" unless receiver instance is supplementary and root primary */
boolean_t gtmsource_get_instance_info(boolean_t *secondary_was_rootprimary, seq_num *strm_jnl_seqno)
{
char print_msg[1024];
gtmsource_local_ptr_t gtmsource_local;
int status;
repl_instinfo_msg_t instinfo_msg;
repl_needinst_msg_t needinst_msg;
repl_old_instinfo_msg_t old_instinfo_msg;
repl_old_needinst_msg_t old_needinst_msg;
assert((NULL != jnlpool) && (NULL != jnlpool->repl_inst_filehdr)); /* journal pool should be set up */
gtmsource_local = jnlpool->gtmsource_local;
assert(REPL_PROTO_VER_MULTISITE <= remote_side->proto_ver);
if (REPL_PROTO_VER_SUPPLEMENTARY > remote_side->proto_ver)
{ /* Use pre-supplementary protocol to communicate */
/*************** Send REPL_OLD_NEED_INSTANCE_INFO message ***************/
memset(&old_needinst_msg, 0, SIZEOF(old_needinst_msg));
old_needinst_msg.type = REPL_OLD_NEED_INSTANCE_INFO;
old_needinst_msg.len = MIN_REPL_MSGLEN;
memcpy(old_needinst_msg.instname, jnlpool->repl_inst_filehdr->inst_info.this_instname, MAX_INSTNAME_LEN - 1);
old_needinst_msg.proto_ver = REPL_PROTO_VER_THIS;
old_needinst_msg.node_endianness = NODE_ENDIANNESS;
old_needinst_msg.is_rootprimary = !(jnlpool->jnlpool_ctl->upd_disabled);
gtmsource_repl_send((repl_msg_ptr_t)&old_needinst_msg, "REPL_OLD_NEED_INSTANCE_INFO",
MAX_SEQNO, INVALID_SUPPL_STRM);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* send did not succeed */
/*************** Receive REPL_OLD_INSTANCE_INFO message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&old_instinfo_msg, MIN_REPL_MSGLEN,
REPL_OLD_INSTANCE_INFO, "REPL_OLD_INSTANCE_INFO"))
return FALSE; /* recv did not succeed */
assert(REPL_OLD_INSTANCE_INFO == old_instinfo_msg.type);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Received secondary instance name is [%s]\n", old_instinfo_msg.instname);
if (jnlpool->repl_inst_filehdr->is_supplementary)
{ /* Issue REPL2OLD error because this is a supplementary instance and remote side runs
* on a GT.M version that does not understand the supplementary protocol */
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_REPL2OLD, 4, LEN_AND_STR(old_instinfo_msg.instname),
LEN_AND_STR(jnlpool->repl_inst_filehdr->inst_info.this_instname));
}
/* Check if instance name in the REPL_OLD_INSTANCE_INFO message matches that in the source server command line */
if (STRCMP(old_instinfo_msg.instname, jnlpool->gtmsource_local->secondary_instname))
{ /* Instance name obtained from the receiver does not match what was specified in the
* source server command line. Issue error.
*/
sgtm_putmsg(print_msg, PROC_OPS_PRINT_MSG_LEN, VARLSTCNT(6) ERR_REPLINSTSECMTCH, 4,
LEN_AND_STR(old_instinfo_msg.instname), LEN_AND_STR(jnlpool->gtmsource_local->secondary_instname));
repl_log(gtmsource_log_fp, TRUE, TRUE, print_msg);
status = gtmsource_shutdown(TRUE, NORMAL_SHUTDOWN) - NORMAL_SHUTDOWN;
gtmsource_exit(status);
}
*secondary_was_rootprimary = (boolean_t)old_instinfo_msg.was_rootprimary;
} else
{ /* Use supplementary protocol to communicate */
/*************** Send REPL_NEED_INSTINFO message ***************/
memset(&needinst_msg, 0, SIZEOF(needinst_msg));
needinst_msg.type = REPL_NEED_INSTINFO;
needinst_msg.len = SIZEOF(needinst_msg);
memcpy(needinst_msg.instname, jnlpool->repl_inst_filehdr->inst_info.this_instname, MAX_INSTNAME_LEN - 1);
needinst_msg.lms_group_info = jnlpool->repl_inst_filehdr->lms_group_info;
/* Need to byteswap a few multi-byte fields to take into account the receiver endianness */
assert(remote_side->endianness_known); /* only then is remote_side->cross_endian reliable */
/* Starting GT.M V62001, the receiver server expects an endian converted lms_group_info. So endian
* convert in that case. Pre-V62001, the receiver unconditionally did the endian conversion. So skip
* endian conversion in the source side in that case.
*/
if (remote_side->cross_endian && (REPL_PROTO_VER_XENDIANFIXES <= remote_side->proto_ver))
ENDIAN_CONVERT_REPL_INST_UUID(&needinst_msg.lms_group_info);
needinst_msg.proto_ver = REPL_PROTO_VER_THIS;
needinst_msg.is_rootprimary = !(jnlpool->jnlpool_ctl->upd_disabled);
needinst_msg.is_supplementary = jnlpool->repl_inst_filehdr->is_supplementary;
needinst_msg.jnl_ver = JNL_VER_THIS;
gtmsource_repl_send((repl_msg_ptr_t)&needinst_msg, "REPL_NEED_INSTINFO", MAX_SEQNO, INVALID_SUPPL_STRM);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* send did not succeed */
/*************** Receive REPL_INSTINFO message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&instinfo_msg, SIZEOF(repl_instinfo_msg_t),
REPL_INSTINFO, "REPL_INSTINFO"))
return FALSE; /* recv did not succeed */
assert(REPL_INSTINFO == instinfo_msg.type);
repl_log(gtmsource_log_fp, TRUE, FALSE, "Received secondary instance name is [%s]\n", instinfo_msg.instname);
if (!remote_side->is_supplementary)
{ /* Remote side is non-supplementary */
if (jnlpool->repl_inst_filehdr->is_supplementary)
{ /* Issue SECNOTSUPPLEMENTARY error because this is a supplementary primary and secondary
* is not a supplementary instance.
*/
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(6) ERR_SECNOTSUPPLEMENTARY, 4,
LEN_AND_STR(jnlpool->repl_inst_filehdr->inst_info.this_instname),
LEN_AND_STR(instinfo_msg.instname));
}
} else if (!jnlpool->repl_inst_filehdr->is_supplementary)
{ /* Remote side is supplementary and Local side is non-supplementary.
* The REPL_INSTINFO message would have a non-zero "strm_jnl_seqno" field.
* Pass it back on to the caller.
*/
assert(instinfo_msg.strm_jnl_seqno);
assert(0 == GET_STRM_INDEX(instinfo_msg.strm_jnl_seqno));
repl_log(gtmsource_log_fp, TRUE, TRUE, "Received Stream Seqno = %llu [0x%llx]\n",
instinfo_msg.strm_jnl_seqno, instinfo_msg.strm_jnl_seqno);
*strm_jnl_seqno = instinfo_msg.strm_jnl_seqno;
}
/* Check if instance name in the REPL_INSTINFO message matches that in the source server command line */
if (STRCMP(instinfo_msg.instname, jnlpool->gtmsource_local->secondary_instname))
{ /* Instance name obtained from the receiver does not match what was specified in the
* source server command line. Issue error.
*/
sgtm_putmsg(print_msg, PROC_OPS_PRINT_MSG_LEN, VARLSTCNT(6) ERR_REPLINSTSECMTCH, 4,
LEN_AND_STR(instinfo_msg.instname), LEN_AND_STR(jnlpool->gtmsource_local->secondary_instname));
repl_log(gtmsource_log_fp, TRUE, TRUE, print_msg);
status = gtmsource_shutdown(TRUE, NORMAL_SHUTDOWN) - NORMAL_SHUTDOWN;
gtmsource_exit(status);
}
*secondary_was_rootprimary = (boolean_t)instinfo_msg.was_rootprimary;
}
return TRUE;
}
/* Given an input "seqno", this function obtains from the receiver the histinfo record that corresponds to "seqno - 1".
*
* seqno --> The journal seqno that is to be searched in the instance file history.
* histinfo --> Pointer to the histinfo structure that is filled in with what was received.
*/
boolean_t gtmsource_get_remote_histinfo(seq_num seqno, repl_histinfo *histinfo)
{
char err_string[1024];
repl_histinfo1_msg_t histinfo1_msg;
repl_histinfo2_msg_t histinfo2_msg;
repl_histinfo_msg_t histinfo_msg;
repl_needhistinfo_msg_t needhistinfo_msg;
/*************** Send REPL_NEED_HISTINFO (formerly REPL_NEED_TRIPLE_INFO) message ***************/
memset(&needhistinfo_msg, 0, SIZEOF(needhistinfo_msg));
assert(SIZEOF(needhistinfo_msg) == MIN_REPL_MSGLEN);
needhistinfo_msg.type = REPL_NEED_HISTINFO;
needhistinfo_msg.len = MIN_REPL_MSGLEN;
needhistinfo_msg.seqno = seqno;
needhistinfo_msg.strm_num = strm_index;
needhistinfo_msg.histinfo_num = INVALID_HISTINFO_NUM;
gtmsource_repl_send((repl_msg_ptr_t)&needhistinfo_msg, "REPL_NEED_HISTINFO", seqno, needhistinfo_msg.strm_num);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* send did not succeed */
if (REPL_PROTO_VER_SUPPLEMENTARY > remote_side->proto_ver)
{ /* Remote side does not support supplementary protocol. Use older protocol messages to communicate. */
assert(REPL_PROTO_VER_MULTISITE <= remote_side->proto_ver);
/*************** Receive REPL_OLD_TRIPLEINFO1 message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&histinfo1_msg, MIN_REPL_MSGLEN,
REPL_OLD_TRIPLEINFO1, "REPL_OLD_TRIPLEINFO1"))
return FALSE; /* recv did not succeed */
assert(REPL_OLD_TRIPLEINFO1 == histinfo1_msg.type);
/*************** Receive REPL_OLD_TRIPLEINFO2 message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&histinfo2_msg, MIN_REPL_MSGLEN,
REPL_OLD_TRIPLEINFO2, "REPL_OLD_TRIPLEINFO2"))
return FALSE; /* recv did not succeed */
assert(REPL_OLD_TRIPLEINFO2 == histinfo2_msg.type);
/* Check if start_seqno in HISTINFO1 and HISTINFO2 message is the same. If not something is wrong */
if (histinfo1_msg.start_seqno != histinfo2_msg.start_seqno)
{
assert(FALSE);
repl_log(gtmsource_log_fp, TRUE, FALSE, "REPL_OLD_TRIPLEINFO1 msg has start_seqno %llu [0x%llx] while "
"corresponding REPL_OLD_TRIPLEINFO2 msg has a different start_seqno %llu [0x%llx]. "
"Closing connection.\n", histinfo1_msg.start_seqno, histinfo1_msg.start_seqno,
histinfo2_msg.start_seqno, histinfo2_msg.start_seqno);
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = jnlpool->gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
return FALSE;
}
memset(histinfo, 0, SIZEOF(*histinfo));
memcpy(histinfo->root_primary_instname, histinfo1_msg.instname, MAX_INSTNAME_LEN - 1);
histinfo->start_seqno = histinfo1_msg.start_seqno;
histinfo->root_primary_cycle = histinfo2_msg.cycle;
histinfo->histinfo_num = histinfo2_msg.histinfo_num;
} else
{ /* Remote side does support supplementary protocol. Use newer protocol messages to communicate. */
/*************** Receive REPL_HISTINFO message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&histinfo_msg, SIZEOF(repl_histinfo_msg_t),
REPL_HISTINFO, "REPL_HISTINFO"))
return FALSE; /* recv did not succeed */
assert(REPL_HISTINFO == histinfo_msg.type);
*histinfo = histinfo_msg.history;
}
assert(0 <= histinfo->histinfo_num);
return TRUE;
}
/* Given an input "seqno", this function determines how many non-supplementary streams are known to the receiver server
* as of instance jnl_seqno = "seqno". It then checks whether the source side list of known streams is identical. For each
* such stream, it exchanges the stream-specific history record corresponding to "seqno" and verifies that both sides match.
*
* seqno --> The journal seqno that is to be searched in the instance file history.
* *rollback_first --> Set to TRUE if we find some stream history record not matching between the source & receiver side.
*/
boolean_t gtmsource_check_remote_strm_histinfo(seq_num seqno, boolean_t *rollback_first)
{
boolean_t lcl_strm_valid, remote_strm_valid;
int4 lcl_histinfo_num;
char err_string[1024];
int idx, status;
repl_histinfo local_histinfo;
repl_histinfo_msg_t histinfo_msg;
repl_needhistinfo_msg_t needhistinfo_msg;
repl_needstrminfo_msg_t needstrminfo_msg;
repl_strminfo_msg_t strminfo_msg;
assert(remote_side->is_supplementary);
assert(REPL_PROTO_VER_SUPPLEMENTARY <= remote_side->proto_ver);
assert(0 == strm_index);
assert(FALSE == *rollback_first);
/*************** Send REPL_NEED_STRMINFO message ***************/
memset(&needstrminfo_msg, 0, SIZEOF(needstrminfo_msg));
assert(SIZEOF(needstrminfo_msg) == MIN_REPL_MSGLEN);
needstrminfo_msg.type = REPL_NEED_STRMINFO;
needstrminfo_msg.len = MIN_REPL_MSGLEN;
needstrminfo_msg.seqno = seqno;
gtmsource_repl_send((repl_msg_ptr_t)&needstrminfo_msg, "REPL_NEED_STRMINFO", seqno, INVALID_SUPPL_STRM);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* send did not succeed */
/*************** Receive REPL_STRMINFO message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&strminfo_msg, SIZEOF(repl_strminfo_msg_t), REPL_STRMINFO, "REPL_STRMINFO"))
return FALSE; /* recv did not succeed */
assert(REPL_STRMINFO == strminfo_msg.type);
/* Verify that the list of known streams is identical on both sides */
grab_lock(jnlpool->jnlpool_dummy_reg, TRUE, HANDLE_CONCUR_ONLINE_ROLLBACK);
if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
return FALSE; /* concurrent online rollback happened */
status = repl_inst_histinfo_find_seqno(seqno, INVALID_SUPPL_STRM, &local_histinfo);
rel_lock(jnlpool->jnlpool_dummy_reg);
assert(0 == status); /* we are guaranteed to find this since we have already verified 0th stream matches */
/* Fix last_histinfo_num[] in local side to include "local_histinfo" too (which could have strm_index > 0) */
if (0 < local_histinfo.strm_index)
{
assert(local_histinfo.last_histinfo_num[local_histinfo.strm_index] < local_histinfo.histinfo_num);
local_histinfo.last_histinfo_num[local_histinfo.strm_index] = local_histinfo.histinfo_num;
}
/* Skip 0th stream as it has already been verified */
for (idx = 1; idx < MAX_SUPPL_STRMS; idx++)
{
lcl_strm_valid = (INVALID_HISTINFO_NUM != local_histinfo.last_histinfo_num[idx]);
remote_strm_valid = (INVALID_HISTINFO_NUM != strminfo_msg.last_histinfo_num[idx]);
if (!lcl_strm_valid && remote_strm_valid)
{
assert(FALSE);
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(3) ERR_STRMNUMMISMTCH1, 1, idx);
}
else if (lcl_strm_valid && !remote_strm_valid)
{
assert(FALSE);
RTS_ERROR_CSA_ABT(NULL, VARLSTCNT(3) ERR_STRMNUMMISMTCH2, 1, idx);
}
}
/* Now that we know both sides have the exact set of known streams, verify history record for each stream matches */
/* Send REPL_NEED_HISTINFO message for each stream. Do common initialization outside loop */
memset(&needhistinfo_msg, 0, SIZEOF(needhistinfo_msg));
assert(SIZEOF(needhistinfo_msg) == MIN_REPL_MSGLEN);
needhistinfo_msg.type = REPL_NEED_HISTINFO;
needhistinfo_msg.len = MIN_REPL_MSGLEN;
needhistinfo_msg.seqno = seqno; /* this is not essential but helps debugging the handshake as this gets logged in
* the receiver log so initialize it */
for (idx = 1; idx < MAX_SUPPL_STRMS; idx++)
{
lcl_histinfo_num = local_histinfo.last_histinfo_num[idx];
if (INVALID_HISTINFO_NUM == lcl_histinfo_num)
continue;
/* Find corresponding history record on REMOTE side */
assert(INVALID_HISTINFO_NUM != strminfo_msg.last_histinfo_num[idx]);
needhistinfo_msg.histinfo_num = strminfo_msg.last_histinfo_num[idx];
needhistinfo_msg.strm_num = idx;
/*************** Send REPL_NEED_HISTINFO message ***************/
gtmsource_repl_send((repl_msg_ptr_t)&needhistinfo_msg, "REPL_NEED_HISTINFO", seqno, needhistinfo_msg.strm_num);
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return FALSE; /* send did not succeed */
/*************** Receive REPL_HISTINFO message ***************/
if (!gtmsource_repl_recv((repl_msg_ptr_t)&histinfo_msg, SIZEOF(repl_histinfo_msg_t),
REPL_HISTINFO, "REPL_HISTINFO"))
return FALSE; /* recv did not succeed */
assert(REPL_HISTINFO == histinfo_msg.type);
/* Find corresponding history record on LOCAL side */
grab_lock(jnlpool->jnlpool_dummy_reg, TRUE, HANDLE_CONCUR_ONLINE_ROLLBACK);
if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
return FALSE; /* concurrent online rollback happened */
status = repl_inst_histinfo_get(lcl_histinfo_num, &local_histinfo);
assert(0 == status); /* Since lcl_histinfo_num is a valid record number (>= 0 and < num_histinfo) */
rel_lock(jnlpool->jnlpool_dummy_reg);
/* Compare the two history records. If they are not identical for even one stream, signal rollback on receiver */
if (!gtmsource_is_histinfo_identical(&histinfo_msg.history, &local_histinfo, seqno, OK_TO_LOG_TRUE))
*rollback_first = TRUE;
}
return TRUE;
}
/* This function finds the 'n'th histinfo record in the replication instance file of this instance.
* This is a wrapper on top of "repl_inst_histinfo_get" which additionally does error checking.
* This closes the connection if it detects a REPLINSTNOHIST error.
*/
void gtmsource_histinfo_get(int4 index, repl_histinfo *histinfo)
{
unix_db_info *udi;
char histdetail[MAX_REPL_OPMSG_LEN];
int4 status;
repl_msg_t instnohist_msg;
assert((NULL != jnlpool) && (NULL != jnlpool->repl_inst_filehdr)); /* journal pool should be set up */
udi = FILE_INFO(jnlpool->jnlpool_dummy_reg);
assert(udi->s_addrs.now_crit);
status = repl_inst_histinfo_get(index, histinfo);
assert((0 == status) || (INVALID_HISTINFO_NUM == index));
assert((0 != status) || (index == histinfo->histinfo_num));
if (0 != status)
{
assert(ERR_REPLINSTNOHIST == status); /* currently the only error returned by "repl_inst_histinfo_get" */
SNPRINTF(histdetail, MAX_REPL_OPMSG_LEN, "record number %d [0x%x]", index, index);
gtm_putmsg_csa(CSA_ARG(NULL) VARLSTCNT(6) ERR_REPLINSTNOHIST, 4, LEN_AND_STR(histdetail), LEN_AND_STR(udi->fn));
/* Send this error status to the receiver server before closing the connection. This way the receiver
* will know to shut down rather than loop back trying to reconnect. This avoids an infinite loop of
* connection open and closes between the source server and receiver server.
*/
instnohist_msg.type = REPL_INST_NOHIST;
instnohist_msg.len = MIN_REPL_MSGLEN;
memset(&instnohist_msg.msg[0], 0, SIZEOF(instnohist_msg.msg));
gtmsource_repl_send((repl_msg_ptr_t)&instnohist_msg, "REPL_INST_NOHIST", MAX_SEQNO, INVALID_SUPPL_STRM);
/* Close the connection */
repl_log(gtmsource_log_fp, TRUE, TRUE, "Connection reset due to above REPLINSTNOHIST error\n");
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = jnlpool->gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
}
}
/* Given two histinfo records (one each from the primary and secondary), this function checks whether the history information
* in both histinfo records is the same. If so, it returns TRUE; otherwise it returns FALSE. The jnl_seqno is only used for
* logging and should refer to the resync seqno we are checking. Logging is only performed if ok_to_log is TRUE.
*/
boolean_t gtmsource_is_histinfo_identical(repl_histinfo *remote_histinfo, repl_histinfo *local_histinfo,
seq_num jnl_seqno, boolean_t ok_to_log)
{
if (ok_to_log)
{
repl_log(gtmsource_log_fp, FALSE, FALSE, "On Secondary, seqno %llu [0x%llx] generated by instance name [%s] "
"with cycle [0x%x] (%d), pid = %d : time = %d [0x%x]\n", jnl_seqno - 1, jnl_seqno - 1,
remote_histinfo->root_primary_instname, remote_histinfo->root_primary_cycle,
remote_histinfo->root_primary_cycle, remote_histinfo->creator_pid,
remote_histinfo->created_time, remote_histinfo->created_time);
repl_log(gtmsource_log_fp, FALSE, FALSE, "On Primary, seqno %llu [0x%llx] generated by instance name [%s] "
"with cycle [0x%x] (%d), pid = %d : time = %d [0x%x]\n", jnl_seqno - 1, jnl_seqno - 1,
local_histinfo->root_primary_instname, local_histinfo->root_primary_cycle,
local_histinfo->root_primary_cycle, local_histinfo->creator_pid,
local_histinfo->created_time, local_histinfo->created_time);
}
/* Starting with the version of GT.M that supports supplementary instances, we check for a lot more pieces of the history.
* But if remote side is an older version that does not support the supplementary protocol, check only those pieces
* which were checked previously.
*/
if (STRCMP(local_histinfo->root_primary_instname, remote_histinfo->root_primary_instname)
|| (local_histinfo->root_primary_cycle != remote_histinfo->root_primary_cycle)
|| ((REPL_PROTO_VER_SUPPLEMENTARY <= remote_side->proto_ver)
&& ((local_histinfo->creator_pid != remote_histinfo->creator_pid)
|| (local_histinfo->created_time != remote_histinfo->created_time))))
{ /* the root primary instance name or cycle (or, with the supplementary protocol, creator pid or created time) did not match */
if (ok_to_log)
{
repl_log(gtmsource_log_fp, FALSE, FALSE, "Primary and Secondary have DIFFERING history records for "
"seqno %llu [0x%llx]\n", jnl_seqno - 1, jnl_seqno - 1);
SNPRINTF(print_msg_t, SIZEOF(print_msg_t), "Originating instance and replicating instances are"
" out-of-sync after sequence number : "INT8_FMT" ", jnl_seqno - 1);
sgtm_putmsg(print_msg_src, PROC_OPS_PRINT_MSG_LEN, VARLSTCNT(4) ERR_TEXT, 2, LEN_AND_STR(print_msg_t));
repl_log(gtmsource_log_fp, TRUE, TRUE, print_msg_src);
}
return FALSE;
} else
{
if (ok_to_log)
repl_log(gtmsource_log_fp, FALSE, FALSE, "Primary and Secondary have IDENTICAL history records for "
"seqno %llu [0x%llx]\n", jnl_seqno - 1, jnl_seqno - 1);
return TRUE;
}
}
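/* Illustrative sketch only, not part of the GT.M sources: the comparison in "gtmsource_is_histinfo_identical" above
* always checks the instance name and cycle, and checks the creator pid and creation time only when the remote side
* speaks the supplementary protocol. The type hist_rec, the function hist_identical and the flag remote_has_suppl are
* hypothetical, simplified stand-ins for repl_histinfo and the remote_side->proto_ver test. The block sits under
* "#if 0" so it never takes part in the build.
*/
#if 0
```c
#include <assert.h>
#include <string.h>

typedef struct
{
	char		instname[16];	/* root primary instance name (NUL terminated) */
	unsigned int	cycle;		/* root primary cycle */
	unsigned int	creator_pid;	/* pid that wrote the history record */
	unsigned int	created_time;	/* time at which the history record was written */
} hist_rec;

/* Return 1 if the two records describe the same history, else 0. "remote_has_suppl" is non-zero when the
 * remote side understands the newer protocol, in which case the pid and time are compared as well.
 */
static int hist_identical(const hist_rec *local, const hist_rec *remote, int remote_has_suppl)
{
	assert((NULL != local) && (NULL != remote));
	if (0 != strcmp(local->instname, remote->instname))
		return 0;
	if (local->cycle != remote->cycle)
		return 0;
	if (remote_has_suppl
		&& ((local->creator_pid != remote->creator_pid) || (local->created_time != remote->created_time)))
		return 0;
	return 1;
}
```
#endif
/* Two records that differ only in creator pid compare as identical against an older remote side and as different
* against a remote side that supports the supplementary protocol.
*/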
/* Determine the resync seqno between primary and secondary by comparing local and remote histinfo records from the tail of the
* respective instance files until we reach a seqno whose histinfo record information is identical in both. The resync seqno
* is the first seqno whose histinfo record information was NOT identical in both. The histinfo records on the secondary are
* obtained through successive calls to the function "gtmsource_get_remote_histinfo".
*
* If the connection gets reset while exchanging histinfo records with secondary, this function returns a seqno of MAX_SEQNO.
* The global variable "gtmsource_state" will be set to GTMSOURCE_CHANGING_MODE or GTMSOURCE_WAITING_FOR_CONNECTION and the
* caller of this function should accordingly check for that immediately on return.
*/
seq_num gtmsource_find_resync_seqno(repl_histinfo *local_histinfo, repl_histinfo *remote_histinfo)
{
seq_num max_start_seqno, local_start_seqno, remote_start_seqno;
int4 local_histinfo_num;
# ifdef DEBUG
int4 prev_remote_histinfo_num;
sgmnt_addrs *csa;
seq_num min_start_seqno;
# endif
assert((NULL != jnlpool) && (NULL != jnlpool->jnlpool_dummy_reg) && jnlpool->jnlpool_dummy_reg->open);
# ifdef DEBUG
csa = &FILE_INFO(jnlpool->jnlpool_dummy_reg)->s_addrs;
ASSERT_VALID_JNLPOOL(csa);
prev_remote_histinfo_num = remote_histinfo->prev_histinfo_num;
# endif
do
{
local_start_seqno = local_histinfo->start_seqno;
remote_start_seqno = remote_histinfo->start_seqno;
assert(local_start_seqno);
assert(remote_start_seqno);
max_start_seqno = MAX(local_start_seqno, remote_start_seqno);
/* "max_start_seqno" is the largest yet known seqno whose histinfo does NOT match between primary and secondary.
* Therefore determine the histinfo(s) for "max_start_seqno-1" in primary and/or secondary and compare them.
*/
if (1 == max_start_seqno)
{ /* The earliest possible seqno in the primary is out of sync with that of the secondary. Stop the histinfo
* search right away and return with 1 (the smallest possible seqno) as the resync seqno.
*/
assert(local_start_seqno == max_start_seqno);
assert(remote_start_seqno == max_start_seqno);
assert(INVALID_HISTINFO_NUM == local_histinfo->prev_histinfo_num);
assert(INVALID_HISTINFO_NUM == remote_histinfo->prev_histinfo_num);
break;
}
DEBUG_ONLY(min_start_seqno = MIN(local_start_seqno, remote_start_seqno);)
assert(!gtmsource_is_histinfo_identical(remote_histinfo, local_histinfo, min_start_seqno, OK_TO_LOG_FALSE));
if (local_start_seqno == max_start_seqno)
{ /* Need to get the previous histinfo record on the primary */
local_histinfo_num = local_histinfo->prev_histinfo_num;
assert(0 <= local_histinfo->histinfo_num);
grab_lock(jnlpool->jnlpool_dummy_reg, TRUE, HANDLE_CONCUR_ONLINE_ROLLBACK);
if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
return MAX_SEQNO;
gtmsource_histinfo_get(local_histinfo_num, local_histinfo);
rel_lock(jnlpool->jnlpool_dummy_reg);
if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
return MAX_SEQNO; /* Connection got reset in "gtmsource_histinfo_get" due to REPLINSTNOHIST */
}
if (remote_start_seqno == max_start_seqno)
{ /* Need to get the previous histinfo record on the secondary */
assert(0 <= prev_remote_histinfo_num);
if (!gtmsource_get_remote_histinfo(remote_start_seqno, remote_histinfo))
{
assert((GTMSOURCE_CHANGING_MODE == gtmsource_state)
|| (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state));
/* Connection got reset while exchanging histinfo history information */
return MAX_SEQNO;
}
assert(remote_histinfo->histinfo_num == prev_remote_histinfo_num);
DEBUG_ONLY(prev_remote_histinfo_num = remote_histinfo->prev_histinfo_num);
}
assert(local_histinfo->start_seqno < max_start_seqno);
assert(remote_histinfo->start_seqno < max_start_seqno);
} while (!gtmsource_is_histinfo_identical(remote_histinfo, local_histinfo, max_start_seqno, OK_TO_LOG_TRUE));
repl_log(gtmsource_log_fp, TRUE, FALSE, "Resync Seqno determined is %llu [0x%llx]\n", max_start_seqno, max_start_seqno);
/* "max_start_seqno-1" has same histinfo info in both primary and secondary. Hence "max_start_seqno" is the resync seqno. */
return max_start_seqno;
}
/* This routine sends a REPL_HISTREC message for the histinfo record corresponding to seqno "gtmsource_local->read_jnl_seqno"
 * (or a REPL_OLD_TRIPLE message if the remote side does not support the supplementary protocol).
 * It locates that histinfo record by calling the function "gtmsource_set_next_histinfo_seqno".
 * On return from this routine, the caller should check whether the global variable "gtmsource_state" is one of
 * GTMSOURCE_CHANGING_MODE, GTMSOURCE_WAITING_FOR_CONNECTION or GTMSOURCE_HANDLE_ONLN_RLBK and if so take appropriate action.
 */
void gtmsource_send_new_histrec()
{
repl_histinfo histinfo, zero_histinfo;
repl_old_triple_msg_t oldtriple_msg;
gtmsource_local_ptr_t gtmsource_local;
boolean_t first_histrec_send;
int4 zero_histinfo_num;
DEBUG_ONLY(sgmnt_addrs *csa;)
assert((NULL != jnlpool) && (NULL != jnlpool->jnlpool_dummy_reg) && jnlpool->jnlpool_dummy_reg->open);
# ifdef DEBUG
csa = &FILE_INFO(jnlpool->jnlpool_dummy_reg)->s_addrs;
ASSERT_VALID_JNLPOOL(csa);
# endif
gtmsource_local = jnlpool->gtmsource_local;
assert(gtmsource_local->send_new_histrec);
assert(gtmsource_local->read_jnl_seqno <= gtmsource_local->next_histinfo_seqno);
first_histrec_send = (-1 == gtmsource_local->next_histinfo_num);
gtmsource_set_next_histinfo_seqno(FALSE); /* sets gtmsource_local->next_histinfo_seqno & next_histinfo_num */
if ((GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state) || (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state))
return; /* "gtmsource_set_next_histinfo_seqno" encountered REPLINSTNOHIST or concurrent online rollback occurred */
/*************** Read histinfo (to send) from instance file first ***************/
grab_lock(jnlpool->jnlpool_dummy_reg, TRUE, HANDLE_CONCUR_ONLINE_ROLLBACK);
if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
return;
assert(1 <= gtmsource_local->next_histinfo_num);
/* With non-supplementary instances, consecutive history records are guaranteed to increase in start_seqno.
* But with supplementary instances, it is possible for two consecutive history records to have the same start_seqno
* (if those came from different root primary instances at the same time). Take care of that in the assert below.
*/
assert((!this_side->is_supplementary && (gtmsource_local->read_jnl_seqno < gtmsource_local->next_histinfo_seqno))
|| (this_side->is_supplementary && (gtmsource_local->read_jnl_seqno <= gtmsource_local->next_histinfo_seqno)));
gtmsource_histinfo_get(gtmsource_local->next_histinfo_num - 1, &histinfo);
if ((GTMSOURCE_WAITING_FOR_CONNECTION != gtmsource_state) && this_side->is_supplementary && first_histrec_send
&& (0 < histinfo.strm_index))
{ /* Supplementary source side sending to a supplementary receiver. And this is the FIRST history record
* being sent for this replication connection. It is possible that the closest history record prior to
* "gtmsource_local->read_jnl_seqno" has a non-zero "strm_index". In that case, we might be sending a
* non-zero strm_index history record across and it is possible that the secondary has an empty instance
* file (in case of an -updateresync startup). This would lead to issues on the receiving supplementary
* instance since it will now have a non-zero stream history record as the first history record in its
* instance file. This is an out-of-design situation since to establish the resync point between two
* supplementary instances, we need to examine the 0th stream and the assumption is that if the instance
* file has at least one history record, there is at least one 0th stream history record. Therefore we
* need to prevent this out-of-design situation. Towards that, find out the most recent 0th stream
* history record in this instance and send that across BEFORE sending the history record corresponding
* to "gtmsource_local->read_jnl_seqno".
*/
zero_histinfo_num = histinfo.last_histinfo_num[0];
assert(INVALID_HISTINFO_NUM != zero_histinfo_num);
assert(0 <= zero_histinfo_num);
gtmsource_histinfo_get(zero_histinfo_num, &zero_histinfo);
} else
zero_histinfo_num = INVALID_HISTINFO_NUM;
rel_lock(jnlpool->jnlpool_dummy_reg);
if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
{
assert(FALSE);
return; /* Connection got reset in "gtmsource_histinfo_get" due to REPLINSTNOHIST */
}
assert(gtmsource_local->read_jnl_seqno >= histinfo.start_seqno);
assert(remote_side->endianness_known); /* only then is remote_side->cross_endian reliable */
if (REPL_PROTO_VER_SUPPLEMENTARY > remote_side->proto_ver)
{ /* Remote side does not support supplementary protocol. Use older protocol messages to communicate. */
assert(REPL_PROTO_VER_MULTISITE <= remote_side->proto_ver);
assert(!this_side->is_supplementary);
/*************** Send REPL_OLD_TRIPLE message ***************/
memset(&oldtriple_msg, 0, SIZEOF(oldtriple_msg));
oldtriple_msg.type = REPL_OLD_TRIPLE;
oldtriple_msg.len = SIZEOF(oldtriple_msg);
oldtriple_msg.triplecontent.jrec_type = JRT_TRIPLE;
if (remote_side->cross_endian && (this_side->jnl_ver < remote_side->jnl_ver))
{
oldtriple_msg.triplecontent.forwptr = GTM_BYTESWAP_24(SIZEOF(repl_old_triple_jnl_t));
oldtriple_msg.triplecontent.start_seqno = GTM_BYTESWAP_64(gtmsource_local->read_jnl_seqno);
oldtriple_msg.triplecontent.cycle = GTM_BYTESWAP_32(histinfo.root_primary_cycle);
} else
{
oldtriple_msg.triplecontent.forwptr = SIZEOF(repl_old_triple_jnl_t);
oldtriple_msg.triplecontent.start_seqno = gtmsource_local->read_jnl_seqno;
oldtriple_msg.triplecontent.cycle = histinfo.root_primary_cycle;
}
memcpy(oldtriple_msg.triplecontent.instname, histinfo.root_primary_instname, MAX_INSTNAME_LEN - 1);
gtmsource_repl_send((repl_msg_ptr_t)&oldtriple_msg, "REPL_OLD_TRIPLE",
gtmsource_local->read_jnl_seqno, INVALID_SUPPL_STRM);
} else
{ /* Remote side supports supplementary protocol. Communicate using new message protocol */
if (INVALID_HISTINFO_NUM != zero_histinfo_num)
{ /* Send REPL_HISTREC message corresponding to the 0th stream history record */
assert(gtmsource_local->read_jnl_seqno >= zero_histinfo.start_seqno);
GTMSOURCE_SEND_REPL_HISTREC(zero_histinfo, gtmsource_local, remote_side->cross_endian);
/* the above macro would have invoked "gtmsource_repl_send" so check for "gtmsource_state" */
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return; /* send did not succeed */
/* If we have a non-zero stream history record, then check if its "start_seqno" is less than
* gtmsource_local->read_jnl_seqno. If so, we don't even need to send this history record. If it is
* equal though, we do need to send this across.
*/
assert(gtmsource_local->read_jnl_seqno >= histinfo.start_seqno);
if (gtmsource_local->read_jnl_seqno == histinfo.start_seqno)
GTMSOURCE_SEND_REPL_HISTREC(histinfo, gtmsource_local, remote_side->cross_endian);
} else
GTMSOURCE_SEND_REPL_HISTREC(histinfo, gtmsource_local, remote_side->cross_endian);
}
if ((GTMSOURCE_CHANGING_MODE == gtmsource_state) || (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state))
return; /* send did not succeed */
repl_dump_histinfo(gtmsource_log_fp, TRUE, TRUE, "New History Content", &histinfo);
gtmsource_local->send_new_histrec = FALSE;
}
/* This function is invoked once for each histinfo record that the source server goes past while sending journal records across.
* This function sets the boundary seqno field "next_histinfo_seqno" to be the "start_seqno" of the next histinfo record so the
* source server does not send any seqnos corresponding to the next histinfo record before sending a REPL_HISTREC message.
* It will set "gtmsource_local->next_histinfo_seqno" and "gtmsource_local->next_histinfo_num" to correspond to the next histinfo
* record and refresh the private copy "gtmsource_local->num_histinfo" from the current value of
* "repl_inst_filehdr->num_histinfo".
*
* The input variable "detect_new_histinfo" is set to TRUE if this function is called from "gtmsource_readfiles" or
* "gtmsource_readpool" the moment they detect that the instance file has had a new histinfo record since the last time this
* source server took a copy of it in its private "gtmsource_local->num_histinfo". In this case, the only objective
* is to find the start_seqno of the next histinfo record and note that down as "gtmsource_local->next_histinfo_seqno".
*
* If the input variable "detect_new_histinfo" is set to FALSE, "next_histinfo_seqno" is set to the starting seqno of the
* histinfo record immediately after that corresponding to "gtmsource_local->read_jnl_seqno".
*
* This can end up closing the connection if the call to "gtmsource_histinfo_get" or "repl_inst_histinfo_find_seqno" fails.
*/
void gtmsource_set_next_histinfo_seqno(boolean_t detect_new_histinfo)
{
unix_db_info *udi;
int4 status, next_histinfo_num, num_histinfo;
repl_histinfo next_histinfo, prev_histinfo;
gtmsource_local_ptr_t gtmsource_local;
repl_msg_t instnohist_msg;
char histdetail[256];
seq_num read_seqno;
DEBUG_ONLY(sgmnt_addrs *csa;)
assert((NULL != jnlpool) && (NULL != jnlpool->jnlpool_dummy_reg) && jnlpool->jnlpool_dummy_reg->open);
# ifdef DEBUG
csa = &FILE_INFO(jnlpool->jnlpool_dummy_reg)->s_addrs;
ASSERT_VALID_JNLPOOL(csa);
# endif
grab_lock(jnlpool->jnlpool_dummy_reg, TRUE, HANDLE_CONCUR_ONLINE_ROLLBACK);
if (GTMSOURCE_HANDLE_ONLN_RLBK == gtmsource_state)
return;
assert(NULL != jnlpool->repl_inst_filehdr); /* journal pool should be set up */
gtmsource_local = jnlpool->gtmsource_local;
next_histinfo_num = gtmsource_local->next_histinfo_num;
/* assert((-1 == next_histinfo_num) || (gtmsource_local->next_histinfo_seqno >= gtmsource_local->read_jnl_seqno)); */
read_seqno = gtmsource_local->read_jnl_seqno;
assert(gtmsource_local->next_histinfo_seqno >= read_seqno);
num_histinfo = jnlpool->repl_inst_filehdr->num_histinfo;
if (!detect_new_histinfo)
{
if (-1 == next_histinfo_num)
{ /* This is the first invocation of this function after this connection with a receiver was established.
* Find the first histinfo record in the local instance file corresponding to the next seqno to be sent
* across i.e. "gtmsource_local->read_jnl_seqno". The below function will return the history record
* just BEFORE read_jnl_seqno. So fetch the immediately next history record to get the desired record.
*/
assert(read_seqno <= jnlpool->jnlpool_ctl->jnl_seqno);
status = repl_inst_histinfo_find_seqno(read_seqno, INVALID_SUPPL_STRM, &prev_histinfo);
if (0 != status)
{
assert(ERR_REPLINSTNOHIST == status); /* only error returned by "repl_inst_histinfo_find_seqno" */
assert((INVALID_HISTINFO_NUM == prev_histinfo.histinfo_num)
|| (prev_histinfo.start_seqno >= read_seqno));
if ((INVALID_HISTINFO_NUM == prev_histinfo.histinfo_num)
|| (prev_histinfo.start_seqno > read_seqno))
{ /* The read seqno is PRIOR to the starting seqno of the instance file.
* In that case, issue error and close the connection.
*/
NON_GTM64_ONLY(SNPRINTF(histdetail, SIZEOF(histdetail), "seqno [0x%llx]", read_seqno - 1));
GTM64_ONLY(SNPRINTF(histdetail, SIZEOF(histdetail), "seqno [0x%lx]", read_seqno - 1));
udi = FILE_INFO(jnlpool->jnlpool_dummy_reg);
gtm_putmsg_csa(CSA_ARG(NULL) VARLSTCNT(6) ERR_REPLINSTNOHIST, 4,
LEN_AND_STR(histdetail), LEN_AND_STR(udi->fn));
/* Send error status to the receiver server before closing the connection. This way the
* receiver will know to shut down rather than loop back trying to reconnect. This avoids
* an infinite loop of connection open/closes between the source and receiver servers.
*/
instnohist_msg.type = REPL_INST_NOHIST;
instnohist_msg.len = MIN_REPL_MSGLEN;
memset(&instnohist_msg.msg[0], 0, SIZEOF(instnohist_msg.msg));
gtmsource_repl_send((repl_msg_ptr_t)&instnohist_msg, "REPL_INST_NOHIST",
MAX_SEQNO, INVALID_SUPPL_STRM);
rel_lock(jnlpool->jnlpool_dummy_reg);
repl_log(gtmsource_log_fp, TRUE, TRUE,
"Connection reset due to above REPLINSTNOHIST error\n");
repl_close(&gtmsource_sock_fd);
SHORT_SLEEP(GTMSOURCE_WAIT_FOR_RECEIVER_CLOSE_CONN);
gtmsource_state = gtmsource_local->gtmsource_state = GTMSOURCE_WAITING_FOR_CONNECTION;
return;
}
next_histinfo_num = prev_histinfo.histinfo_num;
} else
{
assert(prev_histinfo.start_seqno < read_seqno);
next_histinfo_num = prev_histinfo.histinfo_num;
if ((next_histinfo_num + 1) < num_histinfo)
{
gtmsource_histinfo_get(next_histinfo_num + 1, &next_histinfo);
if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
{
assert(FALSE);
rel_lock(jnlpool->jnlpool_dummy_reg);
return; /* Connection got reset in "gtmsource_histinfo_get" due to REPLINSTNOHIST */
}
assert(next_histinfo.start_seqno >= read_seqno);
if (next_histinfo.start_seqno == read_seqno)
{
next_histinfo_num++;
assert(next_histinfo_num == next_histinfo.histinfo_num);
}
}
}
}
/* else: next_histinfo_num was already found. So just move on to the NEXT history record. */
assert(0 <= next_histinfo_num);
assert(next_histinfo_num < num_histinfo);
assert((gtmsource_local->next_histinfo_num != next_histinfo_num)
|| (read_seqno == gtmsource_local->next_histinfo_seqno)
|| (MAX_SEQNO == gtmsource_local->next_histinfo_seqno));
next_histinfo_num++;
} else
{ /* A new histinfo record got added to the instance file since we last checked.
* Set "next_histinfo_seqno" for our current histinfo down from its current value of MAX_SEQNO.
*/
assert(gtmsource_local->next_histinfo_seqno == MAX_SEQNO);
assert(next_histinfo_num < num_histinfo);
if (READ_FILE == gtmsource_local->read_state)
{ /* It is possible that we have already read the journal records for the next
* read_jnl_seqno before detecting that a histinfo has to be sent first. In that case,
* the journal files may have been positioned ahead of the read_jnl_seqno for the
* next read. Indicate that they have to be repositioned into the past.
*/
gtmsource_set_lookback();
}
}
if (num_histinfo > next_histinfo_num)
{ /* Read the next histinfo record to determine its "start_seqno" */
gtmsource_histinfo_get(next_histinfo_num, &next_histinfo);
if (GTMSOURCE_WAITING_FOR_CONNECTION == gtmsource_state)
{
assert(FALSE);
rel_lock(jnlpool->jnlpool_dummy_reg);
return; /* Connection got reset in "gtmsource_histinfo_get" due to REPLINSTNOHIST */
}
assert(next_histinfo.start_seqno >= read_seqno);
gtmsource_local->next_histinfo_seqno = next_histinfo.start_seqno;
} else
gtmsource_local->next_histinfo_seqno = MAX_SEQNO;
gtmsource_local->next_histinfo_num = next_histinfo_num;
gtmsource_local->num_histinfo = num_histinfo;
rel_lock(jnlpool->jnlpool_dummy_reg);
}