1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196 3197 3198 3199 3200 3201 3202 3203 3204 3205 3206 3207 3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256 3257 3258 3259 3260 3261 3262 3263 3264 3265 3266 3267 3268 3269 3270 3271 3272 3273 3274 3275 3276 3277 3278 3279 3280 3281 3282 3283 3284 3285 3286 3287 3288 3289 3290 3291 3292 3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320 3321 3322 3323 3324 3325 3326 3327 3328 3329 3330 3331
|
/* Asynchronous replication implementation.
*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "server.h"
#include "cluster.h"
#include "bio.h"
#include <sys/time.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/stat.h>
void replicationDiscardCachedMaster(void);
void replicationResurrectCachedMaster(connection *conn);
void replicationSendAck(void);
void putSlaveOnline(client *slave);
int cancelReplicationHandshake(void);
/* We take a global flag to remember if this instance generated an RDB
* because of replication, so that we can remove the RDB file in case
* the instance is configured to have no persistence. */
int RDBGeneratedByReplication = 0;
/* --------------------------- Utility functions ---------------------------- */
/* Return the pointer to a string representing the slave ip:listening_port
* pair. Mostly useful for logging, since we want to log a slave using its
* IP address and its listening port which is more clear for the user, for
* example: "Closing connection with replica 10.1.2.3:6380". */
char *replicationGetSlaveName(client *c) {
static char buf[NET_PEER_ID_LEN];
char ip[NET_IP_STR_LEN];
ip[0] = '\0';
buf[0] = '\0';
if (c->slave_ip[0] != '\0' ||
connPeerToString(c->conn,ip,sizeof(ip),NULL) != -1)
{
/* Note that the 'ip' buffer is always larger than 'c->slave_ip' */
if (c->slave_ip[0] != '\0') memcpy(ip,c->slave_ip,sizeof(c->slave_ip));
if (c->slave_listening_port)
anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port);
else
snprintf(buf,sizeof(buf),"%s:<unknown-replica-port>",ip);
} else {
snprintf(buf,sizeof(buf),"client id #%llu",
(unsigned long long) c->id);
}
return buf;
}
/* Plain unlink() can block for quite some time in order to actually apply
* the file deletion to the filesystem. This call removes the file in a
* background thread instead. We actually just do close() in the thread,
* by using the fact that if there is another instance of the same file open,
* the foreground unlink() will only remove the fs name, and deleting the
* file's storage space will only happen once the last reference is lost. */
int bg_unlink(const char *filename) {
int fd = open(filename,O_RDONLY|O_NONBLOCK);
if (fd == -1) {
/* Can't open the file? Fall back to unlinking in the main thread. */
return unlink(filename);
} else {
/* The following unlink() removes the name but doesn't free the
* file contents because a process still has it open. */
int retval = unlink(filename);
if (retval == -1) {
/* If we got an unlink error, we just return it, closing the
* new reference we have to the file. */
int old_errno = errno;
close(fd); /* This would overwrite our errno. So we saved it. */
errno = old_errno;
return -1;
}
bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)fd,NULL,NULL);
return 0; /* Success. */
}
}
/* ---------------------------------- MASTER -------------------------------- */
void createReplicationBacklog(void) {
serverAssert(server.repl_backlog == NULL);
server.repl_backlog = zmalloc(server.repl_backlog_size);
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
/* We don't have any data inside our buffer, but virtually the first
* byte we have is the next byte that will be generated for the
* replication stream. */
server.repl_backlog_off = server.master_repl_offset+1;
}
/* This function is called when the user modifies the replication backlog
* size at runtime. It is up to the function to both update the
* server.repl_backlog_size and to resize the buffer and setup it so that
* it contains the same data as the previous one (possibly less data, but
* the most recent bytes, or the same data and more free space in case the
* buffer is enlarged). */
void resizeReplicationBacklog(long long newsize) {
if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
if (server.repl_backlog_size == newsize) return;
server.repl_backlog_size = newsize;
if (server.repl_backlog != NULL) {
/* What we actually do is to flush the old buffer and realloc a new
* empty one. It will refill with new data incrementally.
* The reason is that copying a few gigabytes adds latency and even
* worse often we need to alloc additional space before freeing the
* old buffer. */
zfree(server.repl_backlog);
server.repl_backlog = zmalloc(server.repl_backlog_size);
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
/* Next byte we have is... the next since the buffer is empty. */
server.repl_backlog_off = server.master_repl_offset+1;
}
}
void freeReplicationBacklog(void) {
serverAssert(listLength(server.slaves) == 0);
zfree(server.repl_backlog);
server.repl_backlog = NULL;
}
/* Add data to the replication backlog.
* This function also increments the global replication offset stored at
* server.master_repl_offset, because there is no case where we want to feed
* the backlog without incrementing the offset. */
void feedReplicationBacklog(void *ptr, size_t len) {
unsigned char *p = ptr;
server.master_repl_offset += len;
/* This is a circular buffer, so write as much data we can at every
* iteration and rewind the "idx" index if we reach the limit. */
while(len) {
size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
if (thislen > len) thislen = len;
memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
server.repl_backlog_idx += thislen;
if (server.repl_backlog_idx == server.repl_backlog_size)
server.repl_backlog_idx = 0;
len -= thislen;
p += thislen;
server.repl_backlog_histlen += thislen;
}
if (server.repl_backlog_histlen > server.repl_backlog_size)
server.repl_backlog_histlen = server.repl_backlog_size;
/* Set the offset of the first byte we have in the backlog. */
server.repl_backlog_off = server.master_repl_offset -
server.repl_backlog_histlen + 1;
}
/* Wrapper for feedReplicationBacklog() that takes Redis string objects
* as input. */
void feedReplicationBacklogWithObject(robj *o) {
char llstr[LONG_STR_SIZE];
void *p;
size_t len;
if (o->encoding == OBJ_ENCODING_INT) {
len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
p = llstr;
} else {
len = sdslen(o->ptr);
p = o->ptr;
}
feedReplicationBacklog(p,len);
}
/* Propagate write commands to slaves, and populate the replication backlog
* as well. This function is used if the instance is a master: we use
* the commands received by our clients in order to create the replication
* stream. Instead if the instance is a slave and has sub-slaves attached,
* we use replicationFeedSlavesFromMasterStream() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[LONG_STR_SIZE];
/* If the instance is not a top level master, return ASAP: we'll just proxy
* the stream of data we receive from our master instead, in order to
* propagate *identical* replication stream. In this way this slave can
* advertise the same replication ID as the master (since it shares the
* master replication history and has the same backlog and offsets). */
if (server.masterhost != NULL) return;
/* If there aren't slaves, and there is no backlog buffer to populate,
* we can return ASAP. */
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
/* We can't have slaves attached and no backlog. */
serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
/* Send SELECT command to every slave if needed. */
if (server.slaveseldb != dictid) {
robj *selectcmd;
/* For a few DBs we have pre-computed SELECT command. */
if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
} else {
int dictid_len;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
selectcmd = createObject(OBJ_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr));
}
/* Add the SELECT command into the backlog. */
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
/* Send it to slaves. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
addReply(slave,selectcmd);
}
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
server.slaveseldb = dictid;
/* Write the command to the replication backlog if any. */
if (server.repl_backlog) {
char aux[LONG_STR_SIZE+3];
/* Add the multi bulk reply length. */
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* and add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklogWithObject(argv[j]);
feedReplicationBacklog(aux+len+1,2);
}
}
/* Write the command to every slave. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
/* Don't feed slaves that are still waiting for BGSAVE to start. */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
/* Add the multi bulk length. */
addReplyArrayLen(slave,argc);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}
/* This is a debugging function that gets called when we detect something
* wrong with the replication protocol: the goal is to peek into the
* replication backlog and show a few final bytes to make simpler to
* guess what kind of bug it could be. */
void showLatestBacklog(void) {
if (server.repl_backlog == NULL) return;
long long dumplen = 256;
if (server.repl_backlog_histlen < dumplen)
dumplen = server.repl_backlog_histlen;
/* Identify the first byte to dump. */
long long idx =
(server.repl_backlog_idx + (server.repl_backlog_size - dumplen)) %
server.repl_backlog_size;
/* Scan the circular buffer to collect 'dumplen' bytes. */
sds dump = sdsempty();
while(dumplen) {
long long thislen =
((server.repl_backlog_size - idx) < dumplen) ?
(server.repl_backlog_size - idx) : dumplen;
dump = sdscatrepr(dump,server.repl_backlog+idx,thislen);
dumplen -= thislen;
idx = 0;
}
/* Finally log such bytes: this is vital debugging info to
* understand what happened. */
serverLog(LL_WARNING,"Latest backlog is: '%s'", dump);
sdsfree(dump);
}
/* This function is used in order to proxy what we receive from our master
* to our sub-slaves. */
#include <ctype.h>
void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) {
listNode *ln;
listIter li;
/* Debugging: this is handy to see the stream sent from master
* to slaves. Disabled with if(0). */
if (0) {
printf("%zu:",buflen);
for (size_t j = 0; j < buflen; j++) {
printf("%c", isprint(buf[j]) ? buf[j] : '.');
}
printf("\n");
}
if (server.repl_backlog) feedReplicationBacklog(buf,buflen);
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
/* Don't feed slaves that are still waiting for BGSAVE to start. */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
addReplyProto(slave,buf,buflen);
}
}
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j;
sds cmdrepr = sdsnew("+");
robj *cmdobj;
struct timeval tv;
gettimeofday(&tv,NULL);
cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
if (c->flags & CLIENT_LUA) {
cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
} else if (c->flags & CLIENT_UNIX_SOCKET) {
cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
} else {
cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
}
for (j = 0; j < argc; j++) {
if (argv[j]->encoding == OBJ_ENCODING_INT) {
cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
} else {
cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
sdslen(argv[j]->ptr));
}
if (j != argc-1)
cmdrepr = sdscatlen(cmdrepr," ",1);
}
cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
cmdobj = createObject(OBJ_STRING,cmdrepr);
listRewind(monitors,&li);
while((ln = listNext(&li))) {
client *monitor = ln->value;
addReply(monitor,cmdobj);
}
decrRefCount(cmdobj);
}
/* Feed the slave 'c' with the replication backlog starting from the
* specified 'offset' up to the end of the backlog. */
long long addReplyReplicationBacklog(client *c, long long offset) {
long long j, skip, len;
serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);
if (server.repl_backlog_histlen == 0) {
serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
return 0;
}
serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
server.repl_backlog_size);
serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
server.repl_backlog_off);
serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
server.repl_backlog_histlen);
serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",
server.repl_backlog_idx);
/* Compute the amount of bytes we need to discard. */
skip = offset - server.repl_backlog_off;
serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
/* Point j to the oldest byte, that is actually our
* server.repl_backlog_off byte. */
j = (server.repl_backlog_idx +
(server.repl_backlog_size-server.repl_backlog_histlen)) %
server.repl_backlog_size;
serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
/* Discard the amount of data to seek to the specified 'offset'. */
j = (j + skip) % server.repl_backlog_size;
/* Feed slave with data. Since it is a circular buffer we have to
* split the reply in two parts if we are cross-boundary. */
len = server.repl_backlog_histlen - skip;
serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
while(len) {
long long thislen =
((server.repl_backlog_size - j) < len) ?
(server.repl_backlog_size - j) : len;
serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
len -= thislen;
j = 0;
}
return server.repl_backlog_histlen - skip;
}
/* Return the offset to provide as reply to the PSYNC command received
* from the slave. The returned value is only valid immediately after
* the BGSAVE process started and before executing any other command
* from clients. */
long long getPsyncInitialOffset(void) {
return server.master_repl_offset;
}
/* Send a FULLRESYNC reply in the specific case of a full resynchronization,
* as a side effect setup the slave for a full sync in different ways:
*
* 1) Remember, into the slave client structure, the replication offset
* we sent here, so that if new slaves will later attach to the same
* background RDB saving process (by duplicating this client output
* buffer), we can get the right offset from this slave.
* 2) Set the replication state of the slave to WAIT_BGSAVE_END so that
* we start accumulating differences from this point.
* 3) Force the replication stream to re-emit a SELECT statement so
* the new slave incremental differences will start selecting the
* right database number.
*
* Normally this function should be called immediately after a successful
* BGSAVE for replication was started, or when there is one already in
* progress that we attached our slave to. */
int replicationSetupSlaveForFullResync(client *slave, long long offset) {
char buf[128];
int buflen;
slave->psync_initial_offset = offset;
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
/* We are going to accumulate the incremental changes for this
* slave as well. Set slaveseldb to -1 in order to force to re-emit
* a SELECT statement in the replication stream. */
server.slaveseldb = -1;
/* Don't send this reply to slaves that approached us with
* the old SYNC command. */
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
server.replid,offset);
if (connWrite(slave->conn,buf,buflen) != buflen) {
freeClientAsync(slave);
return C_ERR;
}
}
return C_OK;
}
/* This function handles the PSYNC command from the point of view of a
* master receiving a request for partial resynchronization.
*
* On success return C_OK, otherwise C_ERR is returned and we proceed
* with the usual full resync. */
int masterTryPartialResynchronization(client *c) {
long long psync_offset, psync_len;
char *master_replid = c->argv[1]->ptr;
char buf[128];
int buflen;
/* Parse the replication offset asked by the slave. Go to full sync
* on parse error: this should never happen but we try to handle
* it in a robust way compared to aborting. */
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
C_OK) goto need_full_resync;
/* Is the replication ID of this master the same advertised by the wannabe
* slave via PSYNC? If the replication ID changed this master has a
* different replication history, and there is no way to continue.
*
* Note that there are two potentially valid replication IDs: the ID1
* and the ID2. The ID2 however is only valid up to a specific offset. */
if (strcasecmp(master_replid, server.replid) &&
(strcasecmp(master_replid, server.replid2) ||
psync_offset > server.second_replid_offset))
{
/* Replid "?" is used by slaves that want to force a full resync. */
if (master_replid[0] != '?') {
if (strcasecmp(master_replid, server.replid) &&
strcasecmp(master_replid, server.replid2))
{
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
"Replication ID mismatch (Replica asked for '%s', my "
"replication IDs are '%s' and '%s')",
master_replid, server.replid, server.replid2);
} else {
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
"Requested offset for second ID was %lld, but I can reply "
"up to %lld", psync_offset, server.second_replid_offset);
}
} else {
serverLog(LL_NOTICE,"Full resync requested by replica %s",
replicationGetSlaveName(c));
}
goto need_full_resync;
}
/* We still have the data our slave is asking for? */
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
serverLog(LL_NOTICE,
"Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
if (psync_offset > server.master_repl_offset) {
serverLog(LL_WARNING,
"Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
}
goto need_full_resync;
}
/* If we reached this point, we are able to perform a partial resync:
* 1) Set client state to make it a slave.
* 2) Inform the client we can continue with +CONTINUE
* 3) Send the backlog data (from the offset to the end) to the slave. */
c->flags |= CLIENT_SLAVE;
c->replstate = SLAVE_STATE_ONLINE;
c->repl_ack_time = server.unixtime;
c->repl_put_online_on_ack = 0;
listAddNodeTail(server.slaves,c);
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
* empty so this write will never fail actually. */
if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
} else {
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
}
if (connWrite(c->conn,buf,buflen) != buflen) {
freeClientAsync(c);
return C_OK;
}
psync_len = addReplyReplicationBacklog(c,psync_offset);
serverLog(LL_NOTICE,
"Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
replicationGetSlaveName(c),
psync_len, psync_offset);
/* Note that we don't need to set the selected DB at server.slaveseldb
* to -1 to force the master to emit SELECT, since the slave already
* has this state from the previous connection with the master. */
refreshGoodSlavesCount();
/* Fire the replica change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
NULL);
return C_OK; /* The caller can return, no full resync needed. */
need_full_resync:
/* We need a full resync for some reason... Note that we can't
* reply to PSYNC right now if a full SYNC is needed. The reply
* must include the master offset at the time the RDB file we transfer
* is generated, so we need to delay the reply to that moment. */
return C_ERR;
}
/* Start a BGSAVE for replication goals, which is, selecting the disk or
* socket target depending on the configuration, and making sure that
* the script cache is flushed before to start.
*
* The mincapa argument is the bitwise AND among all the slaves capabilities
* of the slaves waiting for this BGSAVE, so represents the slave capabilities
* all the slaves support. Can be tested via SLAVE_CAPA_* macros.
*
* Side effects, other than starting a BGSAVE:
*
* 1) Handle the slaves in WAIT_START state, by preparing them for a full
* sync if the BGSAVE was successfully started, or sending them an error
* and dropping them from the list of slaves.
*
* 2) Flush the Lua scripting script cache if the BGSAVE was actually
* started.
*
* Returns C_OK on success or C_ERR otherwise. */
int startBgsaveForReplication(int mincapa) {
int retval;
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
listIter li;
listNode *ln;
serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
socket_target ? "replicas sockets" : "disk");
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
/* Only do rdbSave* when rsiptr is not NULL,
* otherwise slave will miss repl-stream-db. */
if (rsiptr) {
if (socket_target)
retval = rdbSaveToSlavesSockets(rsiptr);
else
retval = rdbSaveBackground(server.rdb_filename,rsiptr);
} else {
serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
retval = C_ERR;
}
/* If we succeeded to start a BGSAVE with disk target, let's remember
* this fact, so that we can later delete the file if needed. Note
* that we don't set the flag to 1 if the feature is disabled, otherwise
* it would never be cleared: the file is not deleted. This way if
* the user enables it later with CONFIG SET, we are fine. */
if (retval == C_OK && !socket_target && server.rdb_del_sync_files)
RDBGeneratedByReplication = 1;
/* If we failed to BGSAVE, remove the slaves waiting for a full
* resynchronization from the list of slaves, inform them with
* an error about what happened, close the connection ASAP. */
if (retval == C_ERR) {
serverLog(LL_WARNING,"BGSAVE for replication failed");
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
slave->replstate = REPL_STATE_NONE;
slave->flags &= ~CLIENT_SLAVE;
listDelNode(server.slaves,ln);
addReplyError(slave,
"BGSAVE failed, replication can't continue");
slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}
return retval;
}
/* If the target is socket, rdbSaveToSlavesSockets() already setup
* the slaves for a full resync. Otherwise for disk target do it now.*/
if (!socket_target) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
replicationSetupSlaveForFullResync(slave,
getPsyncInitialOffset());
}
}
}
/* Flush the script cache, since we need that slave differences are
* accumulated without requiring slaves to match our cached scripts. */
if (retval == C_OK) replicationScriptCacheFlush();
return retval;
}
/* SYNC and PSYNC command implementation. */
void syncCommand(client *c) {
/* ignore SYNC if already slave or in monitor mode */
if (c->flags & CLIENT_SLAVE) return;
/* Refuse SYNC requests if we are a slave but the link with our master
* is not ok... */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
return;
}
/* SYNC can't be issued when the server has pending data to send to
* the client about already issued commands. We need a fresh reply
* buffer registering the differences between the BGSAVE and the current
* dataset, so that we can copy to other slaves if needed. */
if (clientHasPendingReplies(c)) {
addReplyError(c,"SYNC and PSYNC are invalid with pending output");
return;
}
serverLog(LL_NOTICE,"Replica %s asks for synchronization",
replicationGetSlaveName(c));
/* Try a partial resynchronization if this is a PSYNC command.
* If it fails, we continue with usual full resynchronization, however
* when this happens masterTryPartialResynchronization() already
* replied with:
*
* +FULLRESYNC <replid> <offset>
*
* So the slave knows the new replid and offset to try a PSYNC later
* if the connection with the master is lost. */
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
char *master_replid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
* replid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
* of the replication protocol (like redis-cli --slave). Flag the client
* so that we don't expect to receive REPLCONF ACK feedbacks. */
c->flags |= CLIENT_PRE_PSYNC;
}
/* Full resynchronization. */
server.stat_sync_full++;
/* Setup the slave as one waiting for BGSAVE to start. The following code
* paths will change the state if we handle the slave differently. */
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay)
connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);
/* Create the replication backlog if needed. */
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
/* When we create the backlog from scratch, we always use a new
* replication ID and clear the ID2, since there is no valid
* past history. */
changeReplicationId();
clearReplicationId2();
createReplicationBacklog();
serverLog(LL_NOTICE,"Replication backlog created, my new "
"replication IDs are '%s' and '%s'",
server.replid, server.replid2);
}
/* CASE 1: BGSAVE is in progress, with disk target. */
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
/* Ok a background save is in progress. Let's check if it is a good
* one for replication, i.e. if there is another slave that is
* registering differences since the server forked to save. */
client *slave;
listNode *ln;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
}
/* To attach this slave, we check that it has at least all the
* capabilities of the slave that triggered the current BGSAVE. */
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer. */
copyClientOutputBuffer(c,slave);
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
/* No way, we need to wait for the next BGSAVE in order to
* register differences. */
serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");
}
/* CASE 2: BGSAVE is in progress, with socket target. */
} else if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
/* There is an RDB child process but it is writing directly to
* children sockets. We need to wait for the next BGSAVE
* in order to synchronize. */
serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
/* CASE 3: There is no BGSAVE is progress. */
} else {
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
/* Diskless replication RDB child is created inside
* replicationCron() since we want to delay its start a
* few seconds to wait for more slaves to arrive. */
if (server.repl_diskless_sync_delay)
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
} else {
/* Target is disk (or the slave is not capable of supporting
* diskless replication) and we don't have a BGSAVE in progress,
* let's start one. */
if (!hasActiveChildProcess()) {
startBgsaveForReplication(c->slave_capa);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but another BG operation is active. "
"BGSAVE for replication delayed");
}
}
}
return;
}
/* REPLCONF <option> <value> <option> <value> ...
* This command is used by a slave in order to configure the replication
* process before starting it with the SYNC command.
*
* Currently the only use of this command is to communicate to the master
* what is the listening port of the Slave redis instance, so that the
* master can accurately list slaves and their listening ports in
* the INFO output.
*
* In the future the same command can be used in order to configure
* the replication to initiate an incremental replication instead of a
* full resync. */
void replconfCommand(client *c) {
int j;
if ((c->argc % 2) == 0) {
/* Number of arguments must be odd to make sure that every
* option has a corresponding value. */
addReply(c,shared.syntaxerr);
return;
}
/* Process every option-value pair. */
for (j = 1; j < c->argc; j+=2) {
if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
long port;
if ((getLongFromObjectOrReply(c,c->argv[j+1],
&port,NULL) != C_OK))
return;
c->slave_listening_port = port;
} else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
sds ip = c->argv[j+1]->ptr;
if (sdslen(ip) < sizeof(c->slave_ip)) {
memcpy(c->slave_ip,ip,sdslen(ip)+1);
} else {
addReplyErrorFormat(c,"REPLCONF ip-address provided by "
"replica instance is too long: %zd bytes", sdslen(ip));
return;
}
} else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
/* Ignore capabilities not understood by this master. */
if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
c->slave_capa |= SLAVE_CAPA_EOF;
else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))
c->slave_capa |= SLAVE_CAPA_PSYNC2;
} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
/* REPLCONF ACK is used by slave to inform the master the amount
* of replication stream that it processed so far. It is an
* internal only command that normal clients should never use. */
long long offset;
if (!(c->flags & CLIENT_SLAVE)) return;
if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
return;
if (offset > c->repl_ack_off)
c->repl_ack_off = offset;
c->repl_ack_time = server.unixtime;
/* If this was a diskless replication, we need to really put
* the slave online when the first ACK is received (which
* confirms slave is online and ready to get more data). This
* allows for simpler and less CPU intensive EOF detection
* when streaming RDB files.
* There's a chance the ACK got to us before we detected that the
* bgsave is done (since that depends on cron ticks), so run a
* quick check first (instead of waiting for the next ACK. */
if (server.rdb_child_pid != -1 && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
checkChildrenDone();
if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
putSlaveOnline(c);
/* Note: this command does not reply anything! */
return;
} else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
/* REPLCONF GETACK is used in order to request an ACK ASAP
* to the slave. */
if (server.masterhost && server.master) replicationSendAck();
return;
} else {
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
(char*)c->argv[j]->ptr);
return;
}
}
addReply(c,shared.ok);
}
/* This function puts a replica in the online state, and should be called just
* after a replica received the RDB file for the initial synchronization, and
* we are finally ready to send the incremental stream of commands.
*
* It does a few things:
*
* 1) Put the slave in ONLINE state. Note that the function may also be called
* for a replicas that are already in ONLINE state, but having the flag
* repl_put_online_on_ack set to true: we still have to install the write
* handler in that case. This function will take care of that.
* 2) Make sure the writable event is re-installed, since calling the SYNC
* command disables it, so that we can accumulate output buffer without
* sending it to the replica.
* 3) Update the count of "good replicas". */
void putSlaveOnline(client *slave) {
slave->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 0;
slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
if (connSetWriteHandler(slave->conn, sendReplyToClient) == C_ERR) {
serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
freeClient(slave);
return;
}
refreshGoodSlavesCount();
/* Fire the replica change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
NULL);
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
replicationGetSlaveName(slave));
}
/* We call this function periodically to remove an RDB file that was
* generated because of replication, in an instance that is otherwise
* without any persistence. We don't want instances without persistence
* to take RDB files around, this violates certain policies in certain
* environments. */
void removeRDBUsedToSyncReplicas(void) {
/* If the feature is disabled, return ASAP but also clear the
* RDBGeneratedByReplication flag in case it was set. Otherwise if the
* feature was enabled, but gets disabled later with CONFIG SET, the
* flag may remain set to one: then next time the feature is re-enabled
* via CONFIG SET we have have it set even if no RDB was generated
* because of replication recently. */
if (!server.rdb_del_sync_files) {
RDBGeneratedByReplication = 0;
return;
}
if (allPersistenceDisabled() && RDBGeneratedByReplication) {
client *slave;
listNode *ln;
listIter li;
int delrdb = 1;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END ||
slave->replstate == SLAVE_STATE_SEND_BULK)
{
delrdb = 0;
break; /* No need to check the other replicas. */
}
}
if (delrdb) {
struct stat sb;
if (lstat(server.rdb_filename,&sb) != -1) {
RDBGeneratedByReplication = 0;
serverLog(LL_NOTICE,
"Removing the RDB file used to feed replicas "
"in a persistence-less instance");
bg_unlink(server.rdb_filename);
}
}
}
}
void sendBulkToSlave(connection *conn) {
client *slave = connGetPrivateData(conn);
char buf[PROTO_IOBUF_LEN];
ssize_t nwritten, buflen;
/* Before sending the RDB file, we send the preamble as configured by the
* replication process. Currently the preamble is just the bulk count of
* the file in the form "$<length>\r\n". */
if (slave->replpreamble) {
nwritten = connWrite(conn,slave->replpreamble,sdslen(slave->replpreamble));
if (nwritten == -1) {
serverLog(LL_VERBOSE,
"Write error sending RDB preamble to replica: %s",
connGetLastError(conn));
freeClient(slave);
return;
}
server.stat_net_output_bytes += nwritten;
sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
slave->replpreamble = NULL;
/* fall through sending data. */
} else {
return;
}
}
/* If the preamble was already transferred, send the RDB bulk data. */
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN);
if (buflen <= 0) {
serverLog(LL_WARNING,"Read error sending DB to replica: %s",
(buflen == 0) ? "premature EOF" : strerror(errno));
freeClient(slave);
return;
}
if ((nwritten = connWrite(conn,buf,buflen)) == -1) {
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,"Write error sending DB to replica: %s",
connGetLastError(conn));
freeClient(slave);
}
return;
}
slave->repldboff += nwritten;
server.stat_net_output_bytes += nwritten;
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
connSetWriteHandler(slave->conn,NULL);
putSlaveOnline(slave);
}
}
/* Remove one write handler from the list of connections waiting to be writable
* during rdb pipe transfer. */
void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
if (!connHasWriteHandler(conn))
return;
connSetWriteHandler(conn, NULL);
client *slave = connGetPrivateData(conn);
slave->repl_last_partial_write = 0;
server.rdb_pipe_numconns_writing--;
/* if there are no more writes for now for this conn, or write error: */
if (server.rdb_pipe_numconns_writing == 0) {
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
}
}
/* Called in diskless master during transfer of data from the rdb pipe, when
* the replica becomes writable again. */
void rdbPipeWriteHandler(struct connection *conn) {
serverAssert(server.rdb_pipe_bufflen>0);
client *slave = connGetPrivateData(conn);
int nwritten;
if ((nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff,
server.rdb_pipe_bufflen - slave->repldboff)) == -1)
{
if (connGetState(conn) == CONN_STATE_CONNECTED)
return; /* equivalent to EAGAIN */
serverLog(LL_WARNING,"Write error sending DB to replica: %s",
connGetLastError(conn));
freeClient(slave);
return;
} else {
slave->repldboff += nwritten;
server.stat_net_output_bytes += nwritten;
if (slave->repldboff < server.rdb_pipe_bufflen) {
slave->repl_last_partial_write = server.unixtime;
return; /* more data to write.. */
}
}
rdbPipeWriteHandlerConnRemoved(conn);
}
/* Called in diskless master, when there's data to read from the child's rdb pipe */
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
UNUSED(mask);
UNUSED(clientData);
UNUSED(eventLoop);
int i;
if (!server.rdb_pipe_buff)
server.rdb_pipe_buff = zmalloc(PROTO_IOBUF_LEN);
serverAssert(server.rdb_pipe_numconns_writing==0);
while (1) {
server.rdb_pipe_bufflen = read(fd, server.rdb_pipe_buff, PROTO_IOBUF_LEN);
if (server.rdb_pipe_bufflen < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
return;
serverLog(LL_WARNING,"Diskless rdb transfer, read error sending DB to replicas: %s", strerror(errno));
for (i=0; i < server.rdb_pipe_numconns; i++) {
connection *conn = server.rdb_pipe_conns[i];
if (!conn)
continue;
client *slave = connGetPrivateData(conn);
freeClient(slave);
server.rdb_pipe_conns[i] = NULL;
}
killRDBChild();
return;
}
if (server.rdb_pipe_bufflen == 0) {
/* EOF - write end was closed. */
int stillUp = 0;
aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
for (i=0; i < server.rdb_pipe_numconns; i++)
{
connection *conn = server.rdb_pipe_conns[i];
if (!conn)
continue;
stillUp++;
}
serverLog(LL_WARNING,"Diskless rdb transfer, done reading from pipe, %d replicas still up.", stillUp);
/* Now that the replicas have finished reading, notify the child that it's safe to exit.
* When the server detectes the child has exited, it can mark the replica as online, and
* start streaming the replication buffers. */
close(server.rdb_child_exit_pipe);
server.rdb_child_exit_pipe = -1;
return;
}
int stillAlive = 0;
for (i=0; i < server.rdb_pipe_numconns; i++)
{
int nwritten;
connection *conn = server.rdb_pipe_conns[i];
if (!conn)
continue;
client *slave = connGetPrivateData(conn);
if ((nwritten = connWrite(conn, server.rdb_pipe_buff, server.rdb_pipe_bufflen)) == -1) {
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,"Diskless rdb transfer, write error sending DB to replica: %s",
connGetLastError(conn));
freeClient(slave);
server.rdb_pipe_conns[i] = NULL;
continue;
}
/* An error and still in connected state, is equivalent to EAGAIN */
slave->repldboff = 0;
} else {
slave->repldboff = nwritten;
server.stat_net_output_bytes += nwritten;
}
/* If we were unable to write all the data to one of the replicas,
* setup write handler (and disable pipe read handler, below) */
if (nwritten != server.rdb_pipe_bufflen) {
slave->repl_last_partial_write = server.unixtime;
server.rdb_pipe_numconns_writing++;
connSetWriteHandler(conn, rdbPipeWriteHandler);
}
stillAlive++;
}
if (stillAlive == 0) {
serverLog(LL_WARNING,"Diskless rdb transfer, last replica dropped, killing fork child.");
killRDBChild();
}
/* Remove the pipe read handler if at least one write handler was set. */
if (server.rdb_pipe_numconns_writing || stillAlive == 0) {
aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
break;
}
}
}
/* This function is called at the end of every background saving,
* or when the replication RDB transfer strategy is modified from
* disk to socket or the other way around.
*
* The goal of this function is to handle slaves waiting for a successful
* background saving in order to perform non-blocking synchronization, and
* to schedule a new BGSAVE if there are slaves that attached while a
* BGSAVE was in progress, but it was not a good one for replication (no
* other slave was accumulating differences).
*
* The argument bgsaveerr is C_OK if the background saving succeeded
* otherwise C_ERR is passed to the function.
* The 'type' argument is the type of the child that terminated
* (if it had a disk or socket target). */
void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
listNode *ln;
int startbgsave = 0;
int mincapa = -1;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
startbgsave = 1;
mincapa = (mincapa == -1) ? slave->slave_capa :
(mincapa & slave->slave_capa);
} else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
struct redis_stat buf;
if (bgsaveerr != C_OK) {
freeClient(slave);
serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
continue;
}
/* If this was an RDB on disk save, we have to prepare to send
* the RDB from disk to the slave socket. Otherwise if this was
* already an RDB -> Slaves socket transfer, used in the case of
* diskless replication, our work is trivial, we can just put
* the slave online. */
if (type == RDB_CHILD_TYPE_SOCKET) {
serverLog(LL_NOTICE,
"Streamed RDB transfer with replica %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
replicationGetSlaveName(slave));
/* Note: we wait for a REPLCONF ACK message from the replica in
* order to really put it online (install the write handler
* so that the accumulated data can be transferred). However
* we change the replication state ASAP, since our slave
* is technically online now.
*
* So things work like that:
*
* 1. We end trasnferring the RDB file via socket.
* 2. The replica is put ONLINE but the write handler
* is not installed.
* 3. The replica however goes really online, and pings us
* back via REPLCONF ACK commands.
* 4. Now we finally install the write handler, and send
* the buffers accumulated so far to the replica.
*
* But why we do that? Because the replica, when we stream
* the RDB directly via the socket, must detect the RDB
* EOF (end of file), that is a special random string at the
* end of the RDB (for streamed RDBs we don't know the length
* in advance). Detecting such final EOF string is much
* simpler and less CPU intensive if no more data is sent
* after such final EOF. So we don't want to glue the end of
* the RDB trasfer with the start of the other replication
* data. */
slave->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 1;
slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
} else {
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = SLAVE_STATE_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
(unsigned long long) slave->repldbsize);
connSetWriteHandler(slave->conn,NULL);
if (connSetWriteHandler(slave->conn,sendBulkToSlave) == C_ERR) {
freeClient(slave);
continue;
}
}
}
}
if (startbgsave) startBgsaveForReplication(mincapa);
}
/* Change the current instance replication ID with a new, random one.
* This will prevent successful PSYNCs between this master and other
* slaves, so the command should be called when something happens that
* alters the current story of the dataset. */
void changeReplicationId(void) {
getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE);
server.replid[CONFIG_RUN_ID_SIZE] = '\0';
}
/* Clear (invalidate) the secondary replication ID. This happens, for
* example, after a full resynchronization, when we start a new replication
* history. */
void clearReplicationId2(void) {
memset(server.replid2,'0',sizeof(server.replid));
server.replid2[CONFIG_RUN_ID_SIZE] = '\0';
server.second_replid_offset = -1;
}
/* Use the current replication ID / offset as secondary replication
* ID, and change the current one in order to start a new history.
* This should be used when an instance is switched from slave to master
* so that it can serve PSYNC requests performed using the master
* replication ID. */
void shiftReplicationId(void) {
memcpy(server.replid2,server.replid,sizeof(server.replid));
/* We set the second replid offset to the master offset + 1, since
* the slave will ask for the first byte it has not yet received, so
* we need to add one to the offset: for example if, as a slave, we are
* sure we have the same history as the master for 50 bytes, after we
* are turned into a master, we can accept a PSYNC request with offset
* 51, since the slave asking has the same history up to the 50th
* byte, and is asking for the new bytes starting at offset 51. */
server.second_replid_offset = server.master_repl_offset+1;
changeReplicationId();
serverLog(LL_WARNING,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, server.replid);
}
/* ----------------------------------- SLAVE -------------------------------- */
/* Returns 1 if the given replication state is a handshake state,
* 0 otherwise. */
int slaveIsInHandshakeState(void) {
return server.repl_state >= REPL_STATE_RECEIVE_PONG &&
server.repl_state <= REPL_STATE_RECEIVE_PSYNC;
}
/* Avoid the master to detect the slave is timing out while loading the
* RDB file in initial synchronization. We send a single newline character
* that is valid protocol but is guaranteed to either be sent entirely or
* not, since the byte is indivisible.
*
* The function is called in two contexts: while we flush the current
* data with emptyDb(), and while we load the new data received as an
* RDB file from the master. */
void replicationSendNewlineToMaster(void) {
static time_t newline_sent;
if (time(NULL) != newline_sent) {
newline_sent = time(NULL);
/* Pinging back in this stage is best-effort. */
if (server.repl_transfer_s) connWrite(server.repl_transfer_s, "\n", 1);
}
}
/* Callback used by emptyDb() while flushing away old data to load
* the new dataset received by the master. */
void replicationEmptyDbCallback(void *privdata) {
UNUSED(privdata);
if (server.repl_state == REPL_STATE_TRANSFER)
replicationSendNewlineToMaster();
}
/* Once we have a link with the master and the synchronization was
* performed, this function materializes the master client we store
* at server.master, starting from the specified file descriptor. */
void replicationCreateMasterClient(connection *conn, int dbid) {
server.master = createClient(conn);
if (conn)
connSetReadHandler(server.master->conn, readQueryFromClient);
server.master->flags |= CLIENT_MASTER;
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff;
server.master->user = NULL; /* This client can do everything. */
memcpy(server.master->replid, server.master_replid,
sizeof(server.master_replid));
/* If master offset is set to -1, this master is old and is not
* PSYNC capable, so we flag it accordingly. */
if (server.master->reploff == -1)
server.master->flags |= CLIENT_PRE_PSYNC;
if (dbid != -1) selectDb(server.master,dbid);
}
/* This function will try to re-enable the AOF file after the
* master-replica synchronization: if it fails after multiple attempts
* the replica cannot be considered reliable and exists with an
* error. */
void restartAOFAfterSYNC() {
unsigned int tries, max_tries = 10;
for (tries = 0; tries < max_tries; ++tries) {
if (startAppendOnly() == C_OK) break;
serverLog(LL_WARNING,
"Failed enabling the AOF after successful master synchronization! "
"Trying it again in one second.");
sleep(1);
}
if (tries == max_tries) {
serverLog(LL_WARNING,
"FATAL: this replica instance finished the synchronization with "
"its master, but the AOF can't be turned on. Exiting now.");
exit(1);
}
}
static int useDisklessLoad() {
/* compute boolean decision to use diskless load */
int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
(server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0);
/* Check all modules handle read errors, otherwise it's not safe to use diskless load. */
if (enabled && !moduleAllDatatypesHandleErrors()) {
serverLog(LL_WARNING,
"Skipping diskless-load because there are modules that don't handle read errors.");
enabled = 0;
}
return enabled;
}
/* Helper function for readSyncBulkPayload() to make backups of the current
* databases before socket-loading the new ones. The backups may be restored
* by disklessLoadRestoreBackup or freed by disklessLoadDiscardBackup later. */
dbBackup *disklessLoadMakeBackup(void) {
return backupDb();
}
/* Helper function for readSyncBulkPayload(): when replica-side diskless
* database loading is used, Redis makes a backup of the existing databases
* before loading the new ones from the socket.
*
* If the socket loading went wrong, we want to restore the old backups
* into the server databases. */
void disklessLoadRestoreBackup(dbBackup *buckup) {
restoreDbBackup(buckup);
}
/* Helper function for readSyncBulkPayload() to discard our old backups
* when the loading succeeded. */
void disklessLoadDiscardBackup(dbBackup *buckup, int flag) {
discardDbBackup(buckup, flag, replicationEmptyDbCallback);
}
/* Asynchronously read the SYNC payload we receive from a master */
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(connection *conn) {
char buf[PROTO_IOBUF_LEN];
ssize_t nread, readlen, nwritten;
int use_diskless_load = useDisklessLoad();
dbBackup *diskless_load_backup = NULL;
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
EMPTYDB_NO_FLAGS;
off_t left;
/* Static vars used to hold the EOF mark, and the last bytes received
* from the server: when they match, we reached the end of the transfer. */
static char eofmark[CONFIG_RUN_ID_SIZE];
static char lastbytes[CONFIG_RUN_ID_SIZE];
static int usemark = 0;
/* If repl_transfer_size == -1 we still have to read the bulk length
* from the master reply. */
if (server.repl_transfer_size == -1) {
if (connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000) == -1) {
serverLog(LL_WARNING,
"I/O error reading bulk count from MASTER: %s",
strerror(errno));
goto error;
}
if (buf[0] == '-') {
serverLog(LL_WARNING,
"MASTER aborted replication with an error: %s",
buf+1);
goto error;
} else if (buf[0] == '\0') {
/* At this stage just a newline works as a PING in order to take
* the connection live. So we refresh our last interaction
* timestamp. */
server.repl_transfer_lastio = server.unixtime;
return;
} else if (buf[0] != '$') {
serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
goto error;
}
/* There are two possible forms for the bulk payload. One is the
* usual $<count> bulk format. The other is used for diskless transfers
* when the master does not know beforehand the size of the file to
* transfer. In the latter case, the following format is used:
*
* $EOF:<40 bytes delimiter>
*
* At the end of the file the announced delimiter is transmitted. The
* delimiter is long and random enough that the probability of a
* collision with the actual file content can be ignored. */
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
usemark = 1;
memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
/* Set any repl_transfer_size to avoid entering this code path
* at the next call. */
server.repl_transfer_size = 0;
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s",
use_diskless_load? "to parser":"to disk");
} else {
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving %lld bytes from master %s",
(long long) server.repl_transfer_size,
use_diskless_load? "to parser":"to disk");
}
return;
}
if (!use_diskless_load) {
/* Read the data from the socket, store it to a file and search
* for the EOF. */
if (usemark) {
readlen = sizeof(buf);
} else {
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
}
nread = connRead(conn,buf,readlen);
if (nread <= 0) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
/* equivalent to EAGAIN */
return;
}
serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
cancelReplicationHandshake();
return;
}
server.stat_net_input_bytes += nread;
/* When a mark is used, we want to detect EOF asap in order to avoid
* writing the EOF mark into the file... */
int eof_reached = 0;
if (usemark) {
/* Update the last bytes array, and check if it matches our
* delimiter. */
if (nread >= CONFIG_RUN_ID_SIZE) {
memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,
CONFIG_RUN_ID_SIZE);
} else {
int rem = CONFIG_RUN_ID_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}
if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0)
eof_reached = 1;
}
/* Update the last I/O time for the replication transfer (used in
* order to detect timeouts during replication), and write what we
* got from the socket to the dump file on disk. */
server.repl_transfer_lastio = server.unixtime;
if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
serverLog(LL_WARNING,
"Write error or short write writing to the DB dump file "
"needed for MASTER <-> REPLICA synchronization: %s",
(nwritten == -1) ? strerror(errno) : "short write");
goto error;
}
server.repl_transfer_read += nread;
/* Delete the last 40 bytes from the file if we reached EOF. */
if (usemark && eof_reached) {
if (ftruncate(server.repl_transfer_fd,
server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
{
serverLog(LL_WARNING,
"Error truncating the RDB file received from the master "
"for SYNC: %s", strerror(errno));
goto error;
}
}
/* Sync data on disk from time to time, otherwise at the end of the
* transfer we may suffer a big delay as the memory buffers are copied
* into the actual disk. */
if (server.repl_transfer_read >=
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
off_t sync_size = server.repl_transfer_read -
server.repl_transfer_last_fsync_off;
rdb_fsync_range(server.repl_transfer_fd,
server.repl_transfer_last_fsync_off, sync_size);
server.repl_transfer_last_fsync_off += sync_size;
}
/* Check if the transfer is now complete */
if (!usemark) {
if (server.repl_transfer_read == server.repl_transfer_size)
eof_reached = 1;
}
/* If the transfer is yet not complete, we need to read more, so
* return ASAP and wait for the handler to be called again. */
if (!eof_reached) return;
}
/* We reach this point in one of the following cases:
*
* 1. The replica is using diskless replication, that is, it reads data
* directly from the socket to the Redis memory, without using
* a temporary RDB file on disk. In that case we just block and
* read everything from the socket.
*
* 2. Or when we are done reading from the socket to the RDB file, in
* such case we want just to read the RDB file in memory. */
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
/* We need to stop any AOF rewriting child before flusing and parsing
* the RDB, otherwise we'll create a copy-on-write disaster. */
if (server.aof_state != AOF_OFF) stopAppendOnly();
/* When diskless RDB loading is used by replicas, it may be configured
* in order to save the current DB instead of throwing it away,
* so that we can restore it in case of failed transfer. */
if (use_diskless_load &&
server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
{
/* Create a backup of server.db[] and initialize to empty
* dictionaries. */
diskless_load_backup = disklessLoadMakeBackup();
}
/* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB
* (Where disklessLoadMakeBackup left server.db empty) because we
* want to execute all the auxiliary logic of emptyDb (Namely,
* fire module events) */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
* time for non blocking loading. */
connSetReadHandler(conn, NULL);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (use_diskless_load) {
rio rdb;
rioInitWithConn(&rdb,conn,server.repl_transfer_size);
/* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the RDB is received. */
connBlock(conn);
connRecvTimeout(conn, server.repl_timeout*1000);
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION);
if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) {
/* RDB loading failed. */
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization DB "
"from socket: %s", strerror(errno));
stopLoading(0);
cancelReplicationHandshake();
rioFreeConn(&rdb, NULL);
/* Remove the half-loaded data in case we started with
* an empty replica. */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Restore the backed up databases. */
disklessLoadRestoreBackup(diskless_load_backup);
}
/* Note that there's no point in restarting the AOF on SYNC
* failure, it'll be restarted when sync succeeds or the replica
* gets promoted. */
return;
}
stopLoading(1);
/* RDB loading succeeded if we reach this point. */
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Delete the backup databases we created before starting to load
* the new RDB. Now the RDB was loaded with success so the old
* data is useless. */
disklessLoadDiscardBackup(diskless_load_backup, empty_db_flags);
}
/* Verify the end mark is correct. */
if (usemark) {
if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) ||
memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0)
{
serverLog(LL_WARNING,"Replication stream EOF marker is broken");
cancelReplicationHandshake();
rioFreeConn(&rdb, NULL);
return;
}
}
/* Cleanup and restore the socket to the original state to continue
* with the normal replication. */
rioFreeConn(&rdb, NULL);
connNonBlock(conn);
connRecvTimeout(conn,0);
} else {
/* Ensure background save doesn't overwrite synced data */
if (server.rdb_child_pid != -1) {
serverLog(LL_NOTICE,
"Replica is about to load the RDB file received from the "
"master, but there is a pending RDB child running. "
"Killing process %ld and removing its temp file to avoid "
"any race",
(long) server.rdb_child_pid);
killRDBChild();
}
/* Make sure the new file (also used for persistence) is fully synced
* (not covered by earlier calls to rdb_fsync_range). */
if (fsync(server.repl_transfer_fd) == -1) {
serverLog(LL_WARNING,
"Failed trying to sync the temp DB to disk in "
"MASTER <-> REPLICA synchronization: %s",
strerror(errno));
cancelReplicationHandshake();
return;
}
/* Rename rdb like renaming rewrite aof asynchronously. */
int old_rdb_fd = open(server.rdb_filename,O_RDONLY|O_NONBLOCK);
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
serverLog(LL_WARNING,
"Failed trying to rename the temp DB into %s in "
"MASTER <-> REPLICA synchronization: %s",
server.rdb_filename, strerror(errno));
cancelReplicationHandshake();
if (old_rdb_fd != -1) close(old_rdb_fd);
return;
}
/* Close old rdb asynchronously. */
if (old_rdb_fd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)old_rdb_fd,NULL,NULL);
if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization "
"DB from disk");
cancelReplicationHandshake();
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
serverLog(LL_NOTICE,"Removing the RDB file obtained from "
"the master. This replica has persistence "
"disabled");
bg_unlink(server.rdb_filename);
}
/* Note that there's no point in restarting the AOF on sync failure,
it'll be restarted when sync succeeds or replica promoted. */
return;
}
/* Cleanup. */
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
serverLog(LL_NOTICE,"Removing the RDB file obtained from "
"the master. This replica has persistence "
"disabled");
bg_unlink(server.rdb_filename);
}
zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);
server.repl_transfer_fd = -1;
server.repl_transfer_tmpfile = NULL;
}
/* Final setup of the connected slave <- master link */
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
/* Fire the master link modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_UP,
NULL);
/* After a full resynchronization we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */
memcpy(server.replid,server.master->replid,sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();
/* Let's create the replication backlog if needed. Slaves need to
* accumulate the backlog regardless of the fact they have sub-slaves
* or not, in order to behave correctly if they are promoted to
* masters after a failover. */
if (server.repl_backlog == NULL) createReplicationBacklog();
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
if (server.supervised_mode == SUPERVISED_SYSTEMD) {
redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Finished with success. Ready to accept connections in read-write mode.\n");
}
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */
if (server.aof_enabled) restartAOFAfterSYNC();
return;
error:
cancelReplicationHandshake();
return;
}
/* Send a synchronous command to the master. Used to send AUTH and
* REPLCONF commands before starting the replication with SYNC.
*
* The command returns an sds string representing the result of the
* operation. On error the first byte is a "-".
*/
#define SYNC_CMD_READ (1<<0)
#define SYNC_CMD_WRITE (1<<1)
#define SYNC_CMD_FULL (SYNC_CMD_READ|SYNC_CMD_WRITE)
char *sendSynchronousCommand(int flags, connection *conn, ...) {
/* Create the command to send to the master, we use redis binary
* protocol to make sure correct arguments are sent. This function
* is not safe for all binary data. */
if (flags & SYNC_CMD_WRITE) {
char *arg;
va_list ap;
sds cmd = sdsempty();
sds cmdargs = sdsempty();
size_t argslen = 0;
va_start(ap,conn);
while(1) {
arg = va_arg(ap, char*);
if (arg == NULL) break;
cmdargs = sdscatprintf(cmdargs,"$%zu\r\n%s\r\n",strlen(arg),arg);
argslen++;
}
va_end(ap);
cmd = sdscatprintf(cmd,"*%zu\r\n",argslen);
cmd = sdscatsds(cmd,cmdargs);
sdsfree(cmdargs);
/* Transfer command to the server. */
if (connSyncWrite(conn,cmd,sdslen(cmd),server.repl_syncio_timeout*1000)
== -1)
{
sdsfree(cmd);
return sdscatprintf(sdsempty(),"-Writing to master: %s",
connGetLastError(conn));
}
sdsfree(cmd);
}
/* Read the reply from the server. */
if (flags & SYNC_CMD_READ) {
char buf[256];
if (connSyncReadLine(conn,buf,sizeof(buf),server.repl_syncio_timeout*1000)
== -1)
{
return sdscatprintf(sdsempty(),"-Reading from master: %s",
strerror(errno));
}
server.repl_transfer_lastio = server.unixtime;
return sdsnew(buf);
}
return NULL;
}
/* Try a partial resynchronization with the master if we are about to reconnect.
* If there is no cached master structure, at least try to issue a
* "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
* command in order to obtain the master replid and the master replication
* global offset.
*
* This function is designed to be called from syncWithMaster(), so the
* following assumptions are made:
*
* 1) We pass the function an already connected socket "fd".
* 2) This function does not close the file descriptor "fd". However in case
* of successful partial resynchronization, the function will reuse
* 'fd' as file descriptor of the server.master client structure.
*
* The function is split in two halves: if read_reply is 0, the function
* writes the PSYNC command on the socket, and a new function call is
* needed, with read_reply set to 1, in order to read the reply of the
* command. This is useful in order to support non blocking operations, so
* that we write, return into the event loop, and read when there are data.
*
* When read_reply is 0 the function returns PSYNC_WRITE_ERR if there
* was a write error, or PSYNC_WAIT_REPLY to signal we need another call
* with read_reply set to 1. However even when read_reply is set to 1
* the function may return PSYNC_WAIT_REPLY again to signal there were
* insufficient data to read to complete its work. We should re-enter
* into the event loop and wait in such a case.
*
* The function returns:
*
* PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue.
* PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
* In this case the master replid and global replication
* offset is saved.
* PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
* the caller should fall back to SYNC.
* PSYNC_WRITE_ERROR: There was an error writing the command to the socket.
* PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
* PSYNC_TRY_LATER: Master is currently in a transient error condition.
*
* Notable side effects:
*
* 1) As a side effect of the function call the function removes the readable
* event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
* 2) server.master_initial_offset is set to the right value according
* to the master reply. This will be used to populate the 'server.master'
* structure replication offset.
*/
#define PSYNC_WRITE_ERROR 0
#define PSYNC_WAIT_REPLY 1
#define PSYNC_CONTINUE 2
#define PSYNC_FULLRESYNC 3
#define PSYNC_NOT_SUPPORTED 4
#define PSYNC_TRY_LATER 5
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
char *psync_replid;
char psync_offset[32];
sds reply;
/* Writing half */
if (!read_reply) {
/* Initially set master_initial_offset to -1 to mark the current
* master replid and offset as not valid. Later if we'll be able to do
* a FULL resync using the PSYNC command we'll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the master into server.master. */
server.master_initial_offset = -1;
if (server.cached_master) {
psync_replid = server.cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
} else {
serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_replid = "?";
memcpy(psync_offset,"-1",3);
}
/* Issue the PSYNC command */
reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL);
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
connSetReadHandler(conn, NULL);
return PSYNC_WRITE_ERROR;
}
return PSYNC_WAIT_REPLY;
}
/* Reading half */
reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
if (sdslen(reply) == 0) {
/* The master may send empty newlines after it receives PSYNC
* and before to reply, just to keep the connection alive. */
sdsfree(reply);
return PSYNC_WAIT_REPLY;
}
connSetReadHandler(conn, NULL);
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *replid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the replid
* and the replication offset. */
replid = strchr(reply,' ');
if (replid) {
replid++;
offset = strchr(replid,' ');
if (offset) offset++;
}
if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* replid to make sure next PSYNCs will fail. */
memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
} else {
memcpy(server.master_replid, replid, offset-replid-1);
server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
server.master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
server.master_replid,
server.master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
}
if (!strncmp(reply,"+CONTINUE",9)) {
/* Partial resync was accepted. */
serverLog(LL_NOTICE,
"Successful partial resynchronization with master.");
/* Check the new replication ID advertised by the master. If it
* changed, we need to set the new ID as primary ID, and set or
* secondary ID as the old master ID up to the current offset, so
* that our sub-slaves will be able to PSYNC with us after a
* disconnection. */
char *start = reply+10;
char *end = reply+9;
while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
if (end-start == CONFIG_RUN_ID_SIZE) {
char new[CONFIG_RUN_ID_SIZE+1];
memcpy(new,start,CONFIG_RUN_ID_SIZE);
new[CONFIG_RUN_ID_SIZE] = '\0';
if (strcmp(new,server.cached_master->replid)) {
/* Master ID changed. */
serverLog(LL_WARNING,"Master replication ID changed to %s",new);
/* Set the old ID as our ID2, up to the current offset+1. */
memcpy(server.replid2,server.cached_master->replid,
sizeof(server.replid2));
server.second_replid_offset = server.master_repl_offset+1;
/* Update the cached master ID and our own primary ID to the
* new one. */
memcpy(server.replid,new,sizeof(server.replid));
memcpy(server.cached_master->replid,new,sizeof(server.replid));
/* Disconnect all the sub-slaves: they need to be notified. */
disconnectSlaves();
}
}
/* Setup the replication to continue. */
sdsfree(reply);
replicationResurrectCachedMaster(conn);
/* If this instance was restarted and we read the metadata to
* PSYNC from the persistence file, our replication backlog could
* be still not initialized. Create it. */
if (server.repl_backlog == NULL) createReplicationBacklog();
return PSYNC_CONTINUE;
}
/* If we reach this point we received either an error (since the master does
* not understand PSYNC or because it is in a special state and cannot
* serve our request), or an unexpected reply from the master.
*
* Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
* return PSYNC_TRY_LATER if we believe this is a transient error. */
if (!strncmp(reply,"-NOMASTERLINK",13) ||
!strncmp(reply,"-LOADING",8))
{
serverLog(LL_NOTICE,
"Master is currently unable to PSYNC "
"but should be in the future: %s", reply);
sdsfree(reply);
return PSYNC_TRY_LATER;
}
if (strncmp(reply,"-ERR",4)) {
/* If it's not an error, log the unexpected event. */
serverLog(LL_WARNING,
"Unexpected reply to PSYNC from master: %s", reply);
} else {
serverLog(LL_NOTICE,
"Master does not support PSYNC or is in "
"error state (reply: %s)", reply);
}
sdsfree(reply);
replicationDiscardCachedMaster();
return PSYNC_NOT_SUPPORTED;
}
/* This handler fires when the non blocking connect was able to
* establish a connection with the master. */
void syncWithMaster(connection *conn) {
char tmpfile[256], *err = NULL;
int dfd = -1, maxtries = 5;
int psync_result;
/* If this event fired after the user turned the instance into a master
* with SLAVEOF NO ONE we must just return ASAP. */
if (server.repl_state == REPL_STATE_NONE) {
connClose(conn);
return;
}
/* Check for errors in the socket: after a non blocking connect() we
* may find that the socket is in error state. */
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
connGetLastError(conn));
goto error;
}
/* Send a PING to check the master is able to reply without errors. */
if (server.repl_state == REPL_STATE_CONNECTING) {
serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
/* Delete the writable event so that the readable event remains
* registered and we can wait for the PONG reply. */
connSetReadHandler(conn, syncWithMaster);
connSetWriteHandler(conn, NULL);
server.repl_state = REPL_STATE_RECEIVE_PONG;
/* Send the PING, don't check for errors at all, we have the timeout
* that will take care about this. */
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL);
if (err) goto write_error;
return;
}
/* Receive the PONG command. */
if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
/* We accept only two replies as valid, a positive +PONG reply
* (we just check for "+") or an authentication error.
* Note that older versions of Redis replied with "operation not
* permitted" instead of using a proper error code, so we test
* both. */
if (err[0] != '+' &&
strncmp(err,"-NOAUTH",7) != 0 &&
strncmp(err,"-NOPERM",7) != 0 &&
strncmp(err,"-ERR operation not permitted",28) != 0)
{
serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
sdsfree(err);
goto error;
} else {
serverLog(LL_NOTICE,
"Master replied to PING, replication can continue...");
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_AUTH;
}
/* AUTH with the master if required. */
if (server.repl_state == REPL_STATE_SEND_AUTH) {
if (server.masteruser && server.masterauth) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",
server.masteruser,server.masterauth,NULL);
if (err) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_AUTH;
return;
} else if (server.masterauth) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",server.masterauth,NULL);
if (err) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_AUTH;
return;
} else {
server.repl_state = REPL_STATE_SEND_PORT;
}
}
/* Receive AUTH reply. */
if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
if (err[0] == '-') {
serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_PORT;
}
/* Set the slave port, so that Master's INFO command can list the
* slave listening port correctly. */
if (server.repl_state == REPL_STATE_SEND_PORT) {
int port;
if (server.slave_announce_port) port = server.slave_announce_port;
else if (server.tls_replication && server.tls_port) port = server.tls_port;
else port = server.port;
sds portstr = sdsfromlonglong(port);
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
"listening-port",portstr, NULL);
sdsfree(portstr);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_PORT;
return;
}
/* Receive REPLCONF listening-port reply. */
if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF listening-port: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_IP;
}
/* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
if (server.repl_state == REPL_STATE_SEND_IP &&
server.slave_announce_ip == NULL)
{
server.repl_state = REPL_STATE_SEND_CAPA;
}
/* Set the slave ip, so that Master's INFO command can list the
* slave IP address port correctly in case of port forwarding or NAT. */
if (server.repl_state == REPL_STATE_SEND_IP) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
"ip-address",server.slave_announce_ip, NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_IP;
return;
}
/* Receive REPLCONF ip-address reply. */
if (server.repl_state == REPL_STATE_RECEIVE_IP) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF ip-address: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_CAPA;
}
/* Inform the master of our (slave) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The master will ignore capabilities it does not understand. */
if (server.repl_state == REPL_STATE_SEND_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
"capa","eof","capa","psync2",NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_CAPA;
return;
}
/* Receive CAPA reply. */
if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF capa. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF capa: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_PSYNC;
}
/* Try a partial resynchonization. If we don't have a cached master
* slaveTryPartialResynchronization() will at least try to use PSYNC
* to start a full resynchronization so that we get the master replid
* and the global offset, to try a partial resync at the next
* reconnection attempt. */
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
err = sdsnew("Write error sending the PSYNC command.");
goto write_error;
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC;
return;
}
/* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
"state should be RECEIVE_PSYNC but is %d",
server.repl_state);
goto error;
}
psync_result = slaveTryPartialResynchronization(conn,1);
if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
/* If the master is in an transient error, we should try to PSYNC
* from scratch later, so go to the error path. This happens when
* the server is loading the dataset or is not connected with its
* master and so forth. */
if (psync_result == PSYNC_TRY_LATER) goto error;
/* Note: if PSYNC does not return WAIT_REPLY, it will take care of
* uninstalling the read handler from the file descriptor. */
if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
if (server.supervised_mode == SUPERVISED_SYSTEMD) {
redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections in read-write mode.\n");
}
return;
}
/* PSYNC failed or is not supported: we want our slaves to resync with us
* as well, if we have any sub-slaves. The master may transfer us an
* entirely different data set and we have no way to incrementally feed
* our slaves after that. */
disconnectSlaves(); /* Force our slaves to resync with us as well. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
* and the server.master_replid and master_initial_offset are
* already populated. */
if (psync_result == PSYNC_NOT_SUPPORTED) {
serverLog(LL_NOTICE,"Retrying with SYNC...");
if (connSyncWrite(conn,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
strerror(errno));
goto error;
}
}
/* Prepare a suitable temp file for bulk transfer */
if (!useDisklessLoad()) {
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) {
serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
goto error;
}
server.repl_transfer_tmpfile = zstrdup(tmpfile);
server.repl_transfer_fd = dfd;
}
/* Setup the non blocking download of the bulk file. */
if (connSetReadHandler(conn, readSyncBulkPayload)
== C_ERR)
{
char conninfo[CONN_INFO_LEN];
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (%s)",
strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
goto error;
}
server.repl_state = REPL_STATE_TRANSFER;
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_lastio = server.unixtime;
return;
error:
if (dfd != -1) close(dfd);
connClose(conn);
server.repl_transfer_s = NULL;
if (server.repl_transfer_fd != -1)
close(server.repl_transfer_fd);
if (server.repl_transfer_tmpfile)
zfree(server.repl_transfer_tmpfile);
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
server.repl_state = REPL_STATE_CONNECT;
return;
write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */
serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
sdsfree(err);
goto error;
}
int connectWithMaster(void) {
server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) {
serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
connGetLastError(server.repl_transfer_s));
connClose(server.repl_transfer_s);
server.repl_transfer_s = NULL;
return C_ERR;
}
server.repl_transfer_lastio = server.unixtime;
server.repl_state = REPL_STATE_CONNECTING;
return C_OK;
}
/* This function can be called when a non blocking connection is currently
* in progress to undo it.
* Never call this function directly, use cancelReplicationHandshake() instead.
*/
void undoConnectWithMaster(void) {
connClose(server.repl_transfer_s);
server.repl_transfer_s = NULL;
}
/* Abort the async download of the bulk dataset while SYNC-ing with master.
* Never call this function directly, use cancelReplicationHandshake() instead.
*/
void replicationAbortSyncTransfer(void) {
serverAssert(server.repl_state == REPL_STATE_TRANSFER);
undoConnectWithMaster();
if (server.repl_transfer_fd!=-1) {
close(server.repl_transfer_fd);
bg_unlink(server.repl_transfer_tmpfile);
zfree(server.repl_transfer_tmpfile);
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
}
}
/* This function aborts a non blocking replication attempt if there is one
* in progress, by canceling the non-blocking connect attempt or
* the initial bulk transfer.
*
* If there was a replication handshake in progress 1 is returned and
* the replication state (server.repl_state) set to REPL_STATE_CONNECT.
*
* Otherwise zero is returned and no operation is performed at all. */
int cancelReplicationHandshake(void) {
if (server.repl_state == REPL_STATE_TRANSFER) {
replicationAbortSyncTransfer();
server.repl_state = REPL_STATE_CONNECT;
} else if (server.repl_state == REPL_STATE_CONNECTING ||
slaveIsInHandshakeState())
{
undoConnectWithMaster();
server.repl_state = REPL_STATE_CONNECT;
} else {
return 0;
}
return 1;
}
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
int was_master = server.masterhost == NULL;
sdsfree(server.masterhost);
server.masterhost = sdsnew(ip);
server.masterport = port;
if (server.master) {
freeClient(server.master);
}
disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
/* Update oom_score_adj */
setOOMScoreAdj(-1);
/* Force our slaves to resync with us as well. They may hopefully be able
* to partially resync with us, but we can notify the replid change. */
disconnectSlaves();
cancelReplicationHandshake();
/* Before destroying our master state, create a cached master using
* our own parameters, to later PSYNC with the new master. */
if (was_master) {
replicationDiscardCachedMaster();
replicationCacheMasterUsingMyself();
}
/* Fire the role change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
NULL);
/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);
server.repl_state = REPL_STATE_CONNECT;
}
/* Cancel replication, setting the instance as a master itself. */
void replicationUnsetMaster(void) {
if (server.masterhost == NULL) return; /* Nothing to do. */
/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);
sdsfree(server.masterhost);
server.masterhost = NULL;
if (server.master) freeClient(server.master);
replicationDiscardCachedMaster();
cancelReplicationHandshake();
/* When a slave is turned into a master, the current replication ID
* (that was inherited from the master at synchronization time) is
* used as secondary ID up to the current offset, and a new replication
* ID is created to continue with a new replication history.
*
* NOTE: this function MUST be called after we call
* freeClient(server.master), since there we adjust the replication
* offset trimming the final PINGs. See Github issue #7320. */
shiftReplicationId();
/* Disconnecting all the slaves is required: we need to inform slaves
* of the replication ID change (see shiftReplicationId() call). However
* the slaves will be able to partially resync with us, so it will be
* a very fast reconnection. */
disconnectSlaves();
server.repl_state = REPL_STATE_NONE;
/* We need to make sure the new master will start the replication stream
* with a SELECT statement. This is forced after a full resync, but
* with PSYNC version 2, there is no need for full resync after a
* master switch. */
server.slaveseldb = -1;
/* Update oom_score_adj */
setOOMScoreAdj(-1);
/* Once we turn from slave to master, we consider the starting time without
* slaves (that is used to count the replication backlog time to live) as
* starting from now. Otherwise the backlog will be freed after a
* failover if slaves do not connect immediately. */
server.repl_no_slaves_since = server.unixtime;
/* Fire the role change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
NULL);
/* Restart the AOF subsystem in case we shut it down during a sync when
* we were still a slave. */
if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC();
}
/* This function is called when the slave lose the connection with the
* master into an unexpected way. */
void replicationHandleMasterDisconnection(void) {
/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);
server.master = NULL;
server.repl_state = REPL_STATE_CONNECT;
server.repl_down_since = server.unixtime;
/* We lost connection with our master, don't disconnect slaves yet,
* maybe we'll be able to PSYNC with our master later. We'll disconnect
* the slaves only if we'll have to do a full resync with our master. */
}
void replicaofCommand(client *c) {
/* SLAVEOF is not allowed in cluster mode as replication is automatically
* configured using the current address of the master node. */
if (server.cluster_enabled) {
addReplyError(c,"REPLICAOF not allowed in cluster mode.");
return;
}
/* The special host/port combination "NO" "ONE" turns the instance
* into a master. Otherwise the new master address is set. */
if (!strcasecmp(c->argv[1]->ptr,"no") &&
!strcasecmp(c->argv[2]->ptr,"one")) {
if (server.masterhost) {
replicationUnsetMaster();
sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
client);
sdsfree(client);
}
} else {
long port;
if (c->flags & CLIENT_SLAVE)
{
/* If a client is already a replica they cannot run this command,
* because it involves flushing all replicas (including this
* client) */
addReplyError(c, "Command is not valid when client is a replica.");
return;
}
if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
return;
/* Check if we are already attached to the specified slave */
if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
&& server.masterport == port) {
serverLog(LL_NOTICE,"REPLICAOF would result into synchronization "
"with the master we are already connected "
"with. No operation performed.");
addReplySds(c,sdsnew("+OK Already connected to specified "
"master\r\n"));
return;
}
/* There was no previous master or the user specified a different one,
* we can continue. */
replicationSetMaster(c->argv[1]->ptr, port);
sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')",
server.masterhost, server.masterport, client);
sdsfree(client);
}
addReply(c,shared.ok);
}
/* ROLE command: provide information about the role of the instance
* (master or slave) and additional information related to replication
* in an easy to process format. */
void roleCommand(client *c) {
if (server.masterhost == NULL) {
listIter li;
listNode *ln;
void *mbcount;
int slaves = 0;
addReplyArrayLen(c,3);
addReplyBulkCBuffer(c,"master",6);
addReplyLongLong(c,server.master_repl_offset);
mbcount = addReplyDeferredLen(c);
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;
if (slaveip[0] == '\0') {
if (connPeerToString(slave->conn,ip,sizeof(ip),NULL) == -1)
continue;
slaveip = ip;
}
if (slave->replstate != SLAVE_STATE_ONLINE) continue;
addReplyArrayLen(c,3);
addReplyBulkCString(c,slaveip);
addReplyBulkLongLong(c,slave->slave_listening_port);
addReplyBulkLongLong(c,slave->repl_ack_off);
slaves++;
}
setDeferredArrayLen(c,mbcount,slaves);
} else {
char *slavestate = NULL;
addReplyArrayLen(c,5);
addReplyBulkCBuffer(c,"slave",5);
addReplyBulkCString(c,server.masterhost);
addReplyLongLong(c,server.masterport);
if (slaveIsInHandshakeState()) {
slavestate = "handshake";
} else {
switch(server.repl_state) {
case REPL_STATE_NONE: slavestate = "none"; break;
case REPL_STATE_CONNECT: slavestate = "connect"; break;
case REPL_STATE_CONNECTING: slavestate = "connecting"; break;
case REPL_STATE_TRANSFER: slavestate = "sync"; break;
case REPL_STATE_CONNECTED: slavestate = "connected"; break;
default: slavestate = "unknown"; break;
}
}
addReplyBulkCString(c,slavestate);
addReplyLongLong(c,server.master ? server.master->reploff : -1);
}
}
/* Send a REPLCONF ACK command to the master to inform it about the current
* processed offset. If we are not connected with a master, the command has
* no effects. */
void replicationSendAck(void) {
client *c = server.master;
if (c != NULL) {
c->flags |= CLIENT_MASTER_FORCE_REPLY;
addReplyArrayLen(c,3);
addReplyBulkCString(c,"REPLCONF");
addReplyBulkCString(c,"ACK");
addReplyBulkLongLong(c,c->reploff);
c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
}
}
/* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
/* In order to implement partial synchronization we need to be able to cache
* our master's client structure after a transient disconnection.
* It is cached into server.cached_master and flushed away using the following
* functions. */
/* This function is called by freeClient() in order to cache the master
* client structure instead of destroying it. freeClient() will return
* ASAP after this function returns, so every action needed to avoid problems
* with a client that is really "suspended" has to be done by this function.
*
* The other functions that will deal with the cached master are:
*
* replicationDiscardCachedMaster() that will make sure to kill the client
* as for some reason we don't want to use it in the future.
*
* replicationResurrectCachedMaster() that is used after a successful PSYNC
* handshake in order to reactivate the cached master.
*/
void replicationCacheMaster(client *c) {
serverAssert(server.master != NULL && server.cached_master == NULL);
serverLog(LL_NOTICE,"Caching the disconnected master state.");
/* Unlink the client from the server structures. */
unlinkClient(c);
/* Reset the master client so that's ready to accept new commands:
* we want to discard te non processed query buffers and non processed
* offsets, including pending transactions, already populated arguments,
* pending outputs to the master. */
sdsclear(server.master->querybuf);
sdsclear(server.master->pending_querybuf);
server.master->read_reploff = server.master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply);
c->sentlen = 0;
c->reply_bytes = 0;
c->bufpos = 0;
resetClient(c);
/* Save the master. Server.master will be set to null later by
* replicationHandleMasterDisconnection(). */
server.cached_master = server.master;
/* Invalidate the Peer ID cache. */
if (c->peerid) {
sdsfree(c->peerid);
c->peerid = NULL;
}
/* Caching the master happens instead of the actual freeClient() call,
* so make sure to adjust the replication state. This function will
* also set server.master to NULL. */
replicationHandleMasterDisconnection();
}
/* This function is called when a master is turend into a slave, in order to
* create from scratch a cached master for the new client, that will allow
* to PSYNC with the slave that was promoted as the new master after a
* failover.
*
* Assuming this instance was previously the master instance of the new master,
* the new master will accept its replication ID, and potentiall also the
* current offset if no data was lost during the failover. So we use our
* current replication ID and offset in order to synthesize a cached master. */
void replicationCacheMasterUsingMyself(void) {
serverLog(LL_NOTICE,
"Before turning into a replica, using my own master parameters "
"to synthesize a cached master: I may be able to synchronize with "
"the new master with just a partial transfer.");
/* This will be used to populate the field server.master->reploff
* by replicationCreateMasterClient(). We'll later set the created
* master as server.cached_master, so the replica will use such
* offset for PSYNC. */
server.master_initial_offset = server.master_repl_offset;
/* The master client we create can be set to any DBID, because
* the new master will start its replication stream with SELECT. */
replicationCreateMasterClient(NULL,-1);
/* Use our own ID / offset. */
memcpy(server.master->replid, server.replid, sizeof(server.replid));
/* Set as cached master. */
unlinkClient(server.master);
server.cached_master = server.master;
server.master = NULL;
}
/* Free a cached master, called when there are no longer the conditions for
* a partial resync on reconnection. */
void replicationDiscardCachedMaster(void) {
if (server.cached_master == NULL) return;
serverLog(LL_NOTICE,"Discarding previously cached master state.");
server.cached_master->flags &= ~CLIENT_MASTER;
freeClient(server.cached_master);
server.cached_master = NULL;
}
/* Turn the cached master into the current master, using the file descriptor
* passed as argument as the socket for the new master.
*
* This function is called when successfully setup a partial resynchronization
* so the stream of data that we'll receive will start from were this
* master left. */
void replicationResurrectCachedMaster(connection *conn) {
server.master = server.cached_master;
server.cached_master = NULL;
server.master->conn = conn;
connSetPrivateData(server.master->conn, server.master);
server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
server.master->authenticated = 1;
server.master->lastinteraction = server.unixtime;
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
/* Fire the master link modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_UP,
NULL);
/* Re-add to the list of clients. */
linkClient(server.master);
if (connSetReadHandler(server.master->conn, readQueryFromClient)) {
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
freeClientAsync(server.master); /* Close ASAP. */
}
/* We may also need to install the write handler as well if there is
* pending data in the write buffers. */
if (clientHasPendingReplies(server.master)) {
if (connSetWriteHandler(server.master->conn, sendReplyToClient)) {
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
freeClientAsync(server.master); /* Close ASAP. */
}
}
}
/* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */
/* This function counts the number of slaves with lag <= min-slaves-max-lag.
* If the option is active, the server will prevent writes if there are not
* enough connected slaves with the specified lag (or less). */
void refreshGoodSlavesCount(void) {
listIter li;
listNode *ln;
int good = 0;
if (!server.repl_min_slaves_to_write ||
!server.repl_min_slaves_max_lag) return;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
time_t lag = server.unixtime - slave->repl_ack_time;
if (slave->replstate == SLAVE_STATE_ONLINE &&
lag <= server.repl_min_slaves_max_lag) good++;
}
server.repl_good_slaves_count = good;
}
/* ----------------------- REPLICATION SCRIPT CACHE --------------------------
* The goal of this code is to keep track of scripts already sent to every
* connected slave, in order to be able to replicate EVALSHA as it is without
* translating it to EVAL every time it is possible.
*
* We use a capped collection implemented by a hash table for fast lookup
* of scripts we can send as EVALSHA, plus a linked list that is used for
* eviction of the oldest entry when the max number of items is reached.
*
* We don't care about taking a different cache for every different slave
* since to fill the cache again is not very costly, the goal of this code
* is to avoid that the same big script is transmitted a big number of times
* per second wasting bandwidth and processor speed, but it is not a problem
* if we need to rebuild the cache from scratch from time to time, every used
* script will need to be transmitted a single time to reappear in the cache.
*
* This is how the system works:
*
* 1) Every time a new slave connects, we flush the whole script cache.
* 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
* trying to convert EVAL into EVALSHA specifically for slaves.
* 3) Every time we transmit a script as EVAL to the slaves, we also add the
* corresponding SHA1 of the script into the cache as we are sure every
* slave knows about the script starting from now.
* 4) On SCRIPT FLUSH command, we replicate the command to all the slaves
* and at the same time flush the script cache.
* 5) When the last slave disconnects, flush the cache.
* 6) We handle SCRIPT LOAD as well since that's how scripts are loaded
* in the master sometimes.
*/
/* Initialize the script cache, only called at startup. */
void replicationScriptCacheInit(void) {
server.repl_scriptcache_size = 10000;
server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL);
server.repl_scriptcache_fifo = listCreate();
}
/* Empty the script cache. Should be called every time we are no longer sure
* that every slave knows about all the scripts in our set, or when the
* current AOF "context" is no longer aware of the script. In general we
* should flush the cache:
*
* 1) Every time a new slave reconnects to this master and performs a
* full SYNC (PSYNC does not require flushing).
* 2) Every time an AOF rewrite is performed.
* 3) Every time we are left without slaves at all, and AOF is off, in order
* to reclaim otherwise unused memory.
*/
void replicationScriptCacheFlush(void) {
dictEmpty(server.repl_scriptcache_dict,NULL);
listRelease(server.repl_scriptcache_fifo);
server.repl_scriptcache_fifo = listCreate();
}
/* Add an entry into the script cache, if we reach max number of entries the
* oldest is removed from the list. */
void replicationScriptCacheAdd(sds sha1) {
int retval;
sds key = sdsdup(sha1);
/* Evict oldest. */
if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size)
{
listNode *ln = listLast(server.repl_scriptcache_fifo);
sds oldest = listNodeValue(ln);
retval = dictDelete(server.repl_scriptcache_dict,oldest);
serverAssert(retval == DICT_OK);
listDelNode(server.repl_scriptcache_fifo,ln);
}
/* Add current. */
retval = dictAdd(server.repl_scriptcache_dict,key,NULL);
listAddNodeHead(server.repl_scriptcache_fifo,key);
serverAssert(retval == DICT_OK);
}
/* Returns non-zero if the specified entry exists inside the cache, that is,
* if all the slaves are aware of this script SHA1. */
int replicationScriptCacheExists(sds sha1) {
return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
}
/* ----------------------- SYNCHRONOUS REPLICATION --------------------------
* Redis synchronous replication design can be summarized in points:
*
* - Redis masters have a global replication offset, used by PSYNC.
* - Master increment the offset every time new commands are sent to slaves.
* - Slaves ping back masters with the offset processed so far.
*
* So synchronous replication adds a new WAIT command in the form:
*
* WAIT <num_replicas> <milliseconds_timeout>
*
* That returns the number of replicas that processed the query when
* we finally have at least num_replicas, or when the timeout was
* reached.
*
* The command is implemented in this way:
*
* - Every time a client processes a command, we remember the replication
* offset after sending that command to the slaves.
* - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
* The client is blocked at the same time (see blocked.c).
* - Once we receive enough ACKs for a given offset or when the timeout
* is reached, the WAIT command is unblocked and the reply sent to the
* client.
*/
/* This just set a flag so that we broadcast a REPLCONF GETACK command
* to all the slaves in the beforeSleep() function. Note that this way
* we "group" all the clients that want to wait for synchronous replication
* in a given event loop iteration, and send a single GETACK for them all. */
void replicationRequestAckFromSlaves(void) {
server.get_ack_from_slaves = 1;
}
/* Return the number of slaves that already acknowledged the specified
* replication offset. */
int replicationCountAcksByOffset(long long offset) {
listIter li;
listNode *ln;
int count = 0;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate != SLAVE_STATE_ONLINE) continue;
if (slave->repl_ack_off >= offset) count++;
}
return count;
}
/* WAIT for N replicas to acknowledge the processing of our latest
* write command (and all the previous commands). */
void waitCommand(client *c) {
mstime_t timeout;
long numreplicas, ackreplicas;
long long offset = c->woff;
if (server.masterhost) {
addReplyError(c,"WAIT cannot be used with replica instances. Please also note that since Redis 4.0 if a replica is configured to be writable (which is not the default) writes to replicas are just local and are not propagated.");
return;
}
/* Argument parsing. */
if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
return;
if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
!= C_OK) return;
/* First try without blocking at all. */
ackreplicas = replicationCountAcksByOffset(c->woff);
if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) {
addReplyLongLong(c,ackreplicas);
return;
}
/* Otherwise block the client and put it into our list of clients
* waiting for ack from slaves. */
c->bpop.timeout = timeout;
c->bpop.reploffset = offset;
c->bpop.numreplicas = numreplicas;
listAddNodeTail(server.clients_waiting_acks,c);
blockClient(c,BLOCKED_WAIT);
/* Make sure that the server will send an ACK request to all the slaves
* before returning to the event loop. */
replicationRequestAckFromSlaves();
}
/* This is called by unblockClient() to perform the blocking op type
* specific cleanup. We just remove the client from the list of clients
* waiting for replica acks. Never call it directly, call unblockClient()
* instead. */
void unblockClientWaitingReplicas(client *c) {
listNode *ln = listSearchKey(server.clients_waiting_acks,c);
serverAssert(ln != NULL);
listDelNode(server.clients_waiting_acks,ln);
}
/* Check if there are clients blocked in WAIT that can be unblocked since
* we received enough ACKs from slaves. */
void processClientsWaitingReplicas(void) {
long long last_offset = 0;
int last_numreplicas = 0;
listIter li;
listNode *ln;
listRewind(server.clients_waiting_acks,&li);
while((ln = listNext(&li))) {
client *c = ln->value;
/* Every time we find a client that is satisfied for a given
* offset and number of replicas, we remember it so the next client
* may be unblocked without calling replicationCountAcksByOffset()
* if the requested offset / replicas were equal or less. */
if (last_offset && last_offset > c->bpop.reploffset &&
last_numreplicas > c->bpop.numreplicas)
{
unblockClient(c);
addReplyLongLong(c,last_numreplicas);
} else {
int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
if (numreplicas >= c->bpop.numreplicas) {
last_offset = c->bpop.reploffset;
last_numreplicas = numreplicas;
unblockClient(c);
addReplyLongLong(c,numreplicas);
}
}
}
}
/* Return the slave replication offset for this instance, that is
* the offset for which we already processed the master replication stream. */
long long replicationGetSlaveOffset(void) {
long long offset = 0;
if (server.masterhost != NULL) {
if (server.master) {
offset = server.master->reploff;
} else if (server.cached_master) {
offset = server.cached_master->reploff;
}
}
/* offset may be -1 when the master does not support it at all, however
* this function is designed to return an offset that can express the
* amount of data processed by the master, so we return a positive
* integer. */
if (offset < 0) offset = 0;
return offset;
}
/* --------------------------- REPLICATION CRON ---------------------------- */
/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
static long long replication_cron_loops = 0;
/* Non blocking connection timeout? */
if (server.masterhost &&
(server.repl_state == REPL_STATE_CONNECTING ||
slaveIsInHandshakeState()) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
cancelReplicationHandshake();
}
/* Bulk transfer I/O timeout? */
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
cancelReplicationHandshake();
}
/* Timed out master when we are an already connected slave? */
if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
(time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
freeClient(server.master);
}
/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
}
}
/* Send ACK to master from time to time.
* Note that we do not send periodic acks to masters that don't
* support PSYNC and replication offsets. */
if (server.masterhost && server.master &&
!(server.master->flags & CLIENT_PRE_PSYNC))
replicationSendAck();
/* If we have attached slaves, PING them from time to time.
* So slaves can implement an explicit timeout to masters, and will
* be able to detect a link disconnection even if the TCP connection
* will not actually go down. */
listIter li;
listNode *ln;
robj *ping_argv[1];
/* First, send PING according to ping_slave_period. */
if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
listLength(server.slaves))
{
/* Note that we don't send the PING if the clients are paused during
* a Redis Cluster manual failover: the PING we send will otherwise
* alter the replication offsets of master and slave, and will no longer
* match the one stored into 'mf_master_offset' state. */
int manual_failover_in_progress =
server.cluster_enabled &&
server.cluster->mf_end &&
clientsArePaused();
if (!manual_failover_in_progress) {
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(server.slaves, server.slaveseldb,
ping_argv, 1);
decrRefCount(ping_argv[0]);
}
}
/* Second, send a newline to all the slaves in pre-synchronization
* stage, that is, slaves waiting for the master to create the RDB file.
*
* Also send the a newline to all the chained slaves we have, if we lost
* connection from our master, to keep the slaves aware that their
* master is online. This is needed since sub-slaves only receive proxied
* data from top-level masters, so there is no explicit pinging in order
* to avoid altering the replication offsets. This special out of band
* pings (newlines) can be sent, they will have no effect in the offset.
*
* The newline will be ignored by the slave but will refresh the
* last interaction timer preventing a timeout. In this case we ignore the
* ping period and refresh the connection once per second since certain
* timeouts are set at a few seconds (example: PSYNC response). */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
int is_presync =
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
(slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));
if (is_presync) {
connWrite(slave->conn, "\n", 1);
}
}
/* Disconnect timedout slaves. */
if (listLength(server.slaves)) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_ONLINE) {
if (slave->flags & CLIENT_PRE_PSYNC)
continue;
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) {
serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s",
replicationGetSlaveName(slave));
freeClient(slave);
continue;
}
}
/* We consider disconnecting only diskless replicas because disk-based replicas aren't fed
* by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child
* from terminating. */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) {
if (slave->repl_last_partial_write != 0 &&
(server.unixtime - slave->repl_last_partial_write) > server.repl_timeout)
{
serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s",
replicationGetSlaveName(slave));
freeClient(slave);
continue;
}
}
}
}
/* If this is a master without attached slaves and there is a replication
* backlog active, in order to reclaim memory we can free it after some
* (configured) time. Note that this cannot be done for slaves: slaves
* without sub-slaves attached should still accumulate data into the
* backlog, in order to reply to PSYNC queries if they are turned into
* masters after a failover. */
if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
server.repl_backlog && server.masterhost == NULL)
{
time_t idle = server.unixtime - server.repl_no_slaves_since;
if (idle > server.repl_backlog_time_limit) {
/* When we free the backlog, we always use a new
* replication ID and clear the ID2. This is needed
* because when there is no backlog, the master_repl_offset
* is not updated, but we would still retain our replication
* ID, leading to the following problem:
*
* 1. We are a master instance.
* 2. Our slave is promoted to master. It's repl-id-2 will
* be the same as our repl-id.
* 3. We, yet as master, receive some updates, that will not
* increment the master_repl_offset.
* 4. Later we are turned into a slave, connect to the new
* master that will accept our PSYNC request by second
* replication ID, but there will be data inconsistency
* because we received writes. */
changeReplicationId();
clearReplicationId2();
freeReplicationBacklog();
serverLog(LL_NOTICE,
"Replication backlog freed after %d seconds "
"without connected replicas.",
(int) server.repl_backlog_time_limit);
}
}
/* If AOF is disabled and we no longer have attached slaves, we can
* free our Replication Script Cache as there is no need to propagate
* EVALSHA at all. */
if (listLength(server.slaves) == 0 &&
server.aof_state == AOF_OFF &&
listLength(server.repl_scriptcache_fifo) != 0)
{
replicationScriptCacheFlush();
}
/* Start a BGSAVE good for replication if we have slaves in
* WAIT_BGSAVE_START state.
*
* In case of diskless replication, we make sure to wait the specified
* number of seconds (according to configuration) so that other slaves
* have the time to arrive before we start streaming. */
if (!hasActiveChildProcess()) {
time_t idle, max_idle = 0;
int slaves_waiting = 0;
int mincapa = -1;
listNode *ln;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
idle = server.unixtime - slave->lastinteraction;
if (idle > max_idle) max_idle = idle;
slaves_waiting++;
mincapa = (mincapa == -1) ? slave->slave_capa :
(mincapa & slave->slave_capa);
}
}
if (slaves_waiting &&
(!server.repl_diskless_sync ||
max_idle > server.repl_diskless_sync_delay))
{
/* Start the BGSAVE. The called function may start a
* BGSAVE with socket target or disk target depending on the
* configuration and slaves capabilities. */
startBgsaveForReplication(mincapa);
}
}
/* Remove the RDB file used for replication if Redis is not running
* with any persistence. */
removeRDBUsedToSyncReplicas();
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
refreshGoodSlavesCount();
replication_cron_loops++; /* Incremented with frequency 1 HZ. */
}
|