1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196 3197 3198 3199 3200 3201 3202 3203 3204 3205 3206 3207 3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256 3257 3258 3259 3260 3261 3262 3263 3264 3265 3266 3267 3268 3269 3270 3271 3272 3273 3274 3275 3276 3277 3278 3279 3280 3281 3282 3283 3284 3285 3286 3287 3288 3289 3290 3291 3292 3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320 3321 3322 3323 3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335 3336 3337 3338 3339 3340 3341 3342 3343 3344 3345 3346 3347 3348 3349 3350 3351 3352 3353 3354 3355 3356 3357 3358 3359 3360 3361 3362 3363 3364 3365 3366 3367 3368 3369 3370 3371 3372 3373 3374 3375 3376 3377 3378 3379 3380 3381 3382 3383 3384 3385 3386 3387 3388 3389 3390 3391 3392 3393 3394 3395 3396 3397 3398 3399 3400 3401 3402 3403 3404 3405 3406 3407 3408 3409 3410 3411 3412 3413 3414 3415 3416 3417 3418 3419 3420 3421 3422 3423 3424 3425 3426 3427 3428 3429 3430 3431 3432 3433 3434 3435 3436 3437 3438 3439 3440 3441 3442 3443 3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457 3458 3459 3460 3461 3462 3463 3464 3465 3466 3467 3468 3469 3470 3471 3472 3473 3474 3475 3476 3477 3478 3479 3480 3481 3482 3483 3484 3485 3486 3487 3488 3489 3490 3491 3492 3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540 3541 3542 3543 3544 3545 3546 3547 3548 3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559 3560 3561 3562 3563 3564 3565 3566 3567 3568 3569 3570 3571 3572 3573 3574 3575 3576 3577 3578 3579 3580 3581 3582 3583 3584 3585 3586 3587 3588 3589 3590 3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 3617 3618 3619 3620 3621 3622 3623 3624 3625 3626 3627 3628 3629 3630 3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641 3642 3643 3644 3645 3646 3647 3648 3649 3650 3651 3652 3653 3654 3655 3656 3657 3658 3659 3660 3661 3662 3663 3664 3665 3666 3667 3668 3669 3670 3671 3672 3673 3674 3675 3676 3677 3678 3679 3680 3681 3682 3683 3684 3685 3686 3687 3688 3689 3690 3691 3692 3693 3694 3695 3696 3697 3698 3699 3700 3701 3702 3703 3704 3705 3706 3707 3708 3709 3710 3711 3712 3713 3714 3715 3716 3717 3718 3719 3720 3721 3722 3723 3724 3725 3726 3727 3728 3729 3730 3731 3732 3733 3734 3735 3736 3737 3738 3739 3740 3741 3742 3743 3744 3745 3746 3747 3748 3749 3750 3751 3752 3753 3754 3755 3756 3757 3758 3759 3760 3761 3762 3763 3764 3765 3766 3767 3768 3769 3770 3771 3772 3773 3774 3775 3776 3777 3778 3779 3780 3781 3782 3783 3784 3785 3786 3787 3788 3789 3790 3791 3792 3793 3794 3795 3796 3797 3798 3799 3800 3801 3802 3803 3804 3805 3806 3807 3808 3809 3810 3811 3812 3813 3814 3815 3816 3817 3818 3819 3820 3821 3822 3823 3824 3825 3826 3827 3828 3829 3830 3831 3832 3833 3834 3835 3836 3837 3838 3839 3840 3841 3842 3843 3844 3845 3846 3847 3848 3849 3850 3851 3852 3853 3854 3855 3856 3857 3858 3859 3860 3861 3862 3863 3864 3865 3866 3867 3868 3869 3870 3871 3872 3873 3874 3875 3876 3877 3878 3879 3880 3881 3882 3883 3884 3885 3886 3887 3888 3889 3890 3891 3892 3893 3894 3895 3896 3897 3898 3899 3900 3901 3902 3903 3904 3905 3906 3907 3908 3909 3910 3911 3912 3913 3914 3915 3916 3917 3918 3919 3920 3921 3922 3923 3924 3925 3926 3927 3928 3929 3930 3931 3932 3933 3934 3935 3936 3937 3938 3939 3940 3941 3942 3943 3944 3945 3946 3947 3948 3949 3950 3951 3952 3953 3954 3955 3956 3957 3958 3959 3960 3961 3962 3963 3964 3965 3966 3967 3968 3969 3970 3971 3972 3973 3974 3975 3976 3977 3978 3979 3980 3981 3982 3983 3984 3985 3986 3987 3988 3989 3990 3991 3992 3993 3994 3995 3996 3997 3998 3999 4000 4001 4002 4003 4004 4005 4006 4007 4008 4009 4010 4011 4012 4013 4014 4015 4016 4017 4018 4019 4020 4021 4022 4023 4024 4025 4026 4027 4028 4029 4030 4031 4032 4033 4034 4035 4036 4037 4038 4039 4040 4041 4042 4043 4044 4045 4046 4047 4048 4049 4050 4051 4052 4053 4054 4055 4056 4057 4058 4059 4060 4061 4062 4063 4064 4065 4066 4067 4068 4069 4070 4071 4072 4073 4074 4075 4076 4077 4078 4079 4080 4081 4082 4083 4084 4085 4086 4087 4088 4089 4090 4091 4092 4093 4094 4095 4096 4097 4098 4099 4100 4101 4102 4103 4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114 4115 4116 4117 4118 4119 4120 4121 4122 4123 4124 4125 4126 4127 4128 4129 4130 4131 4132 4133 4134 4135 4136 4137 4138 4139 4140 4141 4142 4143 4144 4145 4146 4147 4148 4149 4150 4151 4152 4153 4154 4155 4156 4157 4158 4159 4160 4161 4162 4163 4164 4165 4166 4167 4168 4169 4170 4171 4172 4173 4174 4175 4176 4177 4178 4179 4180 4181 4182 4183 4184 4185 4186 4187 4188 4189 4190 4191 4192 4193 4194 4195 4196 4197 4198 4199 4200 4201 4202 4203 4204 4205 4206 4207 4208 4209 4210 4211 4212 4213 4214 4215 4216 4217 4218 4219 4220 4221 4222 4223 4224 4225 4226 4227 4228 4229 4230 4231 4232 4233 4234 4235 4236 4237 4238 4239 4240 4241 4242 4243 4244 4245 4246 4247 4248 4249 4250 4251 4252 4253 4254 4255 4256 4257 4258 4259 4260 4261 4262 4263 4264 4265 4266 4267 4268 4269 4270 4271 4272 4273 4274 4275 4276 4277 4278 4279 4280 4281 4282 4283 4284 4285 4286 4287 4288 4289
|
#include "platform_sys.h"
#include <iterator>
#include "api.h"
#include "group.h"
using namespace std;
using namespace srt::sync;
using namespace srt::groups;
using namespace srt_logging;
// The SRT_DEF_VERSION is defined in core.cpp.
extern const int32_t SRT_DEF_VERSION;
namespace srt {
int32_t CUDTGroup::s_tokenGen = 0;
// [[using locked(this->m_GroupLock)]];
bool CUDTGroup::getBufferTimeBase(CUDT* forthesakeof,
steady_clock::time_point& w_tb,
bool& w_wp,
steady_clock::duration& w_dr)
{
CUDT* master = 0;
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
CUDT* u = &gi->ps->core();
if (gi->laststatus != SRTS_CONNECTED)
{
HLOGC(gmlog.Debug,
log << "getBufferTimeBase: skipping @" << u->m_SocketID
<< ": not connected, state=" << SockStatusStr(gi->laststatus));
continue;
}
if (u == forthesakeof)
continue; // skip the member if it's the target itself
if (!u->m_pRcvBuffer)
continue; // Not initialized yet
master = u;
break; // found
}
// We don't have any sockets in the group, so can't get
// the buffer timebase. This should be then initialized
// the usual way.
if (!master)
return false;
master->m_pRcvBuffer->getInternalTimeBase((w_tb), (w_wp), (w_dr));
// Sanity check
if (is_zero(w_tb))
{
LOGC(gmlog.Error, log << "IPE: existing previously socket has no time base set yet!");
return false; // this will enforce initializing the time base normal way
}
return true;
}
// [[using locked(this->m_GroupLock)]];
bool CUDTGroup::applyGroupSequences(SRTSOCKET target, int32_t& w_snd_isn, int32_t& w_rcv_isn)
{
if (m_bConnected) // You are the first one, no need to change.
{
IF_HEAVY_LOGGING(string update_reason = "what?");
// Find a socket that is declared connected and is not
// the socket that caused the call.
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
if (gi->id == target)
continue;
CUDT& se = gi->ps->core();
if (!se.m_bConnected)
continue;
// Found it. Get the following sequences:
// For sending, the sequence that is about to be sent next.
// For receiving, the sequence of the latest received packet.
// SndCurrSeqNo is initially set to ISN-1, this next one is
// the sequence that is about to be stamped on the next sent packet
// over that socket. Using this field is safer because it is atomic
// and its affinity is to the same thread as the sending function.
// NOTE: the groupwise scheduling sequence might have been set
// already. If so, it means that it was set by either:
// - the call of this function on the very first conencted socket (see below)
// - the call to `sendBroadcast` or `sendBackup`
// In both cases, we want THIS EXACTLY value to be reported
if (m_iLastSchedSeqNo != -1)
{
w_snd_isn = m_iLastSchedSeqNo;
IF_HEAVY_LOGGING(update_reason = "GROUPWISE snd-seq");
}
else
{
w_snd_isn = se.m_iSndNextSeqNo;
// Write it back to the groupwise scheduling sequence so that
// any next connected socket will take this value as well.
m_iLastSchedSeqNo = w_snd_isn;
IF_HEAVY_LOGGING(update_reason = "existing socket not yet sending");
}
// RcvCurrSeqNo is increased by one because it happens that at the
// synchronization moment it's already past reading and delivery.
// This is redundancy, so the redundant socket is connected at the moment
// when the other one is already transmitting, so skipping one packet
// even if later transmitted is less troublesome than requesting a
// "mistakenly seen as lost" packet.
w_rcv_isn = CSeqNo::incseq(se.m_iRcvCurrSeqNo);
HLOGC(gmlog.Debug,
log << "applyGroupSequences: @" << target << " gets seq from @" << gi->id << " rcv %" << (w_rcv_isn)
<< " snd %" << (w_snd_isn) << " as " << update_reason);
return false;
}
}
// If the GROUP (!) is not connected, or no running/pending socket has been found.
// // That is, given socket is the first one.
// The group data should be set up with its own data. They should already be passed here
// in the variables.
//
// Override the schedule sequence of the group in this case because whatever is set now,
// it's not valid.
HLOGC(gmlog.Debug,
log << "applyGroupSequences: no socket found connected and transmitting, @" << target
<< " not changing sequences, storing snd-seq %" << (w_snd_isn));
set_currentSchedSequence(w_snd_isn);
return true;
}
// NOTE: This function is now for DEBUG PURPOSES ONLY.
// Except for presenting the extracted data in the logs, there's no use of it now.
void CUDTGroup::debugMasterData(SRTSOCKET slave)
{
// Find at least one connection, which is running. Note that this function is called
// from within a handshake process, so the socket that undergoes this process is at best
// currently in SRT_GST_PENDING state and it's going to be in SRT_GST_IDLE state at the
// time when the connection process is done, until the first reading/writing happens.
ScopedLock cg(m_GroupLock);
IF_LOGGING(SRTSOCKET mpeer = SRT_INVALID_SOCK);
IF_LOGGING(steady_clock::time_point start_time);
bool found = false;
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
if (gi->sndstate == SRT_GST_RUNNING)
{
// Found it. Get the socket's peer's ID and this socket's
// Start Time. Once it's delivered, this can be used to calculate
// the Master-to-Slave start time difference.
IF_LOGGING(mpeer = gi->ps->m_PeerID);
IF_LOGGING(start_time = gi->ps->core().socketStartTime());
HLOGC(gmlog.Debug,
log << "getMasterData: found RUNNING master @" << gi->id << " - reporting master's peer $" << mpeer
<< " starting at " << FormatTime(start_time));
found = true;
break;
}
}
if (!found)
{
// If no running one found, then take the first socket in any other
// state than broken, except the slave. This is for a case when a user
// has prepared one link already, but hasn't sent anything through it yet.
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
if (gi->sndstate == SRT_GST_BROKEN)
continue;
if (gi->id == slave)
continue;
// Found it. Get the socket's peer's ID and this socket's
// Start Time. Once it's delivered, this can be used to calculate
// the Master-to-Slave start time difference.
IF_LOGGING(mpeer = gi->ps->core().m_PeerID);
IF_LOGGING(start_time = gi->ps->core().socketStartTime());
HLOGC(gmlog.Debug,
log << "getMasterData: found IDLE/PENDING master @" << gi->id << " - reporting master's peer $" << mpeer
<< " starting at " << FormatTime(start_time));
found = true;
break;
}
}
if (!found)
{
LOGC(cnlog.Debug, log << CONID() << "NO GROUP MASTER LINK found for group: $" << id());
}
else
{
// The returned master_st is the master's start time. Calculate the
// differene time.
IF_LOGGING(steady_clock::duration master_tdiff = m_tsStartTime - start_time);
LOGC(cnlog.Debug, log << CONID() << "FOUND GROUP MASTER LINK: peer=$" << mpeer
<< " - start time diff: " << FormatDuration<DUNIT_S>(master_tdiff));
}
}
// GROUP
CUDTGroup::SocketData* CUDTGroup::add(SocketData data)
{
ScopedLock g(m_GroupLock);
// Change the snd/rcv state of the group member to PENDING.
// Default for SocketData after creation is BROKEN, which just
// after releasing the m_GroupLock could be read and interpreted
// as broken connection and removed before the handshake process
// is done.
data.sndstate = SRT_GST_PENDING;
data.rcvstate = SRT_GST_PENDING;
LOGC(gmlog.Note, log << "group/add: adding member @" << data.id << " into group $" << id());
m_Group.push_back(data);
gli_t end = m_Group.end();
if (m_iMaxPayloadSize == -1)
{
int plsize = (int)data.ps->core().OPT_PayloadSize();
HLOGC(gmlog.Debug,
log << "CUDTGroup::add: taking MAX payload size from socket @" << data.ps->m_SocketID << ": " << plsize
<< " " << (plsize ? "(explicit)" : "(unspecified = fallback to 1456)"));
if (plsize == 0)
plsize = SRT_LIVE_MAX_PLSIZE;
// It is stated that the payload size
// is taken from first, and every next one
// will get the same.
m_iMaxPayloadSize = plsize;
}
--end;
return &*end;
}
CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype)
: m_Global(CUDT::uglobal())
, m_GroupID(-1)
, m_PeerGroupID(-1)
, m_type(gtype)
, m_listener()
, m_iBusy()
, m_iSndOldestMsgNo(SRT_MSGNO_NONE)
, m_iSndAckedMsgNo(SRT_MSGNO_NONE)
, m_uOPT_MinStabilityTimeout_us(1000 * CSrtConfig::COMM_DEF_MIN_STABILITY_TIMEOUT_MS)
// -1 = "undefined"; will become defined with first added socket
, m_iMaxPayloadSize(-1)
, m_iAvgPayloadSize(-1)
, m_bSynRecving(true)
, m_bSynSending(true)
, m_bTsbPd(true)
, m_bTLPktDrop(true)
, m_iTsbPdDelay_us(0)
// m_*EID and m_*Epolld fields will be initialized
// in the constructor body.
, m_iSndTimeOut(-1)
, m_iRcvTimeOut(-1)
, m_tsStartTime()
, m_tsRcvPeerStartTime()
, m_RcvBaseSeqNo(SRT_SEQNO_NONE)
, m_bOpened(false)
, m_bConnected(false)
, m_bClosing(false)
, m_iLastSchedSeqNo(SRT_SEQNO_NONE)
, m_iLastSchedMsgNo(SRT_MSGNO_NONE)
{
setupMutex(m_GroupLock, "Group");
setupMutex(m_RcvDataLock, "G/RcvData");
setupCond(m_RcvDataCond, "G/RcvData");
m_RcvEID = m_Global.m_EPoll.create(&m_RcvEpolld);
m_SndEID = m_Global.m_EPoll.create(&m_SndEpolld);
m_stats.init();
// Set this data immediately during creation before
// two or more sockets start arguing about it.
m_iLastSchedSeqNo = CUDT::generateISN();
}
CUDTGroup::~CUDTGroup()
{
srt_epoll_release(m_RcvEID);
srt_epoll_release(m_SndEID);
releaseMutex(m_GroupLock);
releaseMutex(m_RcvDataLock);
releaseCond(m_RcvDataCond);
}
void CUDTGroup::GroupContainer::erase(CUDTGroup::gli_t it)
{
if (it == m_LastActiveLink)
{
if (m_List.empty())
{
LOGC(gmlog.Error, log << "IPE: GroupContainer is empty and 'erase' is called on it.");
m_LastActiveLink = m_List.end();
return; // this avoids any misunderstandings in iterator checks
}
gli_t bb = m_List.begin();
++bb;
if (bb == m_List.end()) // means: m_List.size() == 1
{
// One element, this one being deleted, nothing to point to.
m_LastActiveLink = m_List.end();
}
else
{
// Set the link to the previous element IN THE RING.
// We have the position pointer.
// Reverse iterator is automatically decremented.
std::reverse_iterator<gli_t> rt(m_LastActiveLink);
if (rt == m_List.rend())
rt = m_List.rbegin();
m_LastActiveLink = rt.base();
// This operation is safe because we know that:
// - the size of the container is at least 2 (0 and 1 cases are handled above)
// - if m_LastActiveLink == m_List.begin(), `rt` is shifted to the opposite end.
--m_LastActiveLink;
}
}
m_List.erase(it);
--m_SizeCache;
}
void CUDTGroup::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen)
{
HLOGC(gmlog.Debug,
log << "GROUP $" << id() << " OPTION: #" << optName
<< " value:" << FormatBinaryString((uint8_t*)optval, optlen));
switch (optName)
{
case SRTO_RCVSYN:
m_bSynRecving = cast_optval<bool>(optval, optlen);
return;
case SRTO_SNDSYN:
m_bSynSending = cast_optval<bool>(optval, optlen);
return;
case SRTO_SNDTIMEO:
m_iSndTimeOut = cast_optval<int>(optval, optlen);
break;
case SRTO_RCVTIMEO:
m_iRcvTimeOut = cast_optval<int>(optval, optlen);
break;
case SRTO_GROUPMINSTABLETIMEO:
{
const int val_ms = cast_optval<int>(optval, optlen);
const int min_timeo_ms = (int) CSrtConfig::COMM_DEF_MIN_STABILITY_TIMEOUT_MS;
if (val_ms < min_timeo_ms)
{
LOGC(qmlog.Error,
log << "group option: SRTO_GROUPMINSTABLETIMEO min allowed value is " << min_timeo_ms << " ms.");
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
}
// Search if you already have SRTO_PEERIDLETIMEO set
int idletmo = CSrtConfig::COMM_RESPONSE_TIMEOUT_MS;
vector<ConfigItem>::iterator f =
find_if(m_config.begin(), m_config.end(), ConfigItem::OfType(SRTO_PEERIDLETIMEO));
if (f != m_config.end())
{
f->get(idletmo); // worst case, it will leave it unchanged.
}
if (val_ms > idletmo)
{
LOGC(qmlog.Error,
log << "group option: SRTO_GROUPMINSTABLETIMEO=" << val_ms << " exceeds SRTO_PEERIDLETIMEO=" << idletmo);
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
}
m_uOPT_MinStabilityTimeout_us = 1000 * val_ms;
}
break;
default:
break;
}
// All others must be simply stored for setting on a socket.
// If the group is already open and any post-option is about
// to be modified, it must be allowed and applied on all sockets.
if (m_bOpened)
{
// There's at least one socket in the group, so only
// post-options are allowed.
if (!binary_search(srt_post_opt_list, srt_post_opt_list + SRT_SOCKOPT_NPOST, optName))
{
LOGC(gmlog.Error, log << "setsockopt(group): Group is connected, this option can't be altered");
throw CUDTException(MJ_NOTSUP, MN_ISCONNECTED, 0);
}
HLOGC(gmlog.Debug, log << "... SPREADING to existing sockets.");
// This means that there are sockets already, so apply
// this option on them.
std::vector<CUDTSocket*> ps_vec;
{
// Do copy to avoid deadlock. CUDT::setOpt() cannot be called directly inside this loop, because
// CUDT::setOpt() will lock m_ConnectionLock, which should be locked before m_GroupLock.
ScopedLock gg(m_GroupLock);
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
ps_vec.push_back(gi->ps);
}
}
for (std::vector<CUDTSocket*>::iterator it = ps_vec.begin(); it != ps_vec.end(); ++it)
{
(*it)->core().setOpt(optName, optval, optlen);
}
}
// Store the option regardless if pre or post. This will apply
m_config.push_back(ConfigItem(optName, optval, optlen));
}
static bool getOptDefault(SRT_SOCKOPT optname, void* optval, int& w_optlen);
// unfortunately this is required to properly handle th 'default_opt != opt'
// operation in the below importOption. Not required simultaneously operator==.
static bool operator!=(const struct linger& l1, const struct linger& l2)
{
return l1.l_onoff != l2.l_onoff || l1.l_linger != l2.l_linger;
}
template <class ValueType>
static void importOption(vector<CUDTGroup::ConfigItem>& storage, SRT_SOCKOPT optname, const ValueType& field)
{
ValueType default_opt = ValueType();
static const int default_opt_size = sizeof(ValueType);
ValueType opt = field;
int opt_size = default_opt_size;
if (!getOptDefault(optname, (&default_opt), (opt_size)) || default_opt != opt)
{
SRT_ASSERT(opt_size == default_opt_size);
// Store the option when:
// - no default for this option is found
// - the option value retrieved from the field is different than default
storage.push_back(CUDTGroup::ConfigItem(optname, &opt, default_opt_size));
}
}
// This function is called by the same premises as the CUDT::CUDT(const CUDT&) (copy constructor).
// The intention is to rewrite the part that comprises settings from the socket
// into the group. Note that some of the settings concern group, some others concern
// only target socket, and there are also options that can't be set on a socket.
void CUDTGroup::deriveSettings(CUDT* u)
{
// !!! IMPORTANT !!!
//
// This function shall ONLY be called on a newly created group
// for the sake of the newly accepted socket from the group-enabled listener,
// which is lazy-created for the first ever accepted socket.
// Once the group is created, it should stay with the options
// state as initialized here, and be changeable only in case when
// the option is altered on the group.
// SRTO_RCVSYN
m_bSynRecving = u->m_config.bSynRecving;
// SRTO_SNDSYN
m_bSynSending = u->m_config.bSynSending;
// SRTO_RCVTIMEO
m_iRcvTimeOut = u->m_config.iRcvTimeOut;
// SRTO_SNDTIMEO
m_iSndTimeOut = u->m_config.iSndTimeOut;
// SRTO_GROUPMINSTABLETIMEO
m_uOPT_MinStabilityTimeout_us = 1000 * u->m_config.uMinStabilityTimeout_ms;
// Ok, this really is disgusting, but there's only one way
// to properly do it. Would be nice to have some more universal
// connection between an option symbolic name and the internals
// in CUDT class, but until this is done, since now every new
// option will have to be handled both in the CUDT::setOpt/getOpt
// functions, and here as well.
// This is about moving options from listener to the group,
// to be potentially replicated on the socket. So both pre
// and post options apply.
#define IM(option, field) importOption(m_config, option, u->m_config.field)
#define IMF(option, field) importOption(m_config, option, u->field)
IM(SRTO_MSS, iMSS);
IM(SRTO_FC, iFlightFlagSize);
// Nonstandard
importOption(m_config, SRTO_SNDBUF, u->m_config.iSndBufSize * (u->m_config.iMSS - CPacket::UDP_HDR_SIZE));
importOption(m_config, SRTO_RCVBUF, u->m_config.iRcvBufSize * (u->m_config.iMSS - CPacket::UDP_HDR_SIZE));
IM(SRTO_LINGER, Linger);
IM(SRTO_UDP_SNDBUF, iUDPSndBufSize);
IM(SRTO_UDP_RCVBUF, iUDPRcvBufSize);
// SRTO_RENDEZVOUS: impossible to have it set on a listener socket.
// SRTO_SNDTIMEO/RCVTIMEO: groupwise setting
// SRTO_CONNTIMEO requires a special handling, because API stores the value in integer milliseconds,
// but the type of the variable is srt::sync::duration.
importOption(m_config, SRTO_CONNTIMEO, (int) count_milliseconds(u->m_config.tdConnTimeOut));
IM(SRTO_DRIFTTRACER, bDriftTracer);
// Reuseaddr: true by default and should only be true.
IM(SRTO_MAXBW, llMaxBW);
IM(SRTO_INPUTBW, llInputBW);
IM(SRTO_MININPUTBW, llMinInputBW);
IM(SRTO_OHEADBW, iOverheadBW);
IM(SRTO_IPTOS, iIpToS);
IM(SRTO_IPTTL, iIpTTL);
IM(SRTO_TSBPDMODE, bTSBPD);
IM(SRTO_RCVLATENCY, iRcvLatency);
IM(SRTO_PEERLATENCY, iPeerLatency);
IM(SRTO_SNDDROPDELAY, iSndDropDelay);
// Special handling of SRTO_PAYLOADSIZE becuase API stores the value as int32_t,
// while the config structure stores it as size_t.
importOption(m_config, SRTO_PAYLOADSIZE, (int)u->m_config.zExpPayloadSize);
IMF(SRTO_TLPKTDROP, m_bTLPktDrop);
importOption(m_config, SRTO_STREAMID, u->m_config.sStreamName.str());
IM(SRTO_MESSAGEAPI, bMessageAPI);
IM(SRTO_NAKREPORT, bRcvNakReport);
IM(SRTO_MINVERSION, uMinimumPeerSrtVersion);
IM(SRTO_ENFORCEDENCRYPTION, bEnforcedEnc);
IM(SRTO_IPV6ONLY, iIpV6Only);
IM(SRTO_PEERIDLETIMEO, iPeerIdleTimeout_ms);
importOption(m_config, SRTO_PACKETFILTER, u->m_config.sPacketFilterConfig.str());
importOption(m_config, SRTO_PBKEYLEN, (int) u->m_pCryptoControl->KeyLen());
// Passphrase is empty by default. Decipher the passphrase and
// store as passphrase option
if (u->m_config.CryptoSecret.len)
{
string password((const char*)u->m_config.CryptoSecret.str, u->m_config.CryptoSecret.len);
m_config.push_back(ConfigItem(SRTO_PASSPHRASE, password.c_str(), (int)password.size()));
}
IM(SRTO_KMREFRESHRATE, uKmRefreshRatePkt);
IM(SRTO_KMPREANNOUNCE, uKmPreAnnouncePkt);
string cc = u->m_CongCtl.selected_name();
if (cc != "live")
{
m_config.push_back(ConfigItem(SRTO_CONGESTION, cc.c_str(), (int)cc.size()));
}
// NOTE: This is based on information extracted from the "semi-copy-constructor" of CUDT class.
// Here should be handled all things that are options that modify the socket, but not all options
// are assigned to configurable items.
#undef IM
#undef IMF
}
bool CUDTGroup::applyFlags(uint32_t flags, HandshakeSide)
{
const bool synconmsg = IsSet(flags, SRT_GFLAG_SYNCONMSG);
if (synconmsg)
{
LOGP(gmlog.Error, "GROUP: requested sync on msgno - not supported.");
return false;
}
return true;
}
template <class Type>
struct Value
{
static int fill(void* optval, int, Type value)
{
// XXX assert size >= sizeof(Type) ?
*(Type*)optval = value;
return sizeof(Type);
}
};
template <>
inline int Value<std::string>::fill(void* optval, int len, std::string value)
{
if (size_t(len) < value.size())
return 0;
memcpy(optval, value.c_str(), value.size());
return (int) value.size();
}
template <class V>
inline int fillValue(void* optval, int len, V value)
{
return Value<V>::fill(optval, len, value);
}
static bool getOptDefault(SRT_SOCKOPT optname, void* pw_optval, int& w_optlen)
{
static const linger def_linger = {1, CSrtConfig::DEF_LINGER_S};
switch (optname)
{
default:
return false;
#define RD(value) \
w_optlen = fillValue((pw_optval), w_optlen, value); \
break
case SRTO_KMSTATE:
case SRTO_SNDKMSTATE:
case SRTO_RCVKMSTATE:
RD(SRT_KM_S_UNSECURED);
case SRTO_PBKEYLEN:
RD(16);
case SRTO_MSS:
RD(CSrtConfig::DEF_MSS);
case SRTO_SNDSYN:
RD(true);
case SRTO_RCVSYN:
RD(true);
case SRTO_ISN:
RD(SRT_SEQNO_NONE);
case SRTO_FC:
RD(CSrtConfig::DEF_FLIGHT_SIZE);
case SRTO_SNDBUF:
case SRTO_RCVBUF:
w_optlen = fillValue((pw_optval), w_optlen, CSrtConfig::DEF_BUFFER_SIZE * (CSrtConfig::DEF_MSS - CPacket::UDP_HDR_SIZE));
break;
case SRTO_LINGER:
RD(def_linger);
case SRTO_UDP_SNDBUF:
case SRTO_UDP_RCVBUF:
RD(CSrtConfig::DEF_UDP_BUFFER_SIZE);
case SRTO_RENDEZVOUS:
RD(false);
case SRTO_SNDTIMEO:
RD(-1);
case SRTO_RCVTIMEO:
RD(-1);
case SRTO_REUSEADDR:
RD(true);
case SRTO_MAXBW:
RD(int64_t(-1));
case SRTO_INPUTBW:
RD(int64_t(-1));
case SRTO_OHEADBW:
RD(0);
case SRTO_STATE:
RD(SRTS_INIT);
case SRTO_EVENT:
RD(0);
case SRTO_SNDDATA:
RD(0);
case SRTO_RCVDATA:
RD(0);
case SRTO_IPTTL:
RD(0);
case SRTO_IPTOS:
RD(0);
case SRTO_SENDER:
RD(false);
case SRTO_TSBPDMODE:
RD(false);
case SRTO_LATENCY:
case SRTO_RCVLATENCY:
case SRTO_PEERLATENCY:
RD(SRT_LIVE_DEF_LATENCY_MS);
case SRTO_TLPKTDROP:
RD(true);
case SRTO_SNDDROPDELAY:
RD(-1);
case SRTO_NAKREPORT:
RD(true);
case SRTO_VERSION:
RD(SRT_DEF_VERSION);
case SRTO_PEERVERSION:
RD(0);
case SRTO_CONNTIMEO:
RD(-1);
case SRTO_DRIFTTRACER:
RD(true);
case SRTO_MINVERSION:
RD(0);
case SRTO_STREAMID:
RD(std::string());
case SRTO_CONGESTION:
RD(std::string());
case SRTO_MESSAGEAPI:
RD(true);
case SRTO_PAYLOADSIZE:
RD(0);
case SRTO_GROUPMINSTABLETIMEO:
RD(CSrtConfig::COMM_DEF_MIN_STABILITY_TIMEOUT_MS);
}
#undef RD
return true;
}
struct FOptionValue
{
SRT_SOCKOPT expected;
FOptionValue(SRT_SOCKOPT v): expected(v) {}
bool operator()(const CUDTGroup::ConfigItem& i) const
{
return i.so == expected;
}
};
void CUDTGroup::getOpt(SRT_SOCKOPT optname, void* pw_optval, int& w_optlen)
{
// Options handled in group
switch (optname)
{
case SRTO_RCVSYN:
*(bool*)pw_optval = m_bSynRecving;
w_optlen = sizeof(bool);
return;
case SRTO_SNDSYN:
*(bool*)pw_optval = m_bSynSending;
w_optlen = sizeof(bool);
return;
case SRTO_SNDTIMEO:
*(int*)pw_optval = m_iSndTimeOut;
w_optlen = sizeof(int);
return;
case SRTO_RCVTIMEO:
*(int*)pw_optval = m_iRcvTimeOut;
w_optlen = sizeof(int);
return;
case SRTO_GROUPMINSTABLETIMEO:
*(uint32_t*)pw_optval = m_uOPT_MinStabilityTimeout_us / 1000;
w_optlen = sizeof(uint32_t);
return;
case SRTO_KMSTATE:
*(uint32_t*)pw_optval = getGroupEncryptionState();
w_optlen = sizeof(uint32_t);
return;
// Write-only options for security reasons or
// options that refer to a socket state, that
// makes no sense for a group.
case SRTO_PASSPHRASE:
case SRTO_KMPREANNOUNCE:
case SRTO_KMREFRESHRATE:
case SRTO_BINDTODEVICE:
case SRTO_GROUPCONNECT:
case SRTO_STATE:
case SRTO_EVENT:
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
default:; // pass on
}
bool is_set_on_socket = false;
{
// Can't have m_GroupLock locked while calling getOpt on a member socket
// because the call will acquire m_ControlLock leading to a lock-order-inversion.
enterCS(m_GroupLock);
gli_t gi = m_Group.begin();
CUDTSocket* const ps = (gi != m_Group.end()) ? gi->ps : NULL;
CUDTUnited::SocketKeeper sk(CUDT::uglobal(), ps);
leaveCS(m_GroupLock);
if (sk.socket)
{
// Return the value from the first member socket, if any is present
// Note: Will throw exception if the request is wrong.
sk.socket->core().getOpt(optname, (pw_optval), (w_optlen));
is_set_on_socket = true;
}
}
// Check if the option is in the storage, which means that
// it was modified on the group.
vector<ConfigItem>::const_iterator i = find_if(m_config.begin(), m_config.end(),
FOptionValue(optname));
if (i == m_config.end())
{
// Already written to the target variable.
if (is_set_on_socket)
return;
// Not found, see the defaults
if (!getOptDefault(optname, (pw_optval), (w_optlen)))
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
return;
}
// NOTE: even if is_set_on_socket, if it was also found in the group
// settings, overwrite with the value from the group.
// Found, return the value from the storage.
// Check the size first.
if (w_optlen < int(i->value.size()))
throw CUDTException(MJ_NOTSUP, MN_XSIZE, 0);
w_optlen = (int) i->value.size();
memcpy((pw_optval), &i->value[0], i->value.size());
}
SRT_KM_STATE CUDTGroup::getGroupEncryptionState()
{
multiset<SRT_KM_STATE> kmstates;
{
ScopedLock lk (m_GroupLock);
// First check the container. If empty, return UNSECURED
if (m_Group.empty())
return SRT_KM_S_UNSECURED;
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
CCryptoControl* cc = gi->ps->core().m_pCryptoControl.get();
if (!cc)
continue;
SRT_KM_STATE gst = cc->m_RcvKmState;
// A fix to NOSECRET is because this is the state when agent has set
// no password, but peer did, and ENFORCEDENCRYPTION=false allowed
// this connection to be established. UNSECURED can't be taken in this
// case because this would suggest that BOTH are unsecured, that is,
// we have established an unsecured connection (which ain't true).
if (gst == SRT_KM_S_UNSECURED && cc->m_SndKmState == SRT_KM_S_NOSECRET)
gst = SRT_KM_S_NOSECRET;
kmstates.insert(gst);
}
}
// Criteria are:
// 1. UNSECURED, if no member sockets, or at least one UNSECURED found.
// 2. SECURED, if at least one SECURED found (cut off the previous criteria).
// 3. BADSECRET otherwise, although return NOSECRET if no BADSECRET is found.
if (kmstates.count(SRT_KM_S_UNSECURED))
return SRT_KM_S_UNSECURED;
// Now we have UNSECURED ruled out. Remaining may be NOSECRET, BADSECRET or SECURED.
// NOTE: SECURING is an intermediate state for HSv4 and can't occur in groups.
if (kmstates.count(SRT_KM_S_SECURED))
return SRT_KM_S_SECURED;
if (kmstates.count(SRT_KM_S_BADSECRET))
return SRT_KM_S_BADSECRET;
return SRT_KM_S_NOSECRET;
}
SRT_SOCKSTATUS CUDTGroup::getStatus()
{
typedef vector<pair<SRTSOCKET, SRT_SOCKSTATUS> > states_t;
states_t states;
{
ScopedLock cg(m_GroupLock);
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
switch (gi->sndstate)
{
// Check only sndstate. If this machine is ONLY receiving,
// then rcvstate will turn into SRT_GST_RUNNING, while
// sndstate will remain SRT_GST_IDLE, but still this may only
// happen if the socket is connected.
case SRT_GST_IDLE:
case SRT_GST_RUNNING:
states.push_back(make_pair(gi->id, SRTS_CONNECTED));
break;
case SRT_GST_BROKEN:
states.push_back(make_pair(gi->id, SRTS_BROKEN));
break;
default: // (pending, or whatever will be added in future)
{
// TEMPORARY make a node to note a socket to be checked afterwards
states.push_back(make_pair(gi->id, SRTS_NONEXIST));
}
}
}
}
SRT_SOCKSTATUS pending_state = SRTS_NONEXIST;
for (states_t::iterator i = states.begin(); i != states.end(); ++i)
{
// If at least one socket is connected, the state is connected.
if (i->second == SRTS_CONNECTED)
return SRTS_CONNECTED;
// Second level - pick up the state
if (i->second == SRTS_NONEXIST)
{
// Otherwise find at least one socket, which's state isn't broken.
i->second = m_Global.getStatus(i->first);
if (pending_state == SRTS_NONEXIST)
pending_state = i->second;
}
}
// Return that state as group state
if (pending_state != SRTS_NONEXIST) // did call getStatus at least once and it didn't return NOEXIST
return pending_state;
// If none found, return SRTS_BROKEN.
return SRTS_BROKEN;
}
// [[using locked(m_GroupLock)]];
void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side)
{
if (side == HSD_RESPONDER)
{
// On the listener side you should synchronize ISN with the incoming
// socket, which is done immediately after creating the socket and
// adding it to the group. On the caller side the ISN is defined in
// the group directly, before any member socket is created.
set_currentSchedSequence(core.ISN());
}
// Only set if was not initialized to avoid problems on a running connection.
if (m_RcvBaseSeqNo == SRT_SEQNO_NONE)
m_RcvBaseSeqNo = CSeqNo::decseq(core.m_iPeerISN);
// Get the latency (possibly fixed against the opposite side)
// from the first socket (core.m_iTsbPdDelay_ms),
// and set it on the current socket.
set_latency_us(core.m_iTsbPdDelay_ms * int64_t(1000));
}
void CUDTGroup::close()
{
// Close all descriptors, then delete the group.
vector<SRTSOCKET> ids;
{
ScopedLock glob(CUDT::uglobal().m_GlobControlLock);
ScopedLock g(m_GroupLock);
m_bClosing = true;
// Copy the list of IDs into the array.
for (gli_t ig = m_Group.begin(); ig != m_Group.end(); ++ig)
{
ids.push_back(ig->id);
// Immediately cut ties to this group.
// Just for a case, redispatch the socket, to stay safe.
CUDTSocket* s = CUDT::uglobal().locateSocket_LOCKED(ig->id);
if (!s)
{
HLOGC(smlog.Debug, log << "group/close: IPE(NF): group member @" << ig->id << " already deleted");
continue;
}
// Make the socket closing BEFORE withdrawing its group membership
// because a socket created as a group member cannot be valid
// without the group.
// This is not true in case of non-managed groups, which
// only collect sockets, but also non-managed groups should not
// use common group buffering and tsbpd. Also currently there are
// no other groups than managed one.
s->setClosing();
s->m_GroupOf = NULL;
s->m_GroupMemberData = NULL;
HLOGC(smlog.Debug, log << "group/close: CUTTING OFF @" << ig->id << " (found as @" << s->m_SocketID << ") from the group");
}
// After all sockets that were group members have their ties cut,
// the container can be cleared. Note that sockets won't be now
// removing themselves from the group when closing because they
// are unaware of being group members.
m_Group.clear();
m_PeerGroupID = -1;
set<int> epollid;
{
// Global EPOLL lock must be applied to access any socket's epoll set.
// This is a set of all epoll ids subscribed to it.
ScopedLock elock (CUDT::uglobal().m_EPoll.m_EPollLock);
epollid = m_sPollID; // use move() in C++11
m_sPollID.clear();
}
int no_events = 0;
for (set<int>::iterator i = epollid.begin(); i != epollid.end(); ++i)
{
HLOGC(smlog.Debug, log << "close: CLEARING subscription on E" << (*i) << " of $" << id());
try
{
CUDT::uglobal().m_EPoll.update_usock(*i, id(), &no_events);
}
catch (...)
{
// May catch an API exception, but this isn't an API call to be interrupted.
}
HLOGC(smlog.Debug, log << "close: removing E" << (*i) << " from back-subscribers of $" << id());
}
// NOW, the m_GroupLock is released, then m_GlobControlLock.
// The below code should work with no locks and execute socket
// closing.
}
HLOGC(gmlog.Debug, log << "grp/close: closing $" << m_GroupID << ", closing first " << ids.size() << " sockets:");
// Close all sockets with unlocked GroupLock
for (vector<SRTSOCKET>::iterator i = ids.begin(); i != ids.end(); ++i)
{
try
{
CUDT::uglobal().close(*i);
}
catch (CUDTException&)
{
HLOGC(gmlog.Debug, log << "grp/close: socket @" << *i << " is likely closed already, ignoring");
}
}
HLOGC(gmlog.Debug, log << "grp/close: closing $" << m_GroupID << ": sockets closed, clearing the group:");
// Lock the group again to clear the group data
{
ScopedLock g(m_GroupLock);
if (!m_Group.empty())
{
LOGC(gmlog.Error, log << "grp/close: IPE - after requesting to close all members, still " << m_Group.size()
<< " lingering members!");
m_Group.clear();
}
// This takes care of the internal part.
// The external part will be done in Global (CUDTUnited)
}
// Release blocked clients
// XXX This looks like a dead code. Group receiver functions
// do not use any lock on m_RcvDataLock, it is likely a remainder
// of the old, internal impementation.
// CSync::lock_notify_one(m_RcvDataCond, m_RcvDataLock);
}
// [[using locked(m_Global->m_GlobControlLock)]]
// [[using locked(m_GroupLock)]]
void CUDTGroup::send_CheckValidSockets()
{
vector<gli_t> toremove;
for (gli_t d = m_Group.begin(), d_next = d; d != m_Group.end(); d = d_next)
{
++d_next; // it's now safe to erase d
CUDTSocket* revps = m_Global.locateSocket_LOCKED(d->id);
if (revps != d->ps)
{
// Note: the socket might STILL EXIST, just in the trash, so
// it can't be found by locateSocket. But it can still be bound
// to the group. Just mark it broken from upside so that the
// internal sending procedures will skip it. Removal from the
// group will happen in GC, which will both remove from
// group container and cut backward links to the group.
HLOGC(gmlog.Debug, log << "group/send_CheckValidSockets: socket @" << d->id << " is no longer valid, setting BROKEN in $" << id());
d->sndstate = SRT_GST_BROKEN;
d->rcvstate = SRT_GST_BROKEN;
}
}
}
int CUDTGroup::send(const char* buf, int len, SRT_MSGCTRL& w_mc)
{
switch (m_type)
{
default:
LOGC(gslog.Error, log << "CUDTGroup::send: not implemented for type #" << m_type);
throw CUDTException(MJ_SETUP, MN_INVAL, 0);
case SRT_GTYPE_BROADCAST:
return sendBroadcast(buf, len, (w_mc));
case SRT_GTYPE_BACKUP:
return sendBackup(buf, len, (w_mc));
/* to be implemented
case SRT_GTYPE_BALANCING:
return sendBalancing(buf, len, (w_mc));
case SRT_GTYPE_MULTICAST:
return sendMulticast(buf, len, (w_mc));
*/
}
}
int CUDTGroup::sendBroadcast(const char* buf, int len, SRT_MSGCTRL& w_mc)
{
// Avoid stupid errors in the beginning.
if (len <= 0)
{
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
}
// NOTE: This is a "vector of list iterators". Every element here
// is an iterator to another container.
// Note that "list" is THE ONLY container in standard C++ library,
// for which NO ITERATORS ARE INVALIDATED after a node at particular
// iterator has been removed, except for that iterator itself.
vector<SRTSOCKET> wipeme;
vector<gli_t> idleLinks;
vector<SRTSOCKET> pendingSockets; // need sock ids as it will be checked out of lock
int32_t curseq = SRT_SEQNO_NONE; // The seqno of the first packet of this message.
int32_t nextseq = SRT_SEQNO_NONE; // The seqno of the first packet of next message.
int rstat = -1;
int stat = 0;
SRT_ATR_UNUSED CUDTException cx(MJ_SUCCESS, MN_NONE, 0);
vector<gli_t> activeLinks;
// First, acquire GlobControlLock to make sure all member sockets still exist
enterCS(m_Global.m_GlobControlLock);
ScopedLock guard(m_GroupLock);
if (m_bClosing)
{
leaveCS(m_Global.m_GlobControlLock);
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
// Now, still under lock, check if all sockets still can be dispatched
// LOCKED: GlobControlLock, GroupLock (RIGHT ORDER!)
send_CheckValidSockets();
leaveCS(m_Global.m_GlobControlLock);
// LOCKED: GroupLock (only)
// Since this moment GlobControlLock may only be locked if GroupLock is unlocked first.
if (m_bClosing)
{
// No temporary locks here. The group lock is scoped.
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
// This simply requires the payload to be sent through every socket in the group
for (gli_t d = m_Group.begin(); d != m_Group.end(); ++d)
{
if (d->sndstate != SRT_GST_BROKEN)
{
// Check the socket state prematurely in order not to uselessly
// send over a socket that is broken.
CUDT* const pu = (d->ps)
? &d->ps->core()
: NULL;
if (!pu || pu->m_bBroken)
{
HLOGC(gslog.Debug,
log << "grp/sendBroadcast: socket @" << d->id << " detected +Broken - transit to BROKEN");
d->sndstate = SRT_GST_BROKEN;
d->rcvstate = SRT_GST_BROKEN;
}
}
// Check socket sndstate before sending
if (d->sndstate == SRT_GST_BROKEN)
{
HLOGC(gslog.Debug,
log << "grp/sendBroadcast: socket in BROKEN state: @" << d->id
<< ", sockstatus=" << SockStatusStr(d->ps ? d->ps->getStatus() : SRTS_NONEXIST));
wipeme.push_back(d->id);
continue;
}
if (d->sndstate == SRT_GST_IDLE)
{
SRT_SOCKSTATUS st = SRTS_NONEXIST;
if (d->ps)
st = d->ps->getStatus();
// If the socket is already broken, move it to broken.
if (int(st) >= int(SRTS_BROKEN))
{
HLOGC(gslog.Debug,
log << "CUDTGroup::send.$" << id() << ": @" << d->id << " became " << SockStatusStr(st)
<< ", WILL BE CLOSED.");
wipeme.push_back(d->id);
continue;
}
if (st != SRTS_CONNECTED)
{
HLOGC(gslog.Debug,
log << "CUDTGroup::send. @" << d->id << " is still " << SockStatusStr(st) << ", skipping.");
pendingSockets.push_back(d->id);
continue;
}
HLOGC(gslog.Debug, log << "grp/sendBroadcast: socket in IDLE state: @" << d->id << " - will activate it");
// This is idle, we'll take care of them next time
// Might be that:
// - this socket is idle, while some NEXT socket is running
// - we need at least one running socket to work BEFORE activating the idle one.
// - if ALL SOCKETS ARE IDLE, then we simply activate the first from the list,
// and all others will be activated using the ISN from the first one.
idleLinks.push_back(d);
continue;
}
if (d->sndstate == SRT_GST_RUNNING)
{
HLOGC(gslog.Debug,
log << "grp/sendBroadcast: socket in RUNNING state: @" << d->id << " - will send a payload");
activeLinks.push_back(d);
continue;
}
HLOGC(gslog.Debug,
log << "grp/sendBroadcast: socket @" << d->id << " not ready, state: " << StateStr(d->sndstate) << "("
<< int(d->sndstate) << ") - NOT sending, SET AS PENDING");
pendingSockets.push_back(d->id);
}
vector<Sendstate> sendstates;
if (w_mc.srctime == 0)
w_mc.srctime = count_microseconds(steady_clock::now().time_since_epoch());
for (vector<gli_t>::iterator snd = activeLinks.begin(); snd != activeLinks.end(); ++snd)
{
gli_t d = *snd;
int erc = 0; // success
// Remaining sndstate is SRT_GST_RUNNING. Send a payload through it.
try
{
// This must be wrapped in try-catch because on error it throws an exception.
// Possible return values are only 0, in case when len was passed 0, or a positive
// >0 value that defines the size of the data that it has sent, that is, in case
// of Live mode, equal to 'len'.
stat = d->ps->core().sendmsg2(buf, len, (w_mc));
}
catch (CUDTException& e)
{
cx = e;
stat = -1;
erc = e.getErrorCode();
}
if (stat != -1)
{
curseq = w_mc.pktseq;
nextseq = d->ps->core().schedSeqNo();
}
const Sendstate cstate = {d->id, &*d, stat, erc};
sendstates.push_back(cstate);
d->sndresult = stat;
d->laststatus = d->ps->getStatus();
}
// Ok, we have attempted to send a payload over all links
// that are currently in the RUNNING state. We know that at
// least one is successful if we have non-default curseq value.
// Here we need to activate all links that are found as IDLE.
// Some portion of logical exclusions:
//
// - sockets that were broken in the beginning are already wiped out
// - broken sockets are checked first, so they can't be simultaneously idle
// - idle sockets can't get broken because there's no operation done on them
// - running sockets are the only one that could change sndstate here
// - running sockets can either remain running or turn to broken
// In short: Running and Broken sockets can't become idle,
// although Running sockets can become Broken.
// There's no certainty here as to whether at least one link was
// running and it has successfully performed the operation.
// Might have even happened that we had 2 running links that
// got broken and 3 other links so far in idle sndstate that just connected
// at that very moment. In this case we have 3 idle links to activate,
// but there is no sequence base to overwrite their ISN with. If this
// happens, then the first link that should be activated goes with
// whatever ISN it has, whereas every next idle link should use that
// exactly ISN.
//
// If it has additionally happened that the first link got broken at
// that very moment of sending, the second one has a chance to succeed
// and therefore take over the leading role in setting the ISN. If the
// second one fails, too, then the only remaining idle link will simply
// go with its own original sequence.
// On the opposite side, if the first packet arriving looks like a jump over,
// the corresponding LOSSREPORT is sent. For packets that are truly lost,
// the sender retransmits them, for packets that before ISN, DROPREQ is sent.
// Now we can go to the idle links and attempt to send the payload
// also over them.
// TODO: { sendBroadcast_ActivateIdleLinks
for (vector<gli_t>::iterator i = idleLinks.begin(); i != idleLinks.end(); ++i)
{
gli_t d = *i;
if (!d->ps->m_GroupOf)
continue;
int erc = 0;
int lastseq = d->ps->core().schedSeqNo();
if (curseq != SRT_SEQNO_NONE && curseq != lastseq)
{
HLOGC(gslog.Debug,
log << "grp/sendBroadcast: socket @" << d->id << ": override snd sequence %" << lastseq << " with %"
<< curseq << " (diff by " << CSeqNo::seqcmp(curseq, lastseq)
<< "); SENDING PAYLOAD: " << BufferStamp(buf, len));
d->ps->core().overrideSndSeqNo(curseq);
}
else
{
HLOGC(gslog.Debug,
log << "grp/sendBroadcast: socket @" << d->id << ": sequence remains with original value: %"
<< lastseq << "; SENDING PAYLOAD " << BufferStamp(buf, len));
}
// Now send and check the status
// The link could have got broken
try
{
stat = d->ps->core().sendmsg2(buf, len, (w_mc));
}
catch (CUDTException& e)
{
cx = e;
stat = -1;
erc = e.getErrorCode();
}
if (stat != -1)
{
d->sndstate = SRT_GST_RUNNING;
// Note: this will override the sequence number
// for all next iterations in this loop.
curseq = w_mc.pktseq;
nextseq = d->ps->core().schedSeqNo();
HLOGC(gslog.Debug,
log << "@" << d->id << ":... sending SUCCESSFUL %" << curseq << " MEMBER STATUS: RUNNING");
}
d->sndresult = stat;
d->laststatus = d->ps->getStatus();
const Sendstate cstate = {d->id, &*d, stat, erc};
sendstates.push_back(cstate);
}
if (nextseq != SRT_SEQNO_NONE)
{
HLOGC(gslog.Debug,
log << "grp/sendBroadcast: $" << id() << ": updating current scheduling sequence %" << nextseq);
m_iLastSchedSeqNo = nextseq;
}
// }
// { send_CheckBrokenSockets()
if (!pendingSockets.empty())
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: found pending sockets, polling them.");
// These sockets if they are in pending state, they should be added to m_SndEID
// at the connecting stage.
CEPoll::fmap_t sready;
if (m_Global.m_EPoll.empty(*m_SndEpolld))
{
// Sanity check - weird pending reported.
LOGC(gslog.Error,
log << "grp/sendBroadcast: IPE: reported pending sockets, but EID is empty - wiping pending!");
copy(pendingSockets.begin(), pendingSockets.end(), back_inserter(wipeme));
}
else
{
{
InvertedLock ug(m_GroupLock);
THREAD_PAUSED();
m_Global.m_EPoll.swait(
*m_SndEpolld, sready, 0, false /*report by retval*/); // Just check if anything happened
THREAD_RESUMED();
}
if (m_bClosing)
{
// No temporary locks here. The group lock is scoped.
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
HLOGC(gslog.Debug, log << "grp/sendBroadcast: RDY: " << DisplayEpollResults(sready));
// sockets in EX: should be moved to wipeme.
for (vector<SRTSOCKET>::iterator i = pendingSockets.begin(); i != pendingSockets.end(); ++i)
{
if (CEPoll::isready(sready, *i, SRT_EPOLL_ERR))
{
HLOGC(gslog.Debug,
log << "grp/sendBroadcast: Socket @" << (*i) << " reported FAILURE - moved to wiped.");
// Failed socket. Move d to wipeme. Remove from eid.
wipeme.push_back(*i);
int no_events = 0;
m_Global.m_EPoll.update_usock(m_SndEID, *i, &no_events);
}
}
// After that, all sockets that have been reported
// as ready to write should be removed from EID. This
// will also remove those sockets that have been added
// as redundant links at the connecting stage and became
// writable (connected) before this function had a chance
// to check them.
m_Global.m_EPoll.clear_ready_usocks(*m_SndEpolld, SRT_EPOLL_CONNECT);
}
}
// Re-check after the waiting lock has been reacquired
if (m_bClosing)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
send_CloseBrokenSockets(wipeme);
// Re-check after the waiting lock has been reacquired
if (m_bClosing)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
// }
// { sendBroadcast_CheckBlockedLinks()
// Alright, we've made an attempt to send a packet over every link.
// Every operation was done through a non-blocking attempt, so
// links where sending was blocked have SRT_EASYNCSND error.
// Links that were successful, have the len value in state.
// First thing then, find out if at least one link was successful.
// The first successful link sets the sequence value,
// the following links derive it. This might be also the first idle
// link with its random-generated ISN, if there were no active links.
vector<SocketData*> successful, blocked;
// This iteration of the state will simply
// qualify the remaining sockets into three categories:
//
// - successful (we only need to know if at least one did)
// - blocked - if none succeeded, but some blocked, POLL & RETRY.
// - wipeme - sending failed by any other reason than blocking, remove.
// Now - sendstates contain directly sockets.
// In order to update members, you need to have locked:
// - GlobControlLock to prevent sockets from disappearing or being closed
// - then GroupLock to latch the validity of m_GroupMemberData field.
{
{
InvertedLock ung (m_GroupLock);
enterCS(CUDT::uglobal().m_GlobControlLock);
HLOGC(gslog.Debug, log << "grp/sendBroadcast: Locked GlobControlLock, locking back GroupLock");
}
// Under this condition, as an unlock-lock cycle was done on m_GroupLock,
// the Sendstate::it field shall not be used here!
for (vector<Sendstate>::iterator is = sendstates.begin(); is != sendstates.end(); ++is)
{
CUDTSocket* ps = CUDT::uglobal().locateSocket_LOCKED(is->id);
// Is the socket valid? If not, simply SKIP IT. Nothing to be done with it,
// it's already deleted.
if (!ps)
continue;
// Is the socket still group member? If not, SKIP IT. It could only be taken ownership
// by being explicitly closed and so it's deleted from the container.
if (!ps->m_GroupOf)
continue;
// Now we are certain that m_GroupMemberData is valid.
SocketData* d = ps->m_GroupMemberData;
if (is->stat == len)
{
HLOGC(gslog.Debug,
log << "SEND STATE link [" << (is - sendstates.begin()) << "]: SUCCESSFULLY sent " << len
<< " bytes");
// Successful.
successful.push_back(d);
rstat = is->stat;
continue;
}
// Remaining are only failed. Check if again.
if (is->code == SRT_EASYNCSND)
{
blocked.push_back(d);
continue;
}
#if ENABLE_HEAVY_LOGGING
string errmsg = cx.getErrorString();
LOGC(gslog.Debug,
log << "SEND STATE link [" << (is - sendstates.begin()) << "]: FAILURE (result:" << is->stat
<< "): " << errmsg << ". Setting this socket broken status.");
#endif
// Turn this link broken
d->sndstate = SRT_GST_BROKEN;
}
// Now you can leave GlobControlLock, while GroupLock is still locked.
leaveCS(CUDT::uglobal().m_GlobControlLock);
}
// Re-check after the waiting lock has been reacquired
if (m_bClosing)
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: GROUP CLOSED, ABANDONING");
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
// Good, now let's realize the situation.
// First, check the most optimistic scenario: at least one link succeeded.
bool was_blocked = false;
bool none_succeeded = false;
if (!successful.empty())
{
// Good. All blocked links are now qualified as broken.
// You had your chance, but I can't leave you here,
// there will be no further chance to reattempt sending.
for (vector<SocketData*>::iterator b = blocked.begin(); b != blocked.end(); ++b)
{
(*b)->sndstate = SRT_GST_BROKEN;
}
blocked.clear();
}
else
{
none_succeeded = true;
was_blocked = !blocked.empty();
}
int ercode = 0;
// This block causes waiting for any socket to accept the payload.
// This should be done only in blocking mode and only if no other socket
// accepted the payload.
if (was_blocked && none_succeeded && m_bSynSending)
{
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
if (!m_bSynSending)
{
throw CUDTException(MJ_AGAIN, MN_WRAVAIL, 0);
}
HLOGC(gslog.Debug, log << "grp/sendBroadcast: all blocked, trying to common-block on epoll...");
// XXX TO BE REMOVED. Sockets should be subscribed in m_SndEID at connecting time
// (both srt_connect and srt_accept).
// None was successful, but some were blocked. It means that we
// haven't sent the payload over any link so far, so we still have
// a chance to retry.
int modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
for (vector<SocketData*>::iterator b = blocked.begin(); b != blocked.end(); ++b)
{
HLOGC(gslog.Debug,
log << "Will block on blocked socket @" << (*b)->id << " as only blocked socket remained");
CUDT::uglobal().epoll_add_usock_INTERNAL(m_SndEID, (*b)->ps, &modes);
}
int blst = 0;
CEPoll::fmap_t sready;
{
// Lift the group lock for a while, to avoid possible deadlocks.
InvertedLock ug(m_GroupLock);
HLOGC(gslog.Debug, log << "grp/sendBroadcast: blocking on any of blocked sockets to allow sending");
// m_iSndTimeOut is -1 by default, which matches the meaning of waiting forever
THREAD_PAUSED();
blst = m_Global.m_EPoll.swait(*m_SndEpolld, sready, m_iSndTimeOut);
THREAD_RESUMED();
// NOTE EXCEPTIONS:
// - EEMPTY: won't happen, we have explicitly added sockets to EID here.
// - XTIMEOUT: will be propagated as this what should be reported to API
// This is the only reason why here the errors are allowed to be handled
// by exceptions.
}
// Re-check after the waiting lock has been reacquired
if (m_bClosing)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
if (blst == -1)
{
int rno;
ercode = srt_getlasterror(&rno);
}
else
{
activeLinks.clear();
sendstates.clear();
// Extract gli's from the whole group that have id found in the array.
// LOCKING INFO:
// For the moment of lifting m_GroupLock, some sockets could have been closed.
// But then, we believe they have been also removed from the group container,
// and this requires locking on GroupLock. We can then stafely state that the
// group container contains only existing sockets, at worst broken.
for (gli_t dd = m_Group.begin(); dd != m_Group.end(); ++dd)
{
int rdev = CEPoll::ready(sready, dd->id);
if (rdev & SRT_EPOLL_ERR)
{
dd->sndstate = SRT_GST_BROKEN;
}
else if (rdev & SRT_EPOLL_OUT)
activeLinks.push_back(dd);
}
for (vector<gli_t>::iterator snd = activeLinks.begin(); snd != activeLinks.end(); ++snd)
{
gli_t d = *snd;
int erc = 0; // success
// Remaining sndstate is SRT_GST_RUNNING. Send a payload through it.
try
{
// This must be wrapped in try-catch because on error it throws an exception.
// Possible return values are only 0, in case when len was passed 0, or a positive
// >0 value that defines the size of the data that it has sent, that is, in case
// of Live mode, equal to 'len'.
stat = d->ps->core().sendmsg2(buf, len, (w_mc));
}
catch (CUDTException& e)
{
cx = e;
stat = -1;
erc = e.getErrorCode();
}
if (stat != -1)
curseq = w_mc.pktseq;
const Sendstate cstate = {d->id, &*d, stat, erc};
sendstates.push_back(cstate);
d->sndresult = stat;
d->laststatus = d->ps->getStatus();
}
// This time only check if any were successful.
// All others are wipeme.
// NOTE: m_GroupLock is continuously locked - you can safely use Sendstate::it field.
for (vector<Sendstate>::iterator is = sendstates.begin(); is != sendstates.end(); ++is)
{
if (is->stat == len)
{
// Successful.
successful.push_back(is->mb);
rstat = is->stat;
was_blocked = false;
none_succeeded = false;
continue;
}
#if ENABLE_HEAVY_LOGGING
string errmsg = cx.getErrorString();
HLOGC(gslog.Debug,
log << "... (repeat-waited) sending FAILED (" << errmsg
<< "). Setting this socket broken status.");
#endif
// Turn this link broken
is->mb->sndstate = SRT_GST_BROKEN;
}
}
}
// }
if (none_succeeded)
{
HLOGC(gslog.Debug, log << "grp/sendBroadcast: all links broken (none succeeded to send a payload)");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true);
// Reparse error code, if set.
// It might be set, if the last operation was failed.
// If any operation succeeded, this will not be executed anyway.
CodeMajor major = CodeMajor(ercode ? ercode / 1000 : MJ_CONNECTION);
CodeMinor minor = CodeMinor(ercode ? ercode % 1000 : MN_CONNLOST);
throw CUDTException(major, minor, 0);
}
for (vector<Sendstate>::iterator is = sendstates.begin(); is != sendstates.end(); ++is)
{
// Here we have a situation that at least 1 link successfully sent a packet.
// All links for which sending has failed must be closed.
if (is->stat == -1)
{
// This only sets the state to the socket; the GC process should
// pick it up at the next time.
HLOGC(gslog.Debug, log << "grp/sendBroadcast: per PARTIAL SUCCESS, closing failed @" << is->id);
is->mb->ps->setBrokenClosed();
}
}
// Now that at least one link has succeeded, update sending stats.
m_stats.sent.count(len);
// Pity that the blocking mode only determines as to whether this function should
// block or not, but the epoll flags must be updated regardless of the mode.
// Now fill in the socket table. Check if the size is enough, if not,
// then set the pointer to NULL and set the correct size.
// Note that list::size() is linear time, however this shouldn't matter,
// as with the increased number of links in the redundancy group the
// impossibility of using that many of them grows exponentally.
size_t grpsize = m_Group.size();
if (w_mc.grpdata_size < grpsize)
{
w_mc.grpdata = NULL;
}
size_t i = 0;
bool ready_again = false;
for (gli_t d = m_Group.begin(); d != m_Group.end(); ++d, ++i)
{
if (w_mc.grpdata)
{
// Enough space to fill
copyGroupData(*d, (w_mc.grpdata[i]));
}
// We perform this loop anyway because we still need to check if any
// socket is writable. Note that the group lock will hold any write ready
// updates that are performed just after a single socket update for the
// group, so if any socket is actually ready at the moment when this
// is performed, and this one will result in none-write-ready, this will
// be fixed just after returning from this function.
ready_again = ready_again || d->ps->writeReady();
}
w_mc.grpdata_size = i;
if (!ready_again)
{
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
}
return rstat;
}
int CUDTGroup::getGroupData(SRT_SOCKGROUPDATA* pdata, size_t* psize)
{
if (!psize)
return CUDT::APIError(MJ_NOTSUP, MN_INVAL);
ScopedLock gl(m_GroupLock);
return getGroupData_LOCKED(pdata, psize);
}
// [[using locked(this->m_GroupLock)]]
int CUDTGroup::getGroupData_LOCKED(SRT_SOCKGROUPDATA* pdata, size_t* psize)
{
SRT_ASSERT(psize != NULL);
const size_t size = *psize;
// Rewrite correct size
*psize = m_Group.size();
if (!pdata)
{
return 0;
}
if (m_Group.size() > size)
{
// Not enough space to retrieve the data.
return CUDT::APIError(MJ_NOTSUP, MN_XSIZE);
}
size_t i = 0;
for (gli_t d = m_Group.begin(); d != m_Group.end(); ++d, ++i)
{
copyGroupData(*d, (pdata[i]));
}
return (int)m_Group.size();
}
// [[using locked(this->m_GroupLock)]]
void CUDTGroup::copyGroupData(const CUDTGroup::SocketData& source, SRT_SOCKGROUPDATA& w_target)
{
w_target.id = source.id;
memcpy((&w_target.peeraddr), &source.peer, source.peer.size());
w_target.sockstate = source.laststatus;
w_target.token = source.token;
// In the internal structure the member state
// is one per direction. From the user perspective
// however it is used either in one direction only,
// in which case the one direction that is active
// matters, or in both directions, in which case
// it will be always either both active or both idle.
if (source.sndstate == SRT_GST_RUNNING || source.rcvstate == SRT_GST_RUNNING)
{
w_target.result = 0;
w_target.memberstate = SRT_GST_RUNNING;
}
// Stats can differ per direction only
// when at least in one direction it's ACTIVE.
else if (source.sndstate == SRT_GST_BROKEN || source.rcvstate == SRT_GST_BROKEN)
{
w_target.result = -1;
w_target.memberstate = SRT_GST_BROKEN;
}
else
{
// IDLE or PENDING
w_target.result = 0;
w_target.memberstate = source.sndstate;
}
w_target.weight = source.weight;
}
void CUDTGroup::getGroupCount(size_t& w_size, bool& w_still_alive)
{
ScopedLock gg(m_GroupLock);
// Note: linear time, but no way to avoid it.
// Fortunately the size of the redundancy group is even
// in the craziest possible implementation at worst 4 members long.
size_t group_list_size = 0;
// In managed group, if all sockets made a failure, all
// were removed, so the loop won't even run once. In
// non-managed, simply no socket found here would have a
// connected status.
bool still_alive = false;
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
if (gi->laststatus == SRTS_CONNECTED)
{
still_alive = true;
}
++group_list_size;
}
// If no socket is found connected, don't update any status.
w_size = group_list_size;
w_still_alive = still_alive;
}
// [[using locked(m_GroupLock)]]
void CUDTGroup::fillGroupData(SRT_MSGCTRL& w_out, // MSGCTRL to be written
const SRT_MSGCTRL& in // MSGCTRL read from the data-providing socket
)
{
// Preserve the data that will be overwritten by assignment
SRT_SOCKGROUPDATA* grpdata = w_out.grpdata;
size_t grpdata_size = w_out.grpdata_size;
w_out = in; // NOTE: This will write NULL to grpdata and 0 to grpdata_size!
w_out.grpdata = NULL; // Make sure it's done, for any case
w_out.grpdata_size = 0;
// User did not wish to read the group data at all.
if (!grpdata)
{
return;
}
int st = getGroupData_LOCKED((grpdata), (&grpdata_size));
// Always write back the size, no matter if the data were filled.
w_out.grpdata_size = grpdata_size;
if (st == SRT_ERROR)
{
// Keep NULL in grpdata
return;
}
// Write back original data
w_out.grpdata = grpdata;
}
// [[using locked(CUDT::uglobal()->m_GlobControLock)]]
// [[using locked(m_GroupLock)]]
struct FLookupSocketWithEvent_LOCKED
{
CUDTUnited* glob;
int evtype;
FLookupSocketWithEvent_LOCKED(CUDTUnited* g, int event_type)
: glob(g)
, evtype(event_type)
{
}
typedef CUDTSocket* result_type;
pair<CUDTSocket*, bool> operator()(const pair<SRTSOCKET, int>& es)
{
CUDTSocket* so = NULL;
if ((es.second & evtype) == 0)
return make_pair(so, false);
so = glob->locateSocket_LOCKED(es.first);
return make_pair(so, !!so);
}
};
void CUDTGroup::recv_CollectAliveAndBroken(vector<CUDTSocket*>& alive, set<CUDTSocket*>& broken)
{
#if ENABLE_HEAVY_LOGGING
std::ostringstream ds;
ds << "E(" << m_RcvEID << ") ";
#define HCLOG(expr) expr
#else
#define HCLOG(x) if (false) {}
#endif
alive.reserve(m_Group.size());
HLOGC(grlog.Debug, log << "group/recv: Reviewing member sockets for polling");
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
if (gi->laststatus == SRTS_CONNECTING)
{
HCLOG(ds << "@" << gi->id << "<pending> ");
continue; // don't read over a failed or pending socket
}
if (gi->laststatus >= SRTS_BROKEN)
{
broken.insert(gi->ps);
}
if (broken.count(gi->ps))
{
HCLOG(ds << "@" << gi->id << "<broken> ");
continue;
}
if (gi->laststatus != SRTS_CONNECTED)
{
HCLOG(ds << "@" << gi->id << "<unstable:" << SockStatusStr(gi->laststatus) << "> ");
// Sockets in this state are ignored. We are waiting until it
// achieves CONNECTING state, then it's added to write.
// Or gets broken and closed in the next step.
continue;
}
// Don't skip packets that are ahead because if we have a situation
// that all links are either "elephants" (do not report read readiness)
// and "kangaroos" (have already delivered an ahead packet) then
// omitting kangaroos will result in only elephants to be polled for
// reading. Due to the strict timing requirements and ensurance that
// TSBPD on every link will result in exactly the same delivery time
// for a packet of given sequence, having an elephant and kangaroo in
// one cage means that the elephant is simply a broken or half-broken
// link (the data are not delivered, but it will get repaired soon,
// enough for SRT to maintain the connection, but it will still drop
// packets that didn't arrive in time), in both cases it may
// potentially block the reading for an indefinite time, while
// simultaneously a kangaroo might be a link that got some packets
// dropped, but then it's still capable to deliver packets on time.
// Note that gi->id might be a socket that was previously being polled
// on write, when it's attempting to connect, but now it's connected.
// This will update the socket with the new event set.
alive.push_back(gi->ps);
HCLOG(ds << "@" << gi->id << "[READ] ");
}
HLOGC(grlog.Debug, log << "group/recv: " << ds.str() << " --> EPOLL/SWAIT");
#undef HCLOG
}
vector<CUDTSocket*> CUDTGroup::recv_WaitForReadReady(const vector<CUDTSocket*>& aliveMembers, set<CUDTSocket*>& w_broken)
{
if (aliveMembers.empty())
{
LOGC(grlog.Error, log << "group/recv: all links broken");
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
}
for (vector<CUDTSocket*>::const_iterator i = aliveMembers.begin(); i != aliveMembers.end(); ++i)
{
// NOT using the official srt_epoll_add_usock because this will do socket dispatching,
// which requires lock on m_GlobControlLock, while this lock cannot be applied without
// first unlocking m_GroupLock.
const int read_modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
CUDT::uglobal().epoll_add_usock_INTERNAL(m_RcvEID, *i, &read_modes);
}
// Here we need to make an additional check.
// There might be a possibility that all sockets that
// were added to the reader group, are ahead. At least
// surely we don't have a situation that any link contains
// an ahead-read subsequent packet, because GroupCheckPacketAhead
// already handled that case.
//
// What we can have is that every link has:
// - no known seq position yet (is not registered in the position map yet)
// - the position equal to the latest delivered sequence
// - the ahead position
// Now the situation is that we don't have any packets
// waiting for delivery so we need to wait for any to report one.
// The non-blocking mode would need to simply check the readiness
// with only immediate report, and read-readiness would have to
// be done in background.
// In blocking mode, use m_iRcvTimeOut, which's default value -1
// means to block indefinitely, also in swait().
// In non-blocking mode use 0, which means to always return immediately.
int timeout = m_bSynRecving ? m_iRcvTimeOut : 0;
int nready = 0;
// Poll on this descriptor until reading is available, indefinitely.
CEPoll::fmap_t sready;
// GlobControlLock is required for dispatching the sockets.
// Therefore it must be applied only when GroupLock is off.
{
// This call may wait indefinite time, so GroupLock must be unlocked.
InvertedLock ung (m_GroupLock);
THREAD_PAUSED();
nready = m_Global.m_EPoll.swait(*m_RcvEpolld, sready, timeout, false /*report by retval*/);
THREAD_RESUMED();
// HERE GlobControlLock is locked first, then GroupLock is applied back
enterCS(CUDT::uglobal().m_GlobControlLock);
}
// BOTH m_GlobControlLock AND m_GroupLock are locked here.
HLOGC(grlog.Debug, log << "group/recv: " << nready << " RDY: " << DisplayEpollResults(sready));
if (nready == 0)
{
// GlobControlLock is applied manually, so unlock manually.
// GroupLock will be unlocked as per scope.
leaveCS(CUDT::uglobal().m_GlobControlLock);
// This can only happen when 0 is passed as timeout and none is ready.
// And 0 is passed only in non-blocking mode. So this is none ready in
// non-blocking mode.
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
}
// Handle sockets of pending connection and with errors.
// Nice to have something like:
// broken = FilterIf(sready, [] (auto s)
// { return s.second == SRT_EPOLL_ERR && (auto cs = g->locateSocket(s.first, ERH_RETURN))
// ? {cs, true}
// : {nullptr, false}
// });
FilterIf(
/*FROM*/ sready.begin(),
sready.end(),
/*TO*/ std::inserter(w_broken, w_broken.begin()),
/*VIA*/ FLookupSocketWithEvent_LOCKED(&m_Global, SRT_EPOLL_ERR));
// If this set is empty, it won't roll even once, therefore output
// will be surely empty. This will be checked then same way as when
// reading from every socket resulted in error.
vector<CUDTSocket*> readReady;
readReady.reserve(aliveMembers.size());
for (vector<CUDTSocket*>::const_iterator sockiter = aliveMembers.begin(); sockiter != aliveMembers.end(); ++sockiter)
{
CUDTSocket* sock = *sockiter;
const CEPoll::fmap_t::const_iterator ready_iter = sready.find(sock->m_SocketID);
if (ready_iter != sready.end())
{
if (ready_iter->second & SRT_EPOLL_ERR)
continue; // broken already
if ((ready_iter->second & SRT_EPOLL_IN) == 0)
continue; // not ready for reading
readReady.push_back(*sockiter);
}
else
{
// No read-readiness reported by epoll, but can be missed or not yet handled
// while the receiver buffer is in fact read-ready.
ScopedLock lg(sock->core().m_RcvBufferLock);
if (!sock->core().m_pRcvBuffer)
continue;
// Checking for the next packet in the RCV buffer is safer that isReadReady(tnow).
const CRcvBuffer::PacketInfo info = sock->core().m_pRcvBuffer->getFirstValidPacketInfo();
if (info.seqno != SRT_SEQNO_NONE && !info.seq_gap)
readReady.push_back(sock);
}
}
leaveCS(CUDT::uglobal().m_GlobControlLock);
return readReady;
}
void CUDTGroup::updateReadState(SRTSOCKET /* not sure if needed */, int32_t sequence)
{
bool ready = false;
ScopedLock lg(m_GroupLock);
int seqdiff = 0;
if (m_RcvBaseSeqNo == SRT_SEQNO_NONE)
{
// One socket reported readiness, while no reading operation
// has ever been done. Whatever the sequence number is, it will
// be taken as a good deal and reading will be accepted.
ready = true;
}
else if ((seqdiff = CSeqNo::seqcmp(sequence, m_RcvBaseSeqNo)) > 0)
{
// Case diff == 1: The very next. Surely read-ready.
// Case diff > 1:
// We have an ahead packet. There's one strict condition in which
// we may believe it needs to be delivered - when KANGAROO->HORSE
// transition is allowed. Stating that the time calculation is done
// exactly the same way on every link in the redundancy group, when
// it came to a situation that a packet from one link is ready for
// extraction while it has jumped over some packet, it has surely
// happened due to TLPKTDROP, and if it happened on at least one link,
// we surely don't have this packet ready on any other link.
// This might prove not exactly true, especially when at the moment
// when this happens another link may surprisinly receive this lacking
// packet, so the situation gets suddenly repaired after this function
// is called, the only result of it would be that it will really get
// the very next sequence, even though this function doesn't know it
// yet, but surely in both cases the situation is the same: the medium
// is ready for reading, no matter what packet will turn out to be
// returned when reading is done.
ready = true;
}
// When the sequence number is behind the current one,
// stating that the readines wasn't checked otherwise, the reading
// function will not retrieve anything ready to read just by this premise.
// Even though this packet would have to be eventually extracted (and discarded).
if (ready)
{
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, true);
}
}
int32_t CUDTGroup::getRcvBaseSeqNo()
{
ScopedLock lg(m_GroupLock);
return m_RcvBaseSeqNo;
}
void CUDTGroup::updateWriteState()
{
ScopedLock lg(m_GroupLock);
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, true);
}
/// Validate iPktSeqno is in range
/// (iBaseSeqno - m_iSeqNoTH/2; iBaseSeqno + m_iSeqNoTH).
///
/// EXPECT_EQ(isValidSeqno(125, 124), true); // behind
/// EXPECT_EQ(isValidSeqno(125, 125), true); // behind
/// EXPECT_EQ(isValidSeqno(125, 126), true); // the next in order
///
/// EXPECT_EQ(isValidSeqno(0, 0x3FFFFFFF - 2), true); // ahead, but ok.
/// EXPECT_EQ(isValidSeqno(0, 0x3FFFFFFF - 1), false); // too far ahead.
/// EXPECT_EQ(isValidSeqno(0x3FFFFFFF + 2, 0x7FFFFFFF), false); // too far ahead.
/// EXPECT_EQ(isValidSeqno(0x3FFFFFFF + 3, 0x7FFFFFFF), true); // ahead, but ok.
/// EXPECT_EQ(isValidSeqno(0x3FFFFFFF, 0x1FFFFFFF + 2), false); // too far (behind)
/// EXPECT_EQ(isValidSeqno(0x3FFFFFFF, 0x1FFFFFFF + 3), true); // behind, but ok
/// EXPECT_EQ(isValidSeqno(0x70000000, 0x0FFFFFFF), true); // ahead, but ok
/// EXPECT_EQ(isValidSeqno(0x70000000, 0x30000000 - 2), false); // too far ahead.
/// EXPECT_EQ(isValidSeqno(0x70000000, 0x30000000 - 3), true); // ahead, but ok
/// EXPECT_EQ(isValidSeqno(0x0FFFFFFF, 0), true);
/// EXPECT_EQ(isValidSeqno(0x0FFFFFFF, 0x7FFFFFFF), true);
/// EXPECT_EQ(isValidSeqno(0x0FFFFFFF, 0x70000000), false);
/// EXPECT_EQ(isValidSeqno(0x0FFFFFFF, 0x70000001), false);
/// EXPECT_EQ(isValidSeqno(0x0FFFFFFF, 0x70000002), true); // behind by 536870910
/// EXPECT_EQ(isValidSeqno(0x0FFFFFFF, 0x70000003), true);
///
/// @return false if @a iPktSeqno is not inside the valid range; otherwise true.
static bool isValidSeqno(int32_t iBaseSeqno, int32_t iPktSeqno)
{
const int32_t iLenAhead = CSeqNo::seqlen(iBaseSeqno, iPktSeqno);
if (iLenAhead >= 0 && iLenAhead < CSeqNo::m_iSeqNoTH)
return true;
const int32_t iLenBehind = CSeqNo::seqlen(iPktSeqno, iBaseSeqno);
if (iLenBehind >= 0 && iLenBehind < CSeqNo::m_iSeqNoTH / 2)
return true;
return false;
}
int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
{
// First, acquire GlobControlLock to make sure all member sockets still exist
enterCS(m_Global.m_GlobControlLock);
ScopedLock guard(m_GroupLock);
if (m_bClosing)
{
// The group could be set closing in the meantime, but if
// this is only about to be set by another thread, this thread
// must fist wait for being able to acquire this lock.
// The group will not be deleted now because it is added usage counter
// by this call, but will be released once it exits.
leaveCS(m_Global.m_GlobControlLock);
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
// Now, still under lock, check if all sockets still can be dispatched
send_CheckValidSockets();
leaveCS(m_Global.m_GlobControlLock);
if (m_bClosing)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
// Later iteration over it might be less efficient than
// by vector, but we'll also often try to check a single id
// if it was ever seen broken, so that it's skipped.
set<CUDTSocket*> broken;
for (;;)
{
if (!m_bOpened || !m_bConnected)
{
LOGC(grlog.Error,
log << boolalpha << "grp/recv: $" << id() << ": ABANDONING: opened=" << m_bOpened
<< " connected=" << m_bConnected);
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
}
vector<CUDTSocket*> aliveMembers;
recv_CollectAliveAndBroken(aliveMembers, broken);
if (aliveMembers.empty())
{
LOGC(grlog.Error, log << "grp/recv: ALL LINKS BROKEN, ABANDONING.");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
}
vector<CUDTSocket*> readySockets;
if (m_bSynRecving)
readySockets = recv_WaitForReadReady(aliveMembers, broken);
else
readySockets = aliveMembers;
if (m_bClosing)
{
HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": GROUP CLOSED, ABANDONING.");
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
// Find the first readable packet among all member sockets.
steady_clock::time_point tnow = steady_clock::now();
CUDTSocket* socketToRead = NULL;
CRcvBuffer::PacketInfo infoToRead = {-1, false, time_point()};
for (vector<CUDTSocket*>::const_iterator si = readySockets.begin(); si != readySockets.end(); ++si)
{
CUDTSocket* ps = *si;
ScopedLock lg(ps->core().m_RcvBufferLock);
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
// Drop here to make sure the getFirstReadablePacketInfo() below return fresher packet.
int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
if (cnt > 0)
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": dropped " << cnt
<< " packets before reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo);
}
}
const CRcvBuffer::PacketInfo info =
ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(tnow);
if (info.seqno == SRT_SEQNO_NONE)
{
HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": Nothing to read.");
continue;
}
// We need to qualify the sequence, just for a case.
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && !isValidSeqno(m_RcvBaseSeqNo, info.seqno))
{
LOGC(grlog.Error,
log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": SEQUENCE DISCREPANCY: base=%"
<< m_RcvBaseSeqNo << " vs pkt=%" << info.seqno << ", setting ESECFAIL");
ps->core().m_bBroken = true;
broken.insert(ps);
continue;
}
if (socketToRead == NULL || CSeqNo::seqcmp(info.seqno, infoToRead.seqno) < 0)
{
socketToRead = ps;
infoToRead = info;
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && ((CSeqNo(w_mc.pktseq) - CSeqNo(m_RcvBaseSeqNo)) == 1))
{
// We have the next packet. No need to check other read-ready sockets.
break;
}
}
}
if (socketToRead == NULL)
{
if (m_bSynRecving)
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": No links reported any fresher packet, re-polling.");
continue;
}
else
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": No links reported any fresher packet, clearing readiness.");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
}
}
else
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": Found first readable packet from @" << socketToRead->m_SocketID
<< ": seq=" << infoToRead.seqno << " gap=" << infoToRead.seq_gap
<< " time=" << FormatTime(infoToRead.tsbpd_time));
}
const int res = socketToRead->core().receiveMessage((buf), len, (w_mc), CUDTUnited::ERH_RETURN);
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": Extracted data with %"
<< w_mc.pktseq << " #" << w_mc.msgno << ": " << (res <= 0 ? "(NOTHING)" : BufferStamp(buf, res)));
if (res == 0)
{
LOGC(grlog.Warn,
log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": Retrying next socket...");
// This socket will not be socketToRead in the next turn because receiveMessage() return 0 here.
continue;
}
if (res == SRT_ERROR)
{
LOGC(grlog.Warn,
log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": " << srt_getlasterror_str()
<< ". Retrying next socket...");
broken.insert(socketToRead);
continue;
}
fillGroupData((w_mc), w_mc);
// m_RcvBaseSeqNo is expected to be set to the PeerISN with the first connected member,
// so a packet drop at the start should also be detected by this condition.
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
const int32_t iNumDropped = (CSeqNo(w_mc.pktseq) - CSeqNo(m_RcvBaseSeqNo)) - 1;
if (iNumDropped > 0)
{
m_stats.recvDrop.count(stats::BytesPackets(iNumDropped * static_cast<uint64_t>(avgRcvPacketSize()), iNumDropped));
LOGC(grlog.Warn,
log << "@" << m_GroupID << " GROUP RCV-DROPPED " << iNumDropped << " packet(s): seqno %"
<< CSeqNo::incseq(m_RcvBaseSeqNo) << " to %" << CSeqNo::decseq(w_mc.pktseq));
}
}
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": Update m_RcvBaseSeqNo: %" << m_RcvBaseSeqNo << " -> %" << w_mc.pktseq);
m_RcvBaseSeqNo = w_mc.pktseq;
// Update stats as per delivery
m_stats.recv.count(res);
updateAvgPayloadSize(res);
bool canReadFurther = false;
for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)
{
CUDTSocket* ps = *si;
ScopedLock lg(ps->core().m_RcvBufferLock);
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo), CUDT::DROP_DISCARD);
if (cnt > 0)
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": dropped " << cnt
<< " packets after reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo);
}
}
if (!ps->core().isRcvBufferReadyNoLock())
m_Global.m_EPoll.update_events(ps->m_SocketID, ps->core().m_sPollID, SRT_EPOLL_IN, false);
else
canReadFurther = true;
}
if (!canReadFurther)
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
return res;
}
LOGC(grlog.Error, log << "grp/recv: UNEXPECTED RUN PATH, ABANDONING.");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
}
const char* CUDTGroup::StateStr(CUDTGroup::GroupState st)
{
static const char* const states[] = {"PENDING", "IDLE", "RUNNING", "BROKEN"};
static const size_t size = Size(states);
static const char* const unknown = "UNKNOWN";
if (size_t(st) < size)
return states[st];
return unknown;
}
void CUDTGroup::synchronizeDrift(const srt::CUDT* srcMember)
{
SRT_ASSERT(srcMember != NULL);
ScopedLock glock(m_GroupLock);
if (m_Group.size() <= 1)
{
HLOGC(grlog.Debug, log << "GROUP: synch uDRIFT NOT DONE, no other links");
return;
}
steady_clock::time_point timebase;
steady_clock::duration udrift(0);
bool wrap_period = false;
srcMember->m_pRcvBuffer->getInternalTimeBase((timebase), (wrap_period), (udrift));
HLOGC(grlog.Debug,
log << "GROUP: synch uDRIFT=" << FormatDuration(udrift) << " TB=" << FormatTime(timebase) << "("
<< (wrap_period ? "" : "NO ") << "wrap period)");
// Now that we have the minimum timebase and drift calculated, apply this to every link,
// INCLUDING THE REPORTER.
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
// Skip non-connected; these will be synchronized when ready
if (gi->laststatus != SRTS_CONNECTED)
continue;
CUDT& member = gi->ps->core();
if (srcMember == &member)
continue;
member.m_pRcvBuffer->applyGroupDrift(timebase, wrap_period, udrift);
}
}
void CUDTGroup::bstatsSocket(CBytePerfMon* perf, bool clear)
{
if (!m_bConnected)
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
if (m_bClosing)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
const steady_clock::time_point currtime = steady_clock::now();
// NOTE: Potentially in the group we might be using both IPv4 and IPv6
// links and sending a single packet over these two links could be different.
// These stats then don't make much sense in this form, this has to be
// redesigned. We use the header size as per IPv4, as it was everywhere.
const int pktHdrSize = CPacket::HDR_SIZE + CPacket::UDP_HDR_SIZE;
memset(perf, 0, sizeof *perf);
ScopedLock gg(m_GroupLock);
perf->msTimeStamp = count_milliseconds(currtime - m_tsStartTime);
perf->pktSentUnique = m_stats.sent.trace.count();
perf->pktRecvUnique = m_stats.recv.trace.count();
perf->pktRcvDrop = m_stats.recvDrop.trace.count();
perf->byteSentUnique = m_stats.sent.trace.bytesWithHdr(pktHdrSize);
perf->byteRecvUnique = m_stats.recv.trace.bytesWithHdr(pktHdrSize);
perf->byteRcvDrop = m_stats.recvDrop.trace.bytesWithHdr(pktHdrSize);
perf->pktSentUniqueTotal = m_stats.sent.total.count();
perf->pktRecvUniqueTotal = m_stats.recv.total.count();
perf->pktRcvDropTotal = m_stats.recvDrop.total.count();
perf->byteSentUniqueTotal = m_stats.sent.total.bytesWithHdr(pktHdrSize);
perf->byteRecvUniqueTotal = m_stats.recv.total.bytesWithHdr(pktHdrSize);
perf->byteRcvDropTotal = m_stats.recvDrop.total.bytesWithHdr(pktHdrSize);
const double interval = static_cast<double>(count_microseconds(currtime - m_stats.tsLastSampleTime));
perf->mbpsSendRate = double(perf->byteSent) * 8.0 / interval;
perf->mbpsRecvRate = double(perf->byteRecv) * 8.0 / interval;
if (clear)
{
m_stats.reset();
}
}
/// @brief Compares group members by their weight (higher weight comes first).
struct FCompareByWeight
{
typedef CUDTGroup::gli_t gli_t;
/// @returns true if the first argument is less than (i.e. is ordered before) the second.
bool operator()(const gli_t preceding, const gli_t succeeding)
{
return preceding->weight > succeeding->weight;
}
};
// [[using maybe_locked(this->m_GroupLock)]]
BackupMemberState CUDTGroup::sendBackup_QualifyIfStandBy(const gli_t d)
{
if (!d->ps)
return BKUPST_BROKEN;
const SRT_SOCKSTATUS st = d->ps->getStatus();
// If the socket is already broken, move it to broken.
if (int(st) >= int(SRTS_BROKEN))
{
HLOGC(gslog.Debug,
log << "CUDTGroup::send.$" << id() << ": @" << d->id << " became " << SockStatusStr(st)
<< ", WILL BE CLOSED.");
return BKUPST_BROKEN;
}
if (st != SRTS_CONNECTED)
{
HLOGC(gslog.Debug, log << "CUDTGroup::send. @" << d->id << " is still " << SockStatusStr(st) << ", skipping.");
return BKUPST_PENDING;
}
return BKUPST_STANDBY;
}
// [[using maybe_locked(this->m_GroupLock)]]
bool CUDTGroup::send_CheckIdle(const gli_t d, vector<SRTSOCKET>& w_wipeme, vector<SRTSOCKET>& w_pendingSockets)
{
SRT_SOCKSTATUS st = SRTS_NONEXIST;
if (d->ps)
st = d->ps->getStatus();
// If the socket is already broken, move it to broken.
if (int(st) >= int(SRTS_BROKEN))
{
HLOGC(gslog.Debug,
log << "CUDTGroup::send.$" << id() << ": @" << d->id << " became " << SockStatusStr(st)
<< ", WILL BE CLOSED.");
w_wipeme.push_back(d->id);
return false;
}
if (st != SRTS_CONNECTED)
{
HLOGC(gslog.Debug, log << "CUDTGroup::send. @" << d->id << " is still " << SockStatusStr(st) << ", skipping.");
w_pendingSockets.push_back(d->id);
return false;
}
return true;
}
#if SRT_DEBUG_BONDING_STATES
class StabilityTracer
{
public:
StabilityTracer()
{
}
~StabilityTracer()
{
srt::sync::ScopedLock lck(m_mtx);
m_fout.close();
}
void trace(const CUDT& u, const srt::sync::steady_clock::time_point& currtime, uint32_t activation_period_us,
int64_t stability_tmo_us, const std::string& state, uint16_t weight)
{
srt::sync::ScopedLock lck(m_mtx);
create_file();
m_fout << srt::sync::FormatTime(currtime) << ",";
m_fout << u.id() << ",";
m_fout << weight << ",";
m_fout << u.peerLatency_us() << ",";
m_fout << u.SRTT() << ",";
m_fout << u.RTTVar() << ",";
m_fout << stability_tmo_us << ",";
m_fout << count_microseconds(currtime - u.lastRspTime()) << ",";
m_fout << state << ",";
m_fout << (srt::sync::is_zero(u.freshActivationStart()) ? -1 : (count_microseconds(currtime - u.freshActivationStart()))) << ",";
m_fout << activation_period_us << "\n";
m_fout.flush();
}
private:
void print_header()
{
//srt::sync::ScopedLock lck(m_mtx);
m_fout << "Timepoint,SocketID,weight,usLatency,usRTT,usRTTVar,usStabilityTimeout,usSinceLastResp,State,usSinceActivation,usActivationPeriod\n";
}
void create_file()
{
if (m_fout.is_open())
return;
std::string str_tnow = srt::sync::FormatTimeSys(srt::sync::steady_clock::now());
str_tnow.resize(str_tnow.size() - 7); // remove trailing ' [SYST]' part
while (str_tnow.find(':') != std::string::npos) {
str_tnow.replace(str_tnow.find(':'), 1, 1, '_');
}
const std::string fname = "stability_trace_" + str_tnow + ".csv";
m_fout.open(fname.c_str(), std::ofstream::out);
if (!m_fout)
std::cerr << "IPE: Failed to open " << fname << "!!!\n";
print_header();
}
private:
srt::sync::Mutex m_mtx;
std::ofstream m_fout;
};
StabilityTracer s_stab_trace;
#endif
void CUDTGroup::sendBackup_QualifyMemberStates(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime)
{
// First, check status of every link - no matter if idle or active.
for (gli_t d = m_Group.begin(); d != m_Group.end(); ++d)
{
if (d->sndstate != SRT_GST_BROKEN)
{
// Check the socket state prematurely in order not to uselessly
// send over a socket that is broken.
CUDT* const pu = (d->ps)
? &d->ps->core()
: NULL;
if (!pu || pu->m_bBroken)
{
HLOGC(gslog.Debug, log << "grp/sendBackup: socket @" << d->id << " detected +Broken - transit to BROKEN");
d->sndstate = SRT_GST_BROKEN;
d->rcvstate = SRT_GST_BROKEN;
}
}
// Check socket sndstate before sending
if (d->sndstate == SRT_GST_BROKEN)
{
HLOGC(gslog.Debug,
log << "grp/sendBackup: socket in BROKEN state: @" << d->id
<< ", sockstatus=" << SockStatusStr(d->ps ? d->ps->getStatus() : SRTS_NONEXIST));
sendBackup_AssignBackupState(d->ps->core(), BKUPST_BROKEN, currtime);
w_sendBackupCtx.recordMemberState(&(*d), BKUPST_BROKEN);
#if SRT_DEBUG_BONDING_STATES
s_stab_trace.trace(d->ps->core(), currtime, 0, 0, stateToStr(BKUPST_BROKEN), d->weight);
#endif
continue;
}
if (d->sndstate == SRT_GST_IDLE)
{
const BackupMemberState idle_state = sendBackup_QualifyIfStandBy(d);
sendBackup_AssignBackupState(d->ps->core(), idle_state, currtime);
w_sendBackupCtx.recordMemberState(&(*d), idle_state);
if (idle_state == BKUPST_STANDBY)
{
// TODO: Check if this is some abandoned logic.
sendBackup_CheckIdleTime(d);
}
#if SRT_DEBUG_BONDING_STATES
s_stab_trace.trace(d->ps->core(), currtime, 0, 0, stateToStr(idle_state), d->weight);
#endif
continue;
}
if (d->sndstate == SRT_GST_RUNNING)
{
const BackupMemberState active_state = sendBackup_QualifyActiveState(d, currtime);
sendBackup_AssignBackupState(d->ps->core(), active_state, currtime);
w_sendBackupCtx.recordMemberState(&(*d), active_state);
#if SRT_DEBUG_BONDING_STATES
s_stab_trace.trace(d->ps->core(), currtime, 0, 0, stateToStr(active_state), d->weight);
#endif
continue;
}
HLOGC(gslog.Debug,
log << "grp/sendBackup: socket @" << d->id << " not ready, state: " << StateStr(d->sndstate) << "("
<< int(d->sndstate) << ") - NOT sending, SET AS PENDING");
// Otherwise connection pending
sendBackup_AssignBackupState(d->ps->core(), BKUPST_PENDING, currtime);
w_sendBackupCtx.recordMemberState(&(*d), BKUPST_PENDING);
#if SRT_DEBUG_BONDING_STATES
s_stab_trace.trace(d->ps->core(), currtime, 0, 0, stateToStr(BKUPST_PENDING), d->weight);
#endif
}
}
void CUDTGroup::sendBackup_AssignBackupState(CUDT& sock, BackupMemberState state, const steady_clock::time_point& currtime)
{
switch (state)
{
case BKUPST_PENDING:
case BKUPST_STANDBY:
case BKUPST_BROKEN:
sock.m_tsFreshActivation = steady_clock::time_point();
sock.m_tsUnstableSince = steady_clock::time_point();
sock.m_tsWarySince = steady_clock::time_point();
break;
case BKUPST_ACTIVE_FRESH:
if (is_zero(sock.freshActivationStart()))
{
sock.m_tsFreshActivation = currtime;
}
sock.m_tsUnstableSince = steady_clock::time_point();
sock.m_tsWarySince = steady_clock::time_point();;
break;
case BKUPST_ACTIVE_STABLE:
sock.m_tsFreshActivation = steady_clock::time_point();
sock.m_tsUnstableSince = steady_clock::time_point();
sock.m_tsWarySince = steady_clock::time_point();
break;
case BKUPST_ACTIVE_UNSTABLE:
if (is_zero(sock.m_tsUnstableSince))
{
sock.m_tsUnstableSince = currtime;
}
sock.m_tsFreshActivation = steady_clock::time_point();
sock.m_tsWarySince = steady_clock::time_point();
break;
case BKUPST_ACTIVE_UNSTABLE_WARY:
if (is_zero(sock.m_tsWarySince))
{
sock.m_tsWarySince = currtime;
}
break;
default:
break;
}
}
// [[using locked(this->m_GroupLock)]]
void CUDTGroup::sendBackup_CheckIdleTime(gli_t w_d)
{
// Check if it was fresh set as idle, we had to wait until its sender
// buffer gets empty so that we can make sure that KEEPALIVE will be the
// really last sent for longer time.
CUDT& u = w_d->ps->core();
if (is_zero(u.m_tsFreshActivation)) // TODO: Check if this condition is ever false
return;
CSndBuffer* b = u.m_pSndBuffer;
if (b && b->getCurrBufSize() == 0)
{
HLOGC(gslog.Debug,
log << "grp/sendBackup: FRESH IDLE LINK reached empty buffer - setting permanent and KEEPALIVE");
u.m_tsFreshActivation = steady_clock::time_point();
// Send first immediate keepalive. The link is to be turn to IDLE
// now so nothing will be sent to it over time and it will start
// getting KEEPALIVES since now. Send the first one now to increase
// probability that the link will be recognized as IDLE on the
// reception side ASAP.
int32_t arg = 1;
w_d->ps->core().sendCtrl(UMSG_KEEPALIVE, &arg);
}
}
// [[using locked(this->m_GroupLock)]]
CUDTGroup::BackupMemberState CUDTGroup::sendBackup_QualifyActiveState(const gli_t d, const time_point currtime)
{
const CUDT& u = d->ps->core();
const uint32_t latency_us = u.peerLatency_us();
const int32_t min_stability_us = m_uOPT_MinStabilityTimeout_us;
const int64_t initial_stabtout_us = max<int64_t>(min_stability_us, latency_us);
const int64_t probing_period_us = initial_stabtout_us + 5 * CUDT::COMM_SYN_INTERVAL_US;
// RTT and RTTVar values are still being refined during the probing period,
// therefore the dymanic timeout should not be used during the probing period.
const bool is_activation_phase = !is_zero(u.freshActivationStart())
&& (count_microseconds(currtime - u.freshActivationStart()) <= probing_period_us);
// Initial stability timeout is used only in activation phase.
// Otherwise runtime stability is used, including the WARY state.
const int64_t stability_tout_us = is_activation_phase
? initial_stabtout_us // activation phase
: min<int64_t>(max<int64_t>(min_stability_us, 2 * u.SRTT() + 4 * u.RTTVar()), latency_us);
const steady_clock::time_point last_rsp = max(u.freshActivationStart(), u.lastRspTime());
const steady_clock::duration td_response = currtime - last_rsp;
// No response for a long time
if (count_microseconds(td_response) > stability_tout_us)
{
return BKUPST_ACTIVE_UNSTABLE;
}
enterCS(u.m_StatsLock);
const int64_t drop_total = u.m_stats.sndr.dropped.total.count();
leaveCS(u.m_StatsLock);
const bool have_new_drops = d->pktSndDropTotal != drop_total;
if (have_new_drops)
{
d->pktSndDropTotal = drop_total;
if (!is_activation_phase)
return BKUPST_ACTIVE_UNSTABLE;
}
// Responsive: either stable, wary or still fresh activated.
if (is_activation_phase)
return BKUPST_ACTIVE_FRESH;
const bool is_wary = !is_zero(u.m_tsWarySince);
const bool is_wary_probing = is_wary
&& (count_microseconds(currtime - u.m_tsWarySince) <= 4 * u.peerLatency_us());
const bool is_unstable = !is_zero(u.m_tsUnstableSince);
// If unstable and not in wary, become wary.
if (is_unstable && !is_wary)
return BKUPST_ACTIVE_UNSTABLE_WARY;
// Still probing for stability.
if (is_wary_probing)
return BKUPST_ACTIVE_UNSTABLE_WARY;
if (is_wary)
{
LOGC(gslog.Debug,
log << "grp/sendBackup: @" << u.id() << " wary->stable after " << count_milliseconds(currtime - u.m_tsWarySince) << " ms");
}
return BKUPST_ACTIVE_STABLE;
}
// [[using locked(this->m_GroupLock)]]
bool CUDTGroup::sendBackup_CheckSendStatus(const steady_clock::time_point& currtime SRT_ATR_UNUSED,
const int send_status,
const int32_t lastseq,
const int32_t pktseq,
CUDT& w_u,
int32_t& w_curseq,
int& w_final_stat)
{
if (send_status == -1)
return false; // Sending failed.
bool send_succeeded = false;
if (w_curseq == SRT_SEQNO_NONE)
{
w_curseq = pktseq;
}
else if (w_curseq != lastseq)
{
// We believe that all active links use the same seq.
// But we can do some sanity check.
LOGC(gslog.Error,
log << "grp/sendBackup: @" << w_u.m_SocketID << ": IPE: another running link seq discrepancy: %"
<< lastseq << " vs. previous %" << w_curseq << " - fixing");
// Override must be done with a sequence number greater by one.
// Example:
//
// Link 1 before sending: curr=1114, next=1115
// After sending it reports pktseq=1115
//
// Link 2 before sending: curr=1110, next=1111 (->lastseq before sending)
// THIS CHECK done after sending:
// -- w_curseq(1115) != lastseq(1111)
//
// NOW: Link 1 after sending is:
// curr=1115, next=1116
//
// The value of w_curseq here = 1115, while overrideSndSeqNo
// calls setInitialSndSeq(seq), which sets:
// - curr = seq - 1
// - next = seq
//
// So, in order to set curr=1115, next=1116
// this must set to 1115+1.
w_u.overrideSndSeqNo(CSeqNo::incseq(w_curseq));
}
// State it as succeeded, though. We don't know if the link
// is broken until we get the connection broken confirmation,
// and the instability state may wear off next time.
send_succeeded = true;
w_final_stat = send_status;
return send_succeeded;
}
// [[using locked(this->m_GroupLock)]]
void CUDTGroup::sendBackup_Buffering(const char* buf, const int len, int32_t& w_curseq, SRT_MSGCTRL& w_mc)
{
// This is required to rewrite into currentSchedSequence() property
// as this value will be used as ISN when a new link is connected.
int32_t oldest_buffer_seq = SRT_SEQNO_NONE;
if (w_curseq != SRT_SEQNO_NONE)
{
HLOGC(gslog.Debug, log << "grp/sendBackup: successfully sent over running link, ADDING TO BUFFER.");
// Note: the sequence number that was used to send this packet should be
// recorded here.
oldest_buffer_seq = addMessageToBuffer(buf, len, (w_mc));
}
else
{
// We have to predict, which sequence number would have
// to be placed on the packet about to be sent now. To
// maintain consistency:
// 1. If there are any packets in the sender buffer,
// get the sequence of the last packet, increase it.
// This must be done even if this contradicts the ISN
// of all idle links because otherwise packets will get
// discrepancy.
if (!m_SenderBuffer.empty())
{
BufferedMessage& m = m_SenderBuffer.back();
w_curseq = CSeqNo::incseq(m.mc.pktseq);
// Set also this sequence to the current w_mc
w_mc.pktseq = w_curseq;
// XXX may need tighter revision when message mode is allowed
w_mc.msgno = ++MsgNo(m.mc.msgno);
oldest_buffer_seq = addMessageToBuffer(buf, len, (w_mc));
}
// Note that if buffer is empty and w_curseq is (still) SRT_SEQNO_NONE,
// it will have to try to send first in order to extract the data.
// Note that if w_curseq is still SRT_SEQNO_NONE at this point, it means
// that we have the case of the very first packet sending.
// Otherwise there would be something in the buffer already.
}
if (oldest_buffer_seq != SRT_SEQNO_NONE)
m_iLastSchedSeqNo = oldest_buffer_seq;
}
size_t CUDTGroup::sendBackup_TryActivateStandbyIfNeeded(
const char* buf,
const int len,
bool& w_none_succeeded,
SRT_MSGCTRL& w_mc,
int32_t& w_curseq,
int32_t& w_final_stat,
SendBackupCtx& w_sendBackupCtx,
CUDTException& w_cx,
const steady_clock::time_point& currtime)
{
const unsigned num_standby = w_sendBackupCtx.countMembersByState(BKUPST_STANDBY);
if (num_standby == 0)
{
return 0;
}
const unsigned num_stable = w_sendBackupCtx.countMembersByState(BKUPST_ACTIVE_STABLE);
const unsigned num_fresh = w_sendBackupCtx.countMembersByState(BKUPST_ACTIVE_FRESH);
if (num_stable + num_fresh == 0)
{
LOGC(gslog.Warn,
log << "grp/sendBackup: trying to activate a stand-by link (" << num_standby << " available). "
<< "Reason: no stable links"
);
}
else if (w_sendBackupCtx.maxActiveWeight() < w_sendBackupCtx.maxStandbyWeight())
{
LOGC(gslog.Warn,
log << "grp/sendBackup: trying to activate a stand-by link (" << num_standby << " available). "
<< "Reason: max active weight " << w_sendBackupCtx.maxActiveWeight()
<< ", max stand by weight " << w_sendBackupCtx.maxStandbyWeight()
);
}
else
{
/*LOGC(gslog.Warn,
log << "grp/sendBackup: no need to activate (" << num_standby << " available). "
<< "Max active weight " << w_sendBackupCtx.maxActiveWeight()
<< ", max stand by weight " << w_sendBackupCtx.maxStandbyWeight()
);*/
return 0;
}
int stat = -1;
size_t num_activated = 0;
w_sendBackupCtx.sortByWeightAndState();
typedef vector<BackupMemberStateEntry>::const_iterator const_iter_t;
for (const_iter_t member = w_sendBackupCtx.memberStates().begin(); member != w_sendBackupCtx.memberStates().end(); ++member)
{
if (member->state != BKUPST_STANDBY)
continue;
int erc = 0;
SocketData* d = member->pSocketData;
// Now send and check the status
// The link could have got broken
try
{
CUDT& cudt = d->ps->core();
// Take source rate estimation from an active member (needed for the input rate estimation mode).
cudt.setRateEstimator(w_sendBackupCtx.getRateEstimate());
// TODO: At this point all packets that could be sent
// are located in m_SenderBuffer. So maybe just use sendBackupRexmit()?
if (w_curseq == SRT_SEQNO_NONE)
{
// This marks the fact that the given here packet
// could not be sent over any link. This includes the
// situation of sending the very first packet after connection.
HLOGC(gslog.Debug,
log << "grp/sendBackup: ... trying @" << d->id << " - sending the VERY FIRST message");
stat = cudt.sendmsg2(buf, len, (w_mc));
if (stat != -1)
{
// This will be no longer used, but let it stay here.
// It's because if this is successful, no other links
// will be tried.
w_curseq = w_mc.pktseq;
addMessageToBuffer(buf, len, (w_mc));
}
}
else
{
HLOGC(gslog.Debug,
log << "grp/sendBackup: ... trying @" << d->id << " - resending " << m_SenderBuffer.size()
<< " collected messages...");
// Note: this will set the currently required packet
// because it has been just freshly added to the sender buffer
stat = sendBackupRexmit(cudt, (w_mc));
}
++num_activated;
}
catch (CUDTException& e)
{
// This will be propagated from internal sendmsg2 call,
// but that's ok - we want this sending interrupted even in half.
w_cx = e;
stat = -1;
erc = e.getErrorCode();
}
d->sndresult = stat;
d->laststatus = d->ps->getStatus();
if (stat != -1)
{
d->sndstate = SRT_GST_RUNNING;
sendBackup_AssignBackupState(d->ps->core(), BKUPST_ACTIVE_FRESH, currtime);
w_sendBackupCtx.updateMemberState(d, BKUPST_ACTIVE_FRESH);
// Note: this will override the sequence number
// for all next iterations in this loop.
w_none_succeeded = false;
w_final_stat = stat;
LOGC(gslog.Warn,
log << "@" << d->id << " FRESH-ACTIVATED");
// We've activated the link, so that's enough.
break;
}
// Failure - move to broken those that could not be activated
bool isblocked SRT_ATR_UNUSED = true;
if (erc != SRT_EASYNCSND)
{
isblocked = false;
sendBackup_AssignBackupState(d->ps->core(), BKUPST_BROKEN, currtime);
w_sendBackupCtx.updateMemberState(d, BKUPST_BROKEN);
}
// If we found a blocked link, leave it alone, however
// still try to send something over another link
LOGC(gslog.Warn,
log << "@" << d->id << " FAILED (" << (isblocked ? "blocked" : "ERROR")
<< "), trying to activate another link.");
}
return num_activated;
}
// [[using locked(this->m_GroupLock)]]
void CUDTGroup::sendBackup_CheckPendingSockets(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime)
{
if (w_sendBackupCtx.countMembersByState(BKUPST_PENDING) == 0)
return;
HLOGC(gslog.Debug, log << "grp/send*: checking pending sockets.");
// These sockets if they are in pending state, should be added to m_SndEID
// at the connecting stage.
CEPoll::fmap_t sready;
if (m_Global.m_EPoll.empty(*m_SndEpolld))
{
// Sanity check - weird pending reported.
LOGC(gslog.Error, log << "grp/send*: IPE: reported pending sockets, but EID is empty - wiping pending!");
return;
}
{
InvertedLock ug(m_GroupLock);
m_Global.m_EPoll.swait(
*m_SndEpolld, sready, 0, false /*report by retval*/); // Just check if anything has happened
}
if (m_bClosing)
{
HLOGC(gslog.Debug, log << "grp/send...: GROUP CLOSED, ABANDONING");
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
// Some sockets could have been closed in the meantime.
if (m_Global.m_EPoll.empty(*m_SndEpolld))
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
HLOGC(gslog.Debug, log << "grp/send*: RDY: " << DisplayEpollResults(sready));
typedef vector<BackupMemberStateEntry>::const_iterator const_iter_t;
for (const_iter_t member = w_sendBackupCtx.memberStates().begin(); member != w_sendBackupCtx.memberStates().end(); ++member)
{
if (member->state != BKUPST_PENDING)
continue;
const SRTSOCKET sockid = member->pSocketData->id;
if (!CEPoll::isready(sready, sockid, SRT_EPOLL_ERR))
continue;
HLOGC(gslog.Debug, log << "grp/send*: Socket @" << sockid << " reported FAILURE - qualifying as broken.");
w_sendBackupCtx.updateMemberState(member->pSocketData, BKUPST_BROKEN);
if (member->pSocketData->ps)
sendBackup_AssignBackupState(member->pSocketData->ps->core(), BKUPST_BROKEN, currtime);
const int no_events = 0;
m_Global.m_EPoll.update_usock(m_SndEID, sockid, &no_events);
}
// After that, all sockets that have been reported
// as ready to write should be removed from EID. This
// will also remove those sockets that have been added
// as redundant links at the connecting stage and became
// writable (connected) before this function had a chance
// to check them.
m_Global.m_EPoll.clear_ready_usocks(*m_SndEpolld, SRT_EPOLL_OUT);
}
// [[using locked(this->m_GroupLock)]]
void CUDTGroup::sendBackup_CheckUnstableSockets(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime)
{
const unsigned num_stable = w_sendBackupCtx.countMembersByState(BKUPST_ACTIVE_STABLE);
if (num_stable == 0)
return;
const unsigned num_unstable = w_sendBackupCtx.countMembersByState(BKUPST_ACTIVE_UNSTABLE);
const unsigned num_wary = w_sendBackupCtx.countMembersByState(BKUPST_ACTIVE_UNSTABLE_WARY);
if (num_unstable + num_wary == 0)
return;
HLOGC(gslog.Debug, log << "grp/send*: checking unstable sockets.");
typedef vector<BackupMemberStateEntry>::const_iterator const_iter_t;
for (const_iter_t member = w_sendBackupCtx.memberStates().begin(); member != w_sendBackupCtx.memberStates().end(); ++member)
{
if (member->state != BKUPST_ACTIVE_UNSTABLE && member->state != BKUPST_ACTIVE_UNSTABLE_WARY)
continue;
CUDT& sock = member->pSocketData->ps->core();
if (is_zero(sock.m_tsUnstableSince))
{
LOGC(gslog.Error, log << "grp/send* IPE: Socket @" << member->socketID
<< " is qualified as unstable, but does not have the 'unstable since' timestamp. Still marking for closure.");
}
const int unstable_for_ms = (int)count_milliseconds(currtime - sock.m_tsUnstableSince);
if (unstable_for_ms < sock.peerIdleTimeout_ms())
continue;
// Requesting this socket to be broken with the next CUDT::checkExpTimer() call.
sock.breakAsUnstable();
LOGC(gslog.Warn, log << "grp/send*: Socket @" << member->socketID << " is unstable for " << unstable_for_ms
<< "ms - requesting breakage.");
//w_sendBackupCtx.updateMemberState(member->pSocketData, BKUPST_BROKEN);
//if (member->pSocketData->ps)
// sendBackup_AssignBackupState(member->pSocketData->ps->core(), BKUPST_BROKEN, currtime);
}
}
// [[using locked(this->m_GroupLock)]]
void CUDTGroup::send_CloseBrokenSockets(vector<SRTSOCKET>& w_wipeme)
{
if (!w_wipeme.empty())
{
InvertedLock ug(m_GroupLock);
// With unlocked GroupLock, we can now lock GlobControlLock.
// This is needed to prevent any of them deleted from the container
// at the same time.
ScopedLock globlock(CUDT::uglobal().m_GlobControlLock);
for (vector<SRTSOCKET>::iterator p = w_wipeme.begin(); p != w_wipeme.end(); ++p)
{
CUDTSocket* s = CUDT::uglobal().locateSocket_LOCKED(*p);
// If the socket has been just moved to ClosedSockets, it means that
// the object still exists, but it will be no longer findable.
if (!s)
continue;
HLOGC(gslog.Debug,
log << "grp/send...: BROKEN SOCKET @" << (*p) << " - CLOSING, to be removed from group.");
// As per sending, make it also broken so that scheduled
// packets will be also abandoned.
s->setClosed();
}
}
HLOGC(gslog.Debug, log << "grp/send...: - wiped " << w_wipeme.size() << " broken sockets");
// We'll need you again.
w_wipeme.clear();
}
// [[using locked(this->m_GroupLock)]]
void CUDTGroup::sendBackup_CloseBrokenSockets(SendBackupCtx& w_sendBackupCtx)
{
if (w_sendBackupCtx.countMembersByState(BKUPST_BROKEN) == 0)
return;
InvertedLock ug(m_GroupLock);
// With unlocked GroupLock, we can now lock GlobControlLock.
// This is needed prevent any of them be deleted from the container
// at the same time.
ScopedLock globlock(CUDT::uglobal().m_GlobControlLock);
typedef vector<BackupMemberStateEntry>::const_iterator const_iter_t;
for (const_iter_t member = w_sendBackupCtx.memberStates().begin(); member != w_sendBackupCtx.memberStates().end(); ++member)
{
if (member->state != BKUPST_BROKEN)
continue;
// m_GroupLock is unlocked, therefore member->pSocketData can't be used.
const SRTSOCKET sockid = member->socketID;
CUDTSocket* s = CUDT::uglobal().locateSocket_LOCKED(sockid);
// If the socket has been just moved to ClosedSockets, it means that
// the object still exists, but it will be no longer findable.
if (!s)
continue;
LOGC(gslog.Debug,
log << "grp/send...: BROKEN SOCKET @" << sockid << " - CLOSING, to be removed from group.");
// As per sending, make it also broken so that scheduled
// packets will be also abandoned.
s->setBrokenClosed();
}
// TODO: all broken members are to be removed from the context now???
}
// [[using locked(this->m_GroupLock)]]
void CUDTGroup::sendBackup_RetryWaitBlocked(SendBackupCtx& w_sendBackupCtx,
int& w_final_stat,
bool& w_none_succeeded,
SRT_MSGCTRL& w_mc,
CUDTException& w_cx)
{
// In contradiction to broadcast sending, backup sending must check
// the blocking state in total first. We need this information through
// epoll because we didn't use all sockets to send the data hence the
// blocked socket information would not be complete.
// Don't do this check if sending has succeeded over at least one
// stable link. This procedure is to wait for at least one write-ready
// link.
//
// If sending succeeded also over at least one unstable link (you only have
// unstable links and none other or others just got broken), continue sending
// anyway.
// This procedure is for a case when the packet could not be sent
// over any link (hence "none succeeded"), but there are some unstable
// links and no parallel links. We need to WAIT for any of the links
// to become available for sending.
// Note: A link is added in unstableLinks if sending has failed with SRT_ESYNCSND.
const unsigned num_unstable = w_sendBackupCtx.countMembersByState(BKUPST_ACTIVE_UNSTABLE);
const unsigned num_wary = w_sendBackupCtx.countMembersByState(BKUPST_ACTIVE_UNSTABLE_WARY);
if ((num_unstable + num_wary == 0) || !w_none_succeeded)
return;
HLOGC(gslog.Debug, log << "grp/sendBackup: no successfull sending: "
<< (num_unstable + num_wary) << " unstable links - waiting to retry sending...");
// Note: GroupLock is set already, skip locks and checks
getGroupData_LOCKED((w_mc.grpdata), (&w_mc.grpdata_size));
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true);
if (m_Global.m_EPoll.empty(*m_SndEpolld))
{
// wipeme wiped, pending sockets checked, it can only mean that
// all sockets are broken.
HLOGC(gslog.Debug, log << "grp/sendBackup: epolld empty - all sockets broken?");
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
if (!m_bSynSending)
{
HLOGC(gslog.Debug, log << "grp/sendBackup: non-blocking mode - exit with no-write-ready");
throw CUDTException(MJ_AGAIN, MN_WRAVAIL, 0);
}
// Here is the situation that the only links left here are:
// - those that failed to send (already closed and wiped out)
// - those that got blockade on sending
// At least, there was so far no socket through which we could
// successfully send anything.
// As a last resort in this situation, try to wait for any links
// remaining in the group to become ready to write.
CEPoll::fmap_t sready;
int brdy;
// This keeps the number of links that existed at the entry.
// Simply notify all dead links, regardless as to whether the number
// of group members decreases below. If the number of corpses reaches
// this number, consider the group connection broken.
const size_t nlinks = m_Group.size();
size_t ndead = 0;
RetryWaitBlocked:
{
// Some sockets could have been closed in the meantime.
if (m_Global.m_EPoll.empty(*m_SndEpolld))
{
HLOGC(gslog.Debug, log << "grp/sendBackup: no more sockets available for sending - group broken");
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
InvertedLock ug(m_GroupLock);
HLOGC(gslog.Debug,
log << "grp/sendBackup: swait call to get at least one link alive up to " << m_iSndTimeOut << "us");
THREAD_PAUSED();
brdy = m_Global.m_EPoll.swait(*m_SndEpolld, (sready), m_iSndTimeOut);
THREAD_RESUMED();
if (brdy == 0) // SND timeout exceeded
{
throw CUDTException(MJ_AGAIN, MN_WRAVAIL, 0);
}
HLOGC(gslog.Debug, log << "grp/sendBackup: swait exited with " << brdy << " ready sockets:");
// Check if there's anything in the "error" section.
// This must be cleared here before the lock on group is set again.
// (This loop will not fire neither once if no failed sockets found).
for (CEPoll::fmap_t::const_iterator i = sready.begin(); i != sready.end(); ++i)
{
if (i->second & SRT_EPOLL_ERR)
{
SRTSOCKET id = i->first;
CUDTSocket* s = m_Global.locateSocket(id, CUDTUnited::ERH_RETURN); // << LOCKS m_GlobControlLock!
if (s)
{
HLOGC(gslog.Debug,
log << "grp/sendBackup: swait/ex on @" << (id)
<< " while waiting for any writable socket - CLOSING");
CUDT::uglobal().close(s); // << LOCKS m_GlobControlLock, then GroupLock!
}
else
{
HLOGC(gslog.Debug, log << "grp/sendBackup: swait/ex on @" << (id) << " - WAS DELETED IN THE MEANTIME");
}
++ndead;
}
}
HLOGC(gslog.Debug, log << "grp/sendBackup: swait/?close done, re-acquiring GroupLock");
}
// GroupLock is locked back
// Re-check after the waiting lock has been reacquired
if (m_bClosing)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
if (brdy == -1 || ndead >= nlinks)
{
LOGC(gslog.Error,
log << "grp/sendBackup: swait=>" << brdy << " nlinks=" << nlinks << " ndead=" << ndead
<< " - looxlike all links broken");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true);
// You can safely throw here - nothing to fill in when all sockets down.
// (timeout was reported by exception in the swait call).
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
// Ok, now check if we have at least one write-ready.
// Note that the procedure of activation of a new link in case of
// no stable links found embraces also rexmit-sending and status
// check as well, including blocked status.
// Find which one it was. This is so rare case that we can
// suffer linear search.
int nwaiting = 0;
int nactivated SRT_ATR_UNUSED = 0;
int stat = -1;
for (gli_t d = m_Group.begin(); d != m_Group.end(); ++d)
{
// We are waiting only for active members
if (d->sndstate != SRT_GST_RUNNING)
{
HLOGC(gslog.Debug,
log << "grp/sendBackup: member @" << d->id << " state is not RUNNING - SKIPPING from retry/waiting");
continue;
}
// Skip if not writable in this run
if (!CEPoll::isready(sready, d->id, SRT_EPOLL_OUT))
{
++nwaiting;
HLOGC(gslog.Debug, log << "grp/sendBackup: @" << d->id << " NOT ready:OUT, added as waiting");
continue;
}
try
{
// Note: this will set the currently required packet
// because it has been just freshly added to the sender buffer
stat = sendBackupRexmit(d->ps->core(), (w_mc));
++nactivated;
}
catch (CUDTException& e)
{
// This will be propagated from internal sendmsg2 call,
// but that's ok - we want this sending interrupted even in half.
w_cx = e;
stat = -1;
}
d->sndresult = stat;
d->laststatus = d->ps->getStatus();
if (stat == -1)
{
// This link is no longer waiting.
continue;
}
w_final_stat = stat;
d->sndstate = SRT_GST_RUNNING;
w_none_succeeded = false;
const steady_clock::time_point currtime = steady_clock::now();
sendBackup_AssignBackupState(d->ps->core(), BKUPST_ACTIVE_UNSTABLE_WARY, currtime);
w_sendBackupCtx.updateMemberState(&(*d), BKUPST_ACTIVE_UNSTABLE_WARY);
HLOGC(gslog.Debug, log << "grp/sendBackup: after waiting, ACTIVATED link @" << d->id);
break;
}
// If we have no links successfully activated, but at least
// one link "not ready for writing", continue waiting for at
// least one link ready.
if (stat == -1 && nwaiting > 0)
{
HLOGC(gslog.Debug, log << "grp/sendBackup: still have " << nwaiting << " waiting and none succeeded, REPEAT");
goto RetryWaitBlocked;
}
}
// [[using locked(this->m_GroupLock)]]
void CUDTGroup::sendBackup_SilenceRedundantLinks(SendBackupCtx& w_sendBackupCtx, const steady_clock::time_point& currtime)
{
// The most important principle is to keep the data being sent constantly,
// even if it means temporarily full redundancy.
// A member can be silenced only if there is at least one stable memebr.
const unsigned num_stable = w_sendBackupCtx.countMembersByState(BKUPST_ACTIVE_STABLE);
if (num_stable == 0)
return;
// INPUT NEEDED:
// - stable member with maximum weight
uint16_t max_weight_stable = 0;
SRTSOCKET stableSocketId = SRT_INVALID_SOCK; // SocketID of a stable link with higher weight
w_sendBackupCtx.sortByWeightAndState();
//LOGC(gslog.Debug, log << "grp/silenceRedundant: links after sort: " << w_sendBackupCtx.printMembers());
typedef vector<BackupMemberStateEntry>::const_iterator const_iter_t;
for (const_iter_t member = w_sendBackupCtx.memberStates().begin(); member != w_sendBackupCtx.memberStates().end(); ++member)
{
if (!isStateActive(member->state))
continue;
const bool haveHigherWeightStable = stableSocketId != SRT_INVALID_SOCK;
const uint16_t weight = member->pSocketData->weight;
if (member->state == BKUPST_ACTIVE_STABLE)
{
// silence stable link if it is not the first stable
if (!haveHigherWeightStable)
{
max_weight_stable = (int) weight;
stableSocketId = member->socketID;
continue;
}
else
{
LOGC(gslog.Note, log << "grp/sendBackup: silencing stable member @" << member->socketID << " (weight " << weight
<< ") in favor of @" << stableSocketId << " (weight " << max_weight_stable << ")");
}
}
else if (haveHigherWeightStable && weight <= max_weight_stable)
{
LOGC(gslog.Note, log << "grp/sendBackup: silencing member @" << member->socketID << " (weight " << weight
<< " " << stateToStr(member->state)
<< ") in favor of @" << stableSocketId << " (weight " << max_weight_stable << ")");
}
else
{
continue;
}
// TODO: Move to a separate function sendBackup_SilenceMember
SocketData* d = member->pSocketData;
CUDT& u = d->ps->core();
sendBackup_AssignBackupState(u, BKUPST_STANDBY, currtime);
w_sendBackupCtx.updateMemberState(d, BKUPST_STANDBY);
if (d->sndstate != SRT_GST_RUNNING)
{
LOGC(gslog.Error,
log << "grp/sendBackup: IPE: misidentified a non-running link @" << d->id << " as active");
continue;
}
d->sndstate = SRT_GST_IDLE;
}
}
int CUDTGroup::sendBackup(const char* buf, int len, SRT_MSGCTRL& w_mc)
{
if (len <= 0)
{
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
}
// Only live streaming is supported
if (len > SRT_LIVE_MAX_PLSIZE)
{
LOGC(gslog.Error, log << "grp/send(backup): buffer size=" << len << " exceeds maximum allowed in live mode");
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
}
// [[using assert(this->m_pSndBuffer != nullptr)]];
// First, acquire GlobControlLock to make sure all member sockets still exist
enterCS(m_Global.m_GlobControlLock);
ScopedLock guard(m_GroupLock);
if (m_bClosing)
{
leaveCS(m_Global.m_GlobControlLock);
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
// Now, still under lock, check if all sockets still can be dispatched
send_CheckValidSockets();
leaveCS(m_Global.m_GlobControlLock);
steady_clock::time_point currtime = steady_clock::now();
SendBackupCtx sendBackupCtx; // default initialized as empty
// TODO: reserve? sendBackupCtx.memberStates.reserve(m_Group.size());
sendBackup_QualifyMemberStates((sendBackupCtx), currtime);
int32_t curseq = SRT_SEQNO_NONE;
size_t nsuccessful = 0;
SRT_ATR_UNUSED CUDTException cx(MJ_SUCCESS, MN_NONE, 0); // TODO: Delete then?
uint16_t maxActiveWeight = 0; // Maximum weight of active links.
// The number of bytes sent or -1 for error will be stored in group_send_result
int group_send_result = sendBackup_SendOverActive(buf, len, w_mc, currtime, (curseq), (nsuccessful), (maxActiveWeight), (sendBackupCtx), (cx));
bool none_succeeded = (nsuccessful == 0);
// Save current payload in group's sender buffer.
sendBackup_Buffering(buf, len, (curseq), (w_mc));
sendBackup_TryActivateStandbyIfNeeded(buf, len, (none_succeeded),
(w_mc),
(curseq),
(group_send_result),
(sendBackupCtx),
(cx), currtime);
sendBackup_CheckPendingSockets((sendBackupCtx), currtime);
sendBackup_CheckUnstableSockets((sendBackupCtx), currtime);
//LOGC(gslog.Debug, log << "grp/sendBackup: links after all checks: " << sendBackupCtx.printMembers());
// Re-check after the waiting lock has been reacquired
if (m_bClosing)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
sendBackup_CloseBrokenSockets((sendBackupCtx));
// Re-check after the waiting lock has been reacquired
if (m_bClosing)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
// If all links out of the unstable-running links are blocked (SRT_EASYNCSND),
// perform epoll wait on them. In this situation we know that
// there are no idle blocked links because IDLE LINK CAN'T BE BLOCKED,
// no matter what. It's because the link may only be blocked if
// the sender buffer of this socket is full, and it can't be
// full if it wasn't used so far.
//
// This means that in case when we have no stable links, we
// need to try out any link that can accept the rexmit-load.
// We'll check link stability at the next sending attempt.
sendBackup_RetryWaitBlocked((sendBackupCtx), (group_send_result), (none_succeeded), (w_mc), (cx));
sendBackup_SilenceRedundantLinks((sendBackupCtx), currtime);
// (closing condition checked inside this call)
if (none_succeeded)
{
HLOGC(gslog.Debug, log << "grp/sendBackup: all links broken (none succeeded to send a payload)");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true);
// Reparse error code, if set.
// It might be set, if the last operation was failed.
// If any operation succeeded, this will not be executed anyway.
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}
// At least one link has succeeded, update sending stats.
m_stats.sent.count(len);
// Now fill in the socket table. Check if the size is enough, if not,
// then set the pointer to NULL and set the correct size.
// Note that list::size() is linear time, however this shouldn't matter,
// as with the increased number of links in the redundancy group the
// impossibility of using that many of them grows exponentally.
const size_t grpsize = m_Group.size();
if (w_mc.grpdata_size < grpsize)
{
w_mc.grpdata = NULL;
}
size_t i = 0;
bool ready_again = false;
HLOGC(gslog.Debug, log << "grp/sendBackup: copying group data");
for (gli_t d = m_Group.begin(); d != m_Group.end(); ++d, ++i)
{
if (w_mc.grpdata)
{
// Enough space to fill
copyGroupData(*d, (w_mc.grpdata[i]));
}
// We perform this loop anyway because we still need to check if any
// socket is writable. Note that the group lock will hold any write ready
// updates that are performed just after a single socket update for the
// group, so if any socket is actually ready at the moment when this
// is performed, and this one will result in none-write-ready, this will
// be fixed just after returning from this function.
ready_again = ready_again || d->ps->writeReady();
}
w_mc.grpdata_size = i;
if (!ready_again)
{
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
}
HLOGC(gslog.Debug,
log << "grp/sendBackup: successfully sent " << group_send_result << " bytes, "
<< (ready_again ? "READY for next" : "NOT READY to send next"));
return group_send_result;
}
// [[using locked(this->m_GroupLock)]]
int32_t CUDTGroup::addMessageToBuffer(const char* buf, size_t len, SRT_MSGCTRL& w_mc)
{
if (m_iSndAckedMsgNo == SRT_MSGNO_NONE)
{
// Very first packet, just set the msgno.
m_iSndAckedMsgNo = w_mc.msgno;
m_iSndOldestMsgNo = w_mc.msgno;
HLOGC(gslog.Debug, log << "addMessageToBuffer: initial message no #" << w_mc.msgno);
}
else if (m_iSndOldestMsgNo != m_iSndAckedMsgNo)
{
int offset = MsgNo(m_iSndAckedMsgNo) - MsgNo(m_iSndOldestMsgNo);
HLOGC(gslog.Debug,
log << "addMessageToBuffer: new ACK-ed messages: #(" << m_iSndOldestMsgNo << "-" << m_iSndAckedMsgNo
<< ") - going to remove");
if (offset > int(m_SenderBuffer.size()))
{
LOGC(gslog.Error,
log << "addMessageToBuffer: IPE: offset=" << offset << " exceeds buffer size=" << m_SenderBuffer.size()
<< " - CLEARING");
m_SenderBuffer.clear();
}
else
{
HLOGC(gslog.Debug,
log << "addMessageToBuffer: erasing " << offset << "/" << m_SenderBuffer.size()
<< " group-senderbuffer ACKED messages for #" << m_iSndOldestMsgNo << " - #" << m_iSndAckedMsgNo);
m_SenderBuffer.erase(m_SenderBuffer.begin(), m_SenderBuffer.begin() + offset);
}
// Position at offset is not included
m_iSndOldestMsgNo = m_iSndAckedMsgNo;
HLOGC(gslog.Debug,
log << "addMessageToBuffer: ... after: oldest #" << m_iSndOldestMsgNo);
}
m_SenderBuffer.resize(m_SenderBuffer.size() + 1);
BufferedMessage& bm = m_SenderBuffer.back();
bm.mc = w_mc;
bm.copy(buf, len);
HLOGC(gslog.Debug,
log << "addMessageToBuffer: #" << w_mc.msgno << " size=" << len << " !" << BufferStamp(buf, len));
return m_SenderBuffer.front().mc.pktseq;
}
int CUDTGroup::sendBackup_SendOverActive(const char* buf, int len, SRT_MSGCTRL& w_mc, const steady_clock::time_point& currtime, int32_t& w_curseq,
size_t& w_nsuccessful, uint16_t& w_maxActiveWeight, SendBackupCtx& w_sendBackupCtx, CUDTException& w_cx)
{
if (w_mc.srctime == 0)
w_mc.srctime = count_microseconds(currtime.time_since_epoch());
SRT_ASSERT(w_nsuccessful == 0);
SRT_ASSERT(w_maxActiveWeight == 0);
int group_send_result = SRT_ERROR;
// TODO: implement iterator over active links
typedef vector<BackupMemberStateEntry>::const_iterator const_iter_t;
for (const_iter_t member = w_sendBackupCtx.memberStates().begin(); member != w_sendBackupCtx.memberStates().end(); ++member)
{
if (!isStateActive(member->state))
continue;
SocketData* d = member->pSocketData;
int erc = SRT_SUCCESS;
// Remaining sndstate is SRT_GST_RUNNING. Send a payload through it.
CUDT& u = d->ps->core();
const int32_t lastseq = u.schedSeqNo();
int sndresult = SRT_ERROR;
try
{
// This must be wrapped in try-catch because on error it throws an exception.
// Possible return values are only 0, in case when len was passed 0, or a positive
// >0 value that defines the size of the data that it has sent, that is, in case
// of Live mode, equal to 'len'.
sndresult = u.sendmsg2(buf, len, (w_mc));
}
catch (CUDTException& e)
{
w_cx = e;
erc = e.getErrorCode();
sndresult = SRT_ERROR;
}
const bool send_succeeded = sendBackup_CheckSendStatus(
currtime,
sndresult,
lastseq,
w_mc.pktseq,
(u),
(w_curseq),
(group_send_result));
if (send_succeeded)
{
++w_nsuccessful;
w_maxActiveWeight = max(w_maxActiveWeight, d->weight);
if (u.m_pSndBuffer)
w_sendBackupCtx.setRateEstimate(u.m_pSndBuffer->getRateEstimator());
}
else if (erc == SRT_EASYNCSND)
{
sendBackup_AssignBackupState(u, BKUPST_ACTIVE_UNSTABLE, currtime);
w_sendBackupCtx.updateMemberState(d, BKUPST_ACTIVE_UNSTABLE);
}
d->sndresult = sndresult;
d->laststatus = d->ps->getStatus();
}
return group_send_result;
}
// [[using locked(this->m_GroupLock)]]
int CUDTGroup::sendBackupRexmit(CUDT& core, SRT_MSGCTRL& w_mc)
{
// This should resend all packets
if (m_SenderBuffer.empty())
{
LOGC(gslog.Fatal, log << "IPE: sendBackupRexmit: sender buffer empty");
// Although act as if it was successful, otherwise you'll get connection break
return 0;
}
// using [[assert !m_SenderBuffer.empty()]];
// Send everything you currently have in the sender buffer.
// The receiver will reject packets that it currently has.
// Start from the oldest.
CPacket packet;
set<int> results;
int stat = -1;
// Make sure that the link has correctly synchronized sequence numbers.
// Note that sequence numbers should be recorded in mc.
int32_t curseq = m_SenderBuffer[0].mc.pktseq;
size_t skip_initial = 0;
if (curseq != core.schedSeqNo())
{
const int distance = CSeqNo::seqoff(core.schedSeqNo(), curseq);
if (distance < 0)
{
// This may happen in case when the link to be activated is already running.
// Getting sequences backwards is not allowed, as sending them makes no
// sense - they are already ACK-ed or are behind the ISN. Instead, skip all
// packets that are in the past towards the scheduling sequence.
skip_initial = -distance;
LOGC(gslog.Warn,
log << "sendBackupRexmit: OVERRIDE attempt. Link seqno %" << core.schedSeqNo() << ", trying to send from seqno %" << curseq
<< " - DENIED; skip " << skip_initial << " pkts, " << m_SenderBuffer.size() << " pkts in buffer");
}
else
{
// In case when the next planned sequence on this link is behind
// the firstmost sequence in the backup buffer, synchronize the
// sequence with it first so that they go hand-in-hand with
// sequences already used by the link from which packets were
// copied to the backup buffer.
IF_HEAVY_LOGGING(int32_t old = core.schedSeqNo());
const bool su SRT_ATR_UNUSED = core.overrideSndSeqNo(curseq);
HLOGC(gslog.Debug,
log << "sendBackupRexmit: OVERRIDING seq %" << old << " with %" << curseq
<< (su ? " - succeeded" : " - FAILED!"));
}
}
if (skip_initial >= m_SenderBuffer.size())
{
LOGC(gslog.Warn,
log << "sendBackupRexmit: All packets were skipped. Nothing to send %" << core.schedSeqNo() << ", trying to send from seqno %" << curseq
<< " - DENIED; skip " << skip_initial << " packets");
return 0; // can't return any other state, nothing was sent
}
senderBuffer_t::iterator i = m_SenderBuffer.begin() + skip_initial;
// Send everything - including the packet freshly added to the buffer
for (; i != m_SenderBuffer.end(); ++i)
{
// NOTE: an exception from here will interrupt the loop
// and will be caught in the upper level.
stat = core.sendmsg2(i->data, (int)i->size, (i->mc));
if (stat == -1)
{
// Stop sending if one sending ended up with error
LOGC(gslog.Warn,
log << "sendBackupRexmit: sending from buffer stopped at %" << core.schedSeqNo() << " and FAILED");
return -1;
}
}
// Copy the contents of the last item being updated.
w_mc = m_SenderBuffer.back().mc;
HLOGC(gslog.Debug, log << "sendBackupRexmit: pre-sent collected %" << curseq << " - %" << w_mc.pktseq);
return stat;
}
// [[using locked(CUDTGroup::m_GroupLock)]];
void CUDTGroup::ackMessage(int32_t msgno)
{
// The message id could not be identified, skip.
if (msgno == SRT_MSGNO_CONTROL)
{
HLOGC(gslog.Debug, log << "ackMessage: msgno not found in ACK-ed sequence");
return;
}
// It's impossible to get the exact message position as the
// message is allowed also to span for multiple packets.
// Search since the oldest packet until you hit the first
// packet with this message number.
// First, you need to decrease the message number by 1. It's
// because the sequence number being ACK-ed can be in the middle
// of the message, while it doesn't acknowledge that the whole
// message has been received. Decrease the message number so that
// partial-message-acknowledgement does not swipe the whole message,
// part of which may need to be retransmitted over a backup link.
int offset = MsgNo(msgno) - MsgNo(m_iSndAckedMsgNo);
if (offset <= 0)
{
HLOGC(gslog.Debug, log << "ackMessage: already acked up to msgno=" << msgno);
return;
}
HLOGC(gslog.Debug, log << "ackMessage: updated to #" << msgno);
// Update last acked. Will be picked up when adding next message.
m_iSndAckedMsgNo = msgno;
}
void CUDTGroup::processKeepalive(CUDTGroup::SocketData* gli)
{
// received keepalive for that group member
// In backup group it means that the link went IDLE.
if (m_type == SRT_GTYPE_BACKUP)
{
if (gli->rcvstate == SRT_GST_RUNNING)
{
gli->rcvstate = SRT_GST_IDLE;
HLOGC(gslog.Debug, log << "GROUP: received KEEPALIVE in @" << gli->id << " - link turning rcv=IDLE");
}
// When received KEEPALIVE, the sending state should be also
// turned IDLE, if the link isn't temporarily activated. The
// temporarily activated link will not be measured stability anyway,
// while this should clear out the problem when the transmission is
// stopped and restarted after a while. This will simply set the current
// link as IDLE on the sender when the peer sends a keepalive because the
// data stopped coming in and it can't send ACKs therefore.
//
// This also shouldn't be done for the temporary activated links because
// stability timeout could be exceeded for them by a reason that, for example,
// the packets come with the past sequences (as they are being synchronized
// the sequence per being IDLE and empty buffer), so a large portion of initial
// series of packets may come with past sequence, delaying this way with ACK,
// which may result not only with exceeded stability timeout (which fortunately
// isn't being measured in this case), but also with receiveing keepalive
// (therefore we also don't reset the link to IDLE in the temporary activation period).
if (gli->sndstate == SRT_GST_RUNNING && is_zero(gli->ps->core().m_tsFreshActivation))
{
gli->sndstate = SRT_GST_IDLE;
HLOGC(gslog.Debug,
log << "GROUP: received KEEPALIVE in @" << gli->id << " active=PAST - link turning snd=IDLE");
}
}
}
void CUDTGroup::internalKeepalive(SocketData* gli)
{
// This is in response to AGENT SENDING keepalive. This means that there's
// no transmission in either direction, but the KEEPALIVE packet from the
// other party could have been missed. This is to ensure that the IDLE state
// is recognized early enough, before any sequence discrepancy can happen.
if (m_type == SRT_GTYPE_BACKUP && gli->rcvstate == SRT_GST_RUNNING)
{
gli->rcvstate = SRT_GST_IDLE;
// Prevent sending KEEPALIVE again in group-sending
gli->ps->core().m_tsFreshActivation = steady_clock::time_point();
HLOGC(gslog.Debug, log << "GROUP: EXP-requested KEEPALIVE in @" << gli->id << " - link turning IDLE");
}
}
CUDTGroup::BufferedMessageStorage CUDTGroup::BufferedMessage::storage(SRT_LIVE_MAX_PLSIZE /*, 1000*/);
// Forwarder needed due to class definition order
int32_t CUDTGroup::generateISN()
{
return CUDT::generateISN();
}
void CUDTGroup::setGroupConnected()
{
if (!m_bConnected)
{
// Switch to connected state and give appropriate signal
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_CONNECT, true);
m_bConnected = true;
}
}
void CUDTGroup::updateLatestRcv(CUDTSocket* s)
{
// Currently only Backup groups use connected idle links.
if (m_type != SRT_GTYPE_BACKUP)
return;
HLOGC(grlog.Debug,
log << "updateLatestRcv: BACKUP group, updating from active link @" << s->m_SocketID << " with %"
<< s->core().m_iRcvLastAck);
CUDT* source = &s->core();
vector<CUDT*> targets;
UniqueLock lg(m_GroupLock);
// Sanity check for a case when getting a deleted socket
if (!s->m_GroupOf)
return;
// Under a group lock, we block execution of removal of the socket
// from the group, so if m_GroupOf is not NULL, we are granted
// that m_GroupMemberData is valid.
SocketData* current = s->m_GroupMemberData;
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
// Skip the socket that has reported packet reception
if (&*gi == current)
{
HLOGC(grlog.Debug, log << "grp: NOT updating rcv-seq on self @" << gi->id);
continue;
}
// Don't update the state if the link is:
// - PENDING - because it's not in the connected state, wait for it.
// - RUNNING - because in this case it should have its own line of sequences
// - BROKEN - because it doesn't make sense anymore, about to be removed
if (gi->rcvstate != SRT_GST_IDLE)
{
HLOGC(grlog.Debug,
log << "grp: NOT updating rcv-seq on @" << gi->id
<< " - link state:" << srt_log_grp_state[gi->rcvstate]);
continue;
}
// Sanity check
if (!gi->ps->core().m_bConnected)
{
HLOGC(grlog.Debug, log << "grp: IPE: NOT updating rcv-seq on @" << gi->id << " - IDLE BUT NOT CONNECTED");
continue;
}
targets.push_back(&gi->ps->core());
}
lg.unlock();
// Do this on the unlocked group because this
// operation will need receiver lock, so it might
// risk a deadlock.
for (size_t i = 0; i < targets.size(); ++i)
{
targets[i]->updateIdleLinkFrom(source);
}
}
void CUDTGroup::activateUpdateEvent(bool still_have_items)
{
// This function actually reacts on the fact that a socket
// was deleted from the group. This might make the group empty.
if (!still_have_items) // empty, or removal of unknown socket attempted - set error on group
{
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true);
}
else
{
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_UPDATE, true);
}
}
void CUDTGroup::addEPoll(int eid)
{
enterCS(m_Global.m_EPoll.m_EPollLock);
m_sPollID.insert(eid);
leaveCS(m_Global.m_EPoll.m_EPollLock);
bool any_read = false;
bool any_write = false;
bool any_broken = false;
bool any_pending = false;
{
// Check all member sockets
ScopedLock gl(m_GroupLock);
// We only need to know if there is any socket that is
// ready to get a payload and ready to receive from.
for (gli_t i = m_Group.begin(); i != m_Group.end(); ++i)
{
if (i->sndstate == SRT_GST_IDLE || i->sndstate == SRT_GST_RUNNING)
{
any_write |= i->ps->writeReady();
}
if (i->rcvstate == SRT_GST_IDLE || i->rcvstate == SRT_GST_RUNNING)
{
any_read |= i->ps->readReady();
}
if (i->ps->broken())
any_broken |= true;
else
any_pending |= true;
}
}
// This is stupid, but we don't have any other interface to epoll
// internals. Actually we don't have to check if id() is in m_sPollID
// because we know it is, as we just added it. But it's not performance
// critical, sockets are not being often added during transmission.
if (any_read)
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, true);
if (any_write)
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, true);
// Set broken if none is non-broken (pending, read-ready or write-ready)
if (any_broken && !any_pending)
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true);
}
void CUDTGroup::removeEPollEvents(const int eid)
{
// clear IO events notifications;
// since this happens after the epoll ID has been removed, they cannot be set again
set<int> remove;
remove.insert(eid);
m_Global.m_EPoll.update_events(id(), remove, SRT_EPOLL_IN | SRT_EPOLL_OUT, false);
}
void CUDTGroup::removeEPollID(const int eid)
{
enterCS(m_Global.m_EPoll.m_EPollLock);
m_sPollID.erase(eid);
leaveCS(m_Global.m_EPoll.m_EPollLock);
}
void CUDTGroup::updateFailedLink()
{
ScopedLock lg(m_GroupLock);
// Check all members if they are in the pending
// or connected state.
int nhealthy = 0;
for (gli_t i = m_Group.begin(); i != m_Group.end(); ++i)
{
if (i->sndstate < SRT_GST_BROKEN)
nhealthy++;
}
if (!nhealthy)
{
// No healthy links, set ERR on epoll.
HLOGC(gmlog.Debug, log << "group/updateFailedLink: All sockets broken");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true);
}
else
{
HLOGC(gmlog.Debug, log << "group/updateFailedLink: Still " << nhealthy << " links in the group");
}
}
#if ENABLE_HEAVY_LOGGING
// [[using maybe_locked(CUDT::uglobal()->m_GlobControlLock)]]
void CUDTGroup::debugGroup()
{
ScopedLock gg(m_GroupLock);
HLOGC(gmlog.Debug, log << "GROUP MEMBER STATUS - $" << id());
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
HLOGC(gmlog.Debug,
log << " ... id { agent=@" << gi->id << " peer=@" << gi->ps->m_PeerID
<< " } address { agent=" << gi->agent.str() << " peer=" << gi->peer.str() << "} "
<< " state {snd=" << StateStr(gi->sndstate) << " rcv=" << StateStr(gi->rcvstate) << "}");
}
}
#endif
} // namespace srt
|