/* queue.c
*
* This file implements the queue object and its several queueing methods.
*
* File begun on 2008-01-03 by RGerhards
*
* There is some in-depth documentation available in doc/dev_queue.html
* (and in the web doc set on https://www.rsyslog.com/doc/). Be sure to read it
* if you are getting acquainted with the object.
*
* NOTE: as of 2009-04-22, I have begun to remove the qqueue* prefix from static
* function names - the prefix makes the code really hard to read and does not
* provide much benefit, at least I (now) think so...
*
* Copyright 2008-2025 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
* The rsyslog runtime library is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* The rsyslog runtime library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
*
* A copy of the GPL can be found in the file "COPYING" in this distribution.
* A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
*/
/**
* @file queue.c
* @brief This file implements the rsyslog queueing subsystem.
*
* @section queue_maintenance Important Note on Maintenance
*
* This header comment contains critical information on the system's
* architecture and design philosophy. It is essential that this comment, and
* all other relevant documentation, be **updated whenever architectural or
* other significant changes are made to the queueing subsystem.**
*
* Maintaining synchronization between code and documentation is vital for
* long-term project health, developer onboarding, and to enable automated
* tools and AI agents to accurately analyze the codebase and detect potential
* issues arising from undocumented changes. This documentation reflects the
* state of the system as of mid-2025, based on a battle-proven design that
* originated circa 2004.
*
* @section queue_architecture Architectural Overview and Design Philosophy
*
* The rsyslog queueing system is a fundamental component for providing both
* performance and reliability. It is built on a powerful abstraction: a queue
* can be placed at two key points in the message processing pipeline:
*
* 1. **Ruleset Queue (Main Message Queue):** Each ruleset has a single queue
* that buffers messages received from inputs *before* they are processed
* by the ruleset's filters. This decouples message ingestion from filter
* processing, allowing rsyslog to handle massive input bursts without
* losing messages. The queue for the default ruleset is often referred
* to by its historical name, the "main message queue".
*
* 2. **Action Queue:** Each action within a ruleset can have its own dedicated
* queue. This decouples the filter engine from the output action (e.g.,
* writing to a file or sending over the network).
*
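* Schematically (simplified; an action queue runs in Direct mode unless
* configured otherwise):
*
*   inputs --> [ruleset queue] --> filter engine --> [action queue] --> action
*              (absorbs bursts)                      (decouples slow outputs)
*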
* This system's design is a testament to operator-centric control, providing
* a sophisticated toolkit of compromises. This contrasts sharply with modern
* "WAL-only" log shippers, making rsyslog uniquely versatile.
*
*
* @subsection queue_types The Four Queue Types
*
* Rsyslog offers four queue types, each with a specific performance and
* reliability profile. They are listed here from most lightweight to most
* robust.
*
* 1. **Direct (The "No-Queue" Queue)**
* - **Behavior:** The default for all **action queues**. No buffering occurs. The
* worker thread from the parent queue (usually the ruleset's queue)
* executes the action's logic directly.
* - **Use Case:** For fast, non-blocking, local actions (e.g., `omfile`).
* - **Warning:** If a Direct-queued action blocks, it stalls the worker
* thread, potentially halting all processing for that worker.
*
* 2. **In-Memory (LinkedList and FixedArray)**
* - **Behavior:** Buffers messages in RAM. Extremely fast but offers no
* persistence across restarts.
* - **Sub-Types:**
* - `LinkedList`: The recommended default for most in-memory queues. It is
* memory-efficient, allocating space only for messages it holds.
* - `FixedArray`: A legacy option that pre-allocates a static array of
* pointers. It can be slightly faster under constant load but is
* less memory-efficient. It remains the default for ruleset queues.
* - **Use Case:** High-performance buffering where a potential loss of
* in-flight messages on crash is acceptable.
*
* 3. **Disk (The "Pure-Disk" Queue)**
* - **Behavior:** Writes every single message to a disk-based queue structure
* before acknowledging the enqueue operation. This queue provides a
* **"Limited Duplication"** guarantee, not a simple "at-least-once".
* - **The `.qi` Checkpoint File:** The queue's state (read/write pointers)
* is persisted in a `.qi` file. The `queue.checkpointInterval` parameter
* dictates how often this file is updated, allowing the user to tune
* the trade-off between I/O performance and duplication risk. A value
* of `1` provides near-exactly-once delivery, essential for "dumb"
* (non-deduplicating) receivers.
* - **Use Case:** For audit-grade logging chains where no message loss can
* be tolerated, even in the case of a power failure or ungraceful shutdown.
*
* 4. **Disk-Assisted (DA) (The Hybrid "Best-of-Both-Worlds" Queue)**
* - **Behavior:** This is the most sophisticated queue type. It acts as a
* multi-stage defense system against data loss.
* - **Stage 1: In-Memory First:** By default, it operates as a high-speed
* `LinkedList` queue with zero disk I/O.
* - **Stage 2: Disk Spooling:** If the in-memory queue exceeds its
* `highwatermark` (e.g., due to downstream backpressure), it seamlessly
* activates its internal **Disk Queue** and begins spooling messages
* to disk. This provides resilience to transient failures without the
* constant performance penalty of a pure Disk queue. The disk portion
* operates with its own "Limited Duplication" guarantee.
* - **Stage 3: Load Shedding:** If all buffers (memory and disk) are full,
* the queue hits the `queue.discardMark`. It can then begin to discard
* messages based on severity (`queue.discardSeverity`), preserving
* critical logs during a total system overload.
* - **Use Case:** The recommended choice for any potentially unreliable or
* slow action, or for a ruleset queue that needs to survive downstream
* outages. A sketch of the staged behavior follows this list.
*
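* As a rough illustration, the staged DA behavior can be sketched like this.
* This is a simplified pseudo-implementation, not the actual code path (the
* real logic lives in doEnqSingleObj() and the DA worker functions below);
* the helpers marked "hypothetical" do not exist under these names:
*
* @code{.c}
* static rsRetVal daEnqueueSketch(qqueue_t *q, smsg_t *pMsg) {
*     // stage 3: load shedding once the discard mark is hit - drop
*     // messages of low importance (numerically high severity)
*     if (getLogicalQueueSize(q) >= q->iDiscardMrk
*         && getMsgSeverity(pMsg) >= q->iDiscardSeverity) { // (hypothetical helper)
*         return RS_RET_QUEUE_FULL;
*     }
*     // stage 2: above the high watermark, spool to the internal disk queue
*     if (q->bIsDA && getLogicalQueueSize(q) >= q->iHighWtrMrk) {
*         return enqueueToDiskQueue(q->pqDA, pMsg); // (hypothetical helper)
*     }
*     // stage 1: default fast path, pure in-memory enqueue with zero disk I/O
*     return q->qAdd(q, pMsg);
* }
* @endcode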
*
* @subsection comparison_to_wal Rsyslog's "Bounded Queue" vs. a WAL's "Unbounded Stream"
*
* It is critical to understand that rsyslog's disk-based queues implement a
* **Bounded FIFO Queue**, which is architecturally different from the
* **Unbounded Stream** model of a Write-Ahead Log (WAL) found in tools like
* Fluent Bit or Vector.
*
* - **Rsyslog's Model:** The `.qi` file checkpoints the queue's *structure*,
* containing two primary tuples: `write_ptr = (segment, offset)` and
* `read_ptr = (segment, offset)`. This defines the queue's boundaries.
* Consumption is a destructive action that advances the `read_ptr`. On a
* graceful restart (e.g., K8s `SIGTERM`), DA queues flush memory to disk,
* ensuring **zero data loss**. On a crash, only the `checkpointInterval`-worth
* of messages are at risk of replay. This fine-grained control makes it safe
* for both smart and dumb receivers. **Note:** A known operational risk is
* that the current implementation does not gracefully handle a missing or
* corrupt `.qi` file in conjunction with pre-existing queue segment files.
* This can lead to startup failures or inconsistent state and is a top
* priority for future reliability enhancements.
*
*
* - **WAL Model:** A WAL is a simple, append-only log. The checkpoint is just
* a consumer's *offset*. On restart, a WAL-based shipper replays *all data*
* from the last offset, which can be massive. This model mandates a smart,
* idempotent receiver and is fundamentally unsafe for dumb endpoints.
*
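* To make the structural difference concrete, here is a minimal sketch of the
* two checkpoint models (hypothetical struct layouts for illustration only;
* the real .qi state is serialized via the object property-bag mechanism,
* see qqueueTryLoadPersistedInfo()):
*
* @code{.c}
* #include <stdint.h>
*
* // rsyslog-style bounded queue: BOTH queue ends are checkpointed, so
* // consumption is durable and crash replay is bounded by
* // queue.checkpointInterval
* struct qi_checkpoint {
*     struct { int segment; int64_t offset; } write_ptr; // enqueue position
*     struct { int segment; int64_t offset; } read_ptr;  // consume position
* };
*
* // WAL-style stream: only a consumer offset is checkpointed; on restart
* // everything past it is replayed, however large the backlog
* struct wal_checkpoint {
*     int64_t consumer_offset;
* };
* @endcode
*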
* @subsection naming_convention Historical Naming: queue vs. qqueue
*
* Throughout the code, you will see types and variables prefixed with `qqueue`
* (e.g., `qqueue_t`). This is the result of a historical name change.
* Originally, these were named `queue`, but this caused symbol clashes on some
* platforms (e.g., AIX) where `queue` is a reserved name in system libraries.
* The name was changed to `qqueue` ("queue object for the queueing subsystem")
* to ensure portability.
*
*
* @section conclusion Summary for Developers
*
* When working with this code, remember that you are not dealing with a simple
* log appender. You are maintaining a transactional, persistent FIFO queue.
* The logic surrounding the `.qi` file, segment files, and the read/write
* pointers is designed to provide robust, tunable delivery guarantees that are
* a core feature of rsyslog. This makes it more versatile than pure WAL-based
* log shippers.
*
*/
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <signal.h>
#include <pthread.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h> /* required for HP UX */
#include <time.h>
#include <errno.h>
#include <inttypes.h>
#include "rsyslog.h"
#include "queue.h"
#include "stringbuf.h"
#include "srUtils.h"
#include "obj.h"
#include "wtp.h"
#include "wti.h"
#include "msg.h"
#include "obj.h"
#include "atomic.h"
#include "errmsg.h"
#include "datetime.h"
#include "unicode-helper.h"
#include "statsobj.h"
#include "parserif.h"
#include "rsconf.h"
#ifdef OS_SOLARIS
#include <sched.h>
#endif
/* static data */
DEFobjStaticHelpers;
DEFobjCurrIf(glbl) DEFobjCurrIf(strm) DEFobjCurrIf(datetime) DEFobjCurrIf(statsobj)
#if __GNUC__ >= 8
#pragma GCC diagnostic ignored "-Wcast-function-type" // TODO: investigate further!
#endif /* if __GNUC__ >= 8 */
#ifdef ENABLE_IMDIAG
unsigned int iOverallQueueSize = 0;
#endif
#define OVERSIZE_QUEUE_WATERMARK 500000 /* when is a queue considered to be "overly large"? */
/* forward-definitions */
static rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, smsg_t *pMsg);
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
static rsRetVal RateLimiter(qqueue_t *pThis);
static rsRetVal qqueueChkStopWrkrDA(qqueue_t *pThis);
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
static rsRetVal qAddDirect(qqueue_t *pThis, smsg_t *pMsg);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) * pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) * pThis);
static rsRetVal qDestructDisk(qqueue_t *pThis);
rsRetVal qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
#define QUEUE_NO_CHECKPOINT 0
/* tables for interfacing with the v6 config system */
static struct cnfparamdescr cnfpdescr[] = {{"queue.filename", eCmdHdlrGetWord, 0},
{"queue.spooldirectory", eCmdHdlrGetWord, 0},
{"queue.size", eCmdHdlrSize, 0},
{"queue.dequeuebatchsize", eCmdHdlrInt, 0},
{"queue.mindequeuebatchsize", eCmdHdlrInt, 0},
{"queue.mindequeuebatchsize.timeout", eCmdHdlrInt, 0},
{"queue.maxdiskspace", eCmdHdlrSize, 0},
{"queue.highwatermark", eCmdHdlrInt, 0},
{"queue.lowwatermark", eCmdHdlrInt, 0},
{"queue.fulldelaymark", eCmdHdlrInt, 0},
{"queue.lightdelaymark", eCmdHdlrInt, 0},
{"queue.discardmark", eCmdHdlrInt, 0},
{"queue.discardseverity", eCmdHdlrFacility, 0},
{"queue.checkpointinterval", eCmdHdlrInt, 0},
{"queue.syncqueuefiles", eCmdHdlrBinary, 0},
{"queue.type", eCmdHdlrQueueType, 0},
{"queue.workerthreads", eCmdHdlrInt, 0},
{"queue.timeoutshutdown", eCmdHdlrInt, 0},
{"queue.timeoutactioncompletion", eCmdHdlrInt, 0},
{"queue.timeoutenqueue", eCmdHdlrInt, 0},
{"queue.timeoutworkerthreadshutdown", eCmdHdlrInt, 0},
{"queue.workerthreadminimummessages", eCmdHdlrInt, 0},
{"queue.maxfilesize", eCmdHdlrSize, 0},
{"queue.saveonshutdown", eCmdHdlrBinary, 0},
{"queue.dequeueslowdown", eCmdHdlrInt, 0},
{"queue.dequeuetimebegin", eCmdHdlrInt, 0},
{"queue.dequeuetimeend", eCmdHdlrInt, 0},
{"queue.cry.provider", eCmdHdlrGetWord, 0},
{"queue.samplinginterval", eCmdHdlrInt, 0},
{"queue.takeflowctlfrommsg", eCmdHdlrBinary, 0}};
static struct cnfparamblk pblk = {CNFPARAMBLK_VERSION, sizeof(cnfpdescr) / sizeof(struct cnfparamdescr), cnfpdescr};
/* support to detect duplicate queue file names */
struct queue_filename {
struct queue_filename *next;
const char *dirname;
const char *filename;
};
struct queue_filename *queue_filename_root = NULL;
/* debug aid */
#if 0
static inline void displayBatchState(batch_t *pBatch)
{
int i;
for(i = 0 ; i < pBatch->nElem ; ++i) {
DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->eltState[i]);
}
}
#endif
static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint);
/* do cleanup once config loading is done */
void qqueueDoneLoadCnf(void) {
struct queue_filename *next, *del;
next = queue_filename_root;
while (next != NULL) {
del = next;
next = next->next;
free((void *)del->filename);
free((void *)del->dirname);
free((void *)del);
}
queue_filename_root = NULL; /* reset list head so a later config load starts clean */
}
/***********************************************************************
* we need a private data structure, the "to-delete" list. As C does
* not provide any partly private data structures, we implement this
* structure right here inside the module.
* Note that this list must always be kept sorted based on a unique
* dequeue ID (which is monotonically increasing).
* rgerhards, 2009-05-18
***********************************************************************/
/* generate next unique dequeue ID. Note that uniqueness is only required
* on a per-queue basis and while this instance runs. So a strictly monotonically
* increasing counter is sufficient (if enough bits are used).
*/
static inline qDeqID getNextDeqID(qqueue_t *pQueue) {
ISOBJ_TYPE_assert(pQueue, qqueue);
return pQueue->deqIDAdd++;
}
/* return the top element of the to-delete list or NULL, if the
* list is empty.
*/
static toDeleteLst_t *tdlPeek(qqueue_t *pQueue) {
ISOBJ_TYPE_assert(pQueue, qqueue);
return pQueue->toDeleteLst;
}
/* remove the top element of the to-delete list. Nothing but the
* element itself is destroyed. Must not be called when the list
* is empty.
*/
static rsRetVal tdlPop(qqueue_t *pQueue) {
toDeleteLst_t *pRemove;
DEFiRet;
ISOBJ_TYPE_assert(pQueue, qqueue);
assert(pQueue->toDeleteLst != NULL);
pRemove = pQueue->toDeleteLst;
pQueue->toDeleteLst = pQueue->toDeleteLst->pNext;
free(pRemove);
RETiRet;
}
/* Add a new to-delete list entry. The function allocates the data
* structure, populates it with the values provided and links the new
* element into the correct place inside the list.
*/
static rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElemDeq) {
toDeleteLst_t *pNew;
toDeleteLst_t *pPrev;
DEFiRet;
ISOBJ_TYPE_assert(pQueue, qqueue);
assert(pQueue->toDeleteLst != NULL);
CHKmalloc(pNew = malloc(sizeof(toDeleteLst_t)));
pNew->deqID = deqID;
pNew->nElemDeq = nElemDeq;
/* now find right spot */
for (pPrev = pQueue->toDeleteLst; pPrev != NULL && deqID > pPrev->deqID; pPrev = pPrev->pNext) {
/*JUST SEARCH*/;
}
if (pPrev == NULL) {
pNew->pNext = pQueue->toDeleteLst;
pQueue->toDeleteLst = pNew;
} else {
pNew->pNext = pPrev->pNext;
pPrev->pNext = pNew;
}
finalize_it:
RETiRet;
}
/* methods */
static const char *getQueueTypeName(queueType_t t) {
const char *r;
switch (t) {
case QUEUETYPE_FIXED_ARRAY:
r = "FixedArray";
break;
case QUEUETYPE_LINKEDLIST:
r = "LinkedList";
break;
case QUEUETYPE_DISK:
r = "Disk";
break;
case QUEUETYPE_DIRECT:
r = "Direct";
break;
default:
r = "invalid/unknown queue mode";
break;
}
return r;
}
void qqueueDbgPrint(qqueue_t *pThis) {
dbgoprint((obj_t *)pThis, "parameter dump:\n");
dbgoprint((obj_t *)pThis, "queue.filename '%s'\n",
(pThis->pszFilePrefix == NULL) ? "[NONE]" : (char *)pThis->pszFilePrefix);
dbgoprint((obj_t *)pThis, "queue.size: %d\n", pThis->iMaxQueueSize);
dbgoprint((obj_t *)pThis, "queue.dequeuebatchsize: %d\n", pThis->iDeqBatchSize);
dbgoprint((obj_t *)pThis, "queue.mindequeuebatchsize: %d\n", pThis->iMinDeqBatchSize);
dbgoprint((obj_t *)pThis, "queue.mindequeuebatchsize.timeout: %d\n", pThis->toMinDeqBatchSize);
dbgoprint((obj_t *)pThis, "queue.maxdiskspace: %lld\n", pThis->sizeOnDiskMax);
dbgoprint((obj_t *)pThis, "queue.highwatermark: %d\n", pThis->iHighWtrMrk);
dbgoprint((obj_t *)pThis, "queue.lowwatermark: %d\n", pThis->iLowWtrMrk);
dbgoprint((obj_t *)pThis, "queue.fulldelaymark: %d\n", pThis->iFullDlyMrk);
dbgoprint((obj_t *)pThis, "queue.lightdelaymark: %d\n", pThis->iLightDlyMrk);
dbgoprint((obj_t *)pThis, "queue.takeflowctlfrommsg: %d\n", pThis->takeFlowCtlFromMsg);
dbgoprint((obj_t *)pThis, "queue.discardmark: %d\n", pThis->iDiscardMrk);
dbgoprint((obj_t *)pThis, "queue.discardseverity: %d\n", pThis->iDiscardSeverity);
dbgoprint((obj_t *)pThis, "queue.checkpointinterval: %d\n", pThis->iPersistUpdCnt);
dbgoprint((obj_t *)pThis, "queue.syncqueuefiles: %d\n", pThis->bSyncQueueFiles);
dbgoprint((obj_t *)pThis, "queue.type: %d [%s]\n", pThis->qType, getQueueTypeName(pThis->qType));
dbgoprint((obj_t *)pThis, "queue.workerthreads: %d\n", pThis->iNumWorkerThreads);
dbgoprint((obj_t *)pThis, "queue.timeoutshutdown: %d\n", pThis->toQShutdown);
dbgoprint((obj_t *)pThis, "queue.timeoutactioncompletion: %d\n", pThis->toActShutdown);
dbgoprint((obj_t *)pThis, "queue.timeoutenqueue: %d\n", pThis->toEnq);
dbgoprint((obj_t *)pThis, "queue.timeoutworkerthreadshutdown: %d\n", pThis->toWrkShutdown);
dbgoprint((obj_t *)pThis, "queue.workerthreadminimummessages: %d\n", pThis->iMinMsgsPerWrkr);
dbgoprint((obj_t *)pThis, "queue.maxfilesize: %lld\n", pThis->iMaxFileSize);
dbgoprint((obj_t *)pThis, "queue.saveonshutdown: %d\n", pThis->bSaveOnShutdown);
dbgoprint((obj_t *)pThis, "queue.dequeueslowdown: %d\n", pThis->iDeqSlowdown);
dbgoprint((obj_t *)pThis, "queue.dequeuetimebegin: %d\n", pThis->iDeqtWinFromHr);
dbgoprint((obj_t *)pThis, "queue.dequeuetimeend: %d\n", pThis->iDeqtWinToHr);
}
/* get the physical queue size. Must only be called
* while mutex is locked!
* rgerhards, 2008-01-29
*/
static int getPhysicalQueueSize(qqueue_t *pThis) {
return (int)PREFER_FETCH_32BIT(pThis->iQueueSize);
}
/* get the logical queue size (that is store size minus logically dequeued elements).
* Must only be called while mutex is locked!
* rgerhards, 2009-05-19
*/
static int getLogicalQueueSize(qqueue_t *pThis) {
return pThis->iQueueSize - pThis->nLogDeq;
}
/* This function drains the queue in cases where this needs to be done. The most probable
* reason is a HUP which needs to discard data (because the queue is configured to be lossy).
* During a shutdown, this is typically not needed, as the OS frees up resources and does
* this much quicker than when we clean up ourselves. -- rgerhards, 2008-10-21
* This function returns void, as it makes no sense to communicate an error back, even if
* it happens.
* This function works "around" the regular dequeue mechanism, because it is only used to
* clean up (in cases where message loss is acceptable).
*/
static void queueDrain(qqueue_t *pThis) {
smsg_t *pMsg;
assert(pThis != NULL);
DBGOPRINT((obj_t *)pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType,
pThis->iQueueSize);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while (ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) {
pThis->qDeq(pThis, &pMsg);
if (pMsg != NULL) {
msgDestruct(&pMsg);
}
pThis->qDel(pThis);
}
}
/* --------------- code for disk-assisted (DA) queue modes -------------------- */
/* returns the number of workers that should be advised at
* this point in time. The mutex must be locked when
* this function is called. -- rgerhards, 2008-01-25
*/
static rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) {
DEFiRet;
int iMaxWorkers;
ISOBJ_TYPE_assert(pThis, qqueue);
if (!pThis->bEnqOnly) {
if (pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
DBGOPRINT((obj_t *)pThis, "(re)activating DA worker\n");
wtpAdviseMaxWorkers(pThis->pWtpDA, 1, DENY_WORKER_START_DURING_SHUTDOWN);
/* disk queues have always one worker */
}
if (getLogicalQueueSize(pThis) == 0) {
iMaxWorkers = 0;
} else if (pThis->iMinMsgsPerWrkr == 0) {
iMaxWorkers = 1;
} else {
iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
}
wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers, DENY_WORKER_START_DURING_SHUTDOWN);
}
RETiRet;
}
/* check if we run in disk-assisted mode and record that
* setting for easy (and quick!) access in the future. This
* function must only be called from constructors and only
* from those that support disk-assisted modes (aka memory-
* based queue drivers).
* rgerhards, 2008-01-14
*/
static rsRetVal qqueueChkIsDA(qqueue_t *pThis) {
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
if (pThis->pszFilePrefix != NULL) {
pThis->bIsDA = 1;
DBGOPRINT((obj_t *)pThis, "is disk-assisted, disk will be used on demand\n");
} else {
DBGOPRINT((obj_t *)pThis, "is NOT disk-assisted\n");
}
RETiRet;
}
/* Start disk-assisted queue mode.
* rgerhards, 2008-01-15
*/
static rsRetVal StartDA(qqueue_t *pThis) {
DEFiRet;
uchar pszDAQName[128];
ISOBJ_TYPE_assert(pThis, qqueue);
/* create message queue */
CHKiRet(qqueueConstruct(&pThis->pqDA, QUEUETYPE_DISK, pThis->iNumWorkerThreads, 0, pThis->pConsumer));
/* give it a name */
snprintf((char *)pszDAQName, sizeof(pszDAQName), "%s[DA]", obj.GetName((obj_t *)pThis));
obj.SetName((obj_t *)pThis->pqDA, pszDAQName);
/* as the created queue is the same object class, we take the
* liberty to access its properties directly.
*/
pThis->pqDA->pqParent = pThis;
CHKiRet(qqueueSetpAction(pThis->pqDA, pThis->pAction));
CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax));
CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
CHKiRet(qqueueSetSpoolDir(pThis->pqDA, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles));
CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq));
CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
CHKiRet(qqueueSettoQShutdown(pThis->pqDA, pThis->toQShutdown));
CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0));
pThis->pqDA->iDeqBatchSize = pThis->iDeqBatchSize;
pThis->pqDA->iMinDeqBatchSize = pThis->iMinDeqBatchSize;
pThis->pqDA->iMinMsgsPerWrkr = pThis->iMinMsgsPerWrkr;
pThis->pqDA->iLowWtrMrk = pThis->iLowWtrMrk;
if (pThis->useCryprov) {
/* hand over cryprov to DA queue - in-mem queue does no longer need it
* and DA queue will be kept active from now on until termination.
*/
pThis->pqDA->useCryprov = pThis->useCryprov;
pThis->pqDA->cryprov = pThis->cryprov;
pThis->pqDA->cryprovData = pThis->cryprovData;
pThis->pqDA->cryprovName = pThis->cryprovName;
pThis->pqDA->cryprovNameFull = pThis->cryprovNameFull;
/* reset memory queue parameters */
pThis->useCryprov = 0;
/* pThis->cryprov cannot and need not be reset, is structure */
pThis->cryprovData = NULL;
pThis->cryprovName = NULL;
pThis->cryprovNameFull = NULL;
}
iRet = qqueueStart(runConf, pThis->pqDA);
/* file not found is expected; it means no previous QIF is available */
if (iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) {
errno = 0; /* else an errno is shown in errmsg! */
LogError(errno, iRet, "error starting up disk queue, using pure in-memory mode");
pThis->bIsDA = 0; /* disable DA mode */
FINALIZE; /* something is wrong */
}
DBGOPRINT((obj_t *)pThis, "DA queue initialized, disk queue 0x%lx\n", qqueueGetID(pThis->pqDA));
finalize_it:
if (iRet != RS_RET_OK) {
if (pThis->pqDA != NULL) {
qqueueDestruct(&pThis->pqDA);
}
LogError(0, iRet, "%s: error creating disk queue - giving up.", obj.GetName((obj_t *)pThis));
pThis->bIsDA = 0;
}
RETiRet;
}
/* initiate DA mode
* param bEnqOnly tells if the disk queue is to be run in enqueue-only mode. This may
* be needed during shutdown of memory queues which need to be persisted to disk.
* If this function fails (should not happen), DA mode is not turned on.
* rgerhards, 2008-01-16
*/
static rsRetVal ATTR_NONNULL() InitDA(qqueue_t *const pThis, const int bLockMutex) {
DEFiRet;
uchar pszBuf[64];
size_t lenBuf;
ISOBJ_TYPE_assert(pThis, qqueue);
if (bLockMutex == LOCK_MUTEX) {
d_pthread_mutex_lock(pThis->mut);
}
/* check if we already have a DA worker pool. If not, initiate one. Please note that the
* pool is created on first need but never again destructed (until the queue is). This
* is intentional. We assume that when we need it once, we may also need it on another
* occasion. Resources used are quite minimal when no worker is running.
* rgerhards, 2008-01-24
* NOTE: this is the DA worker *pool*, not the DA queue!
*/
lenBuf = snprintf((char *)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t *)pThis));
CHKiRet(wtpConstruct(&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr(pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr(pThis->pWtpDA, (rsRetVal(*)(void *pUsr, int))qqueueChkStopWrkrDA));
CHKiRet(wtpSetpfGetDeqBatchSize(pThis->pWtpDA, (rsRetVal(*)(void *pUsr, int *))GetDeqBatchSize));
CHKiRet(wtpSetpfDoWork(pThis->pWtpDA, (rsRetVal(*)(void *pUsr, void *pWti))ConsumerDA));
CHKiRet(wtpSetpfObjProcessed(pThis->pWtpDA, (rsRetVal(*)(void *pUsr, wti_t *pWti))batchProcessed));
CHKiRet(wtpSetpmutUsr(pThis->pWtpDA, pThis->mut));
CHKiRet(wtpSetiNumWorkerThreads(pThis->pWtpDA, 1));
CHKiRet(wtpSettoWrkShutdown(pThis->pWtpDA, pThis->toWrkShutdown));
CHKiRet(wtpSetpUsr(pThis->pWtpDA, pThis));
CHKiRet(wtpConstructFinalize(pThis->pWtpDA));
/* if we reach this point, we have a "good" DA worker pool */
/* now construct the actual queue (if it does not already exist) */
if (pThis->pqDA == NULL) {
CHKiRet(StartDA(pThis));
}
finalize_it:
if (bLockMutex == LOCK_MUTEX) {
d_pthread_mutex_unlock(pThis->mut);
}
RETiRet;
}
/* --------------- end code for disk-assisted queue modes -------------------- */
/* Now, we define type-specific handlers. They provide generic functionality,
* but for this specific type of queue. The mapping to these handlers happens during
* queue construction. Later on, handlers are called by pointers present in the
* queue instance object.
*/
/* -------------------- fixed array -------------------- */
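/* The FixedArray type implements a ring buffer over a pre-allocated array of
* message pointers. Three indices are maintained: tail (next slot to enqueue
* into), deqhead (next element to dequeue) and head (next element to
* physically delete). Dequeue and deletion are separate steps: an element is
* deleted only after its batch has been confirmed as fully processed.
*/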
static rsRetVal qConstructFixedArray(qqueue_t *pThis) {
DEFiRet;
assert(pThis != NULL);
if (pThis->iMaxQueueSize == 0) ABORT_FINALIZE(RS_RET_QSIZE_ZERO);
if ((pThis->tVars.farray.pBuf = malloc(sizeof(void *) * pThis->iMaxQueueSize)) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
pThis->tVars.farray.deqhead = 0;
pThis->tVars.farray.head = 0;
pThis->tVars.farray.tail = 0;
qqueueChkIsDA(pThis);
finalize_it:
RETiRet;
}
static rsRetVal qDestructFixedArray(qqueue_t *pThis) {
DEFiRet;
assert(pThis != NULL);
queueDrain(pThis); /* discard any remaining queue entries */
free(pThis->tVars.farray.pBuf);
RETiRet;
}
static rsRetVal qAddFixedArray(qqueue_t *pThis, smsg_t *in) {
DEFiRet;
assert(pThis != NULL);
pThis->tVars.farray.pBuf[pThis->tVars.farray.tail] = in;
pThis->tVars.farray.tail++;
if (pThis->tVars.farray.tail == pThis->iMaxQueueSize) pThis->tVars.farray.tail = 0;
RETiRet;
}
static rsRetVal qDeqFixedArray(qqueue_t *pThis, smsg_t **out) {
DEFiRet;
assert(pThis != NULL);
*out = (void *)pThis->tVars.farray.pBuf[pThis->tVars.farray.deqhead];
pThis->tVars.farray.deqhead++;
if (pThis->tVars.farray.deqhead == pThis->iMaxQueueSize) pThis->tVars.farray.deqhead = 0;
RETiRet;
}
static rsRetVal qDelFixedArray(qqueue_t *pThis) {
DEFiRet;
assert(pThis != NULL);
pThis->tVars.farray.head++;
if (pThis->tVars.farray.head == pThis->iMaxQueueSize) pThis->tVars.farray.head = 0;
RETiRet;
}
/* -------------------- linked list -------------------- */
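/* The LinkedList type keeps all elements in one singly-linked list with two
* read pointers: pDeqRoot (next element to dequeue) and pDelRoot (oldest
* element, whose node is freed on deletion once its batch is confirmed as
* processed). pLast caches the tail so enqueue is O(1).
*/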
static rsRetVal qConstructLinkedList(qqueue_t *pThis) {
DEFiRet;
assert(pThis != NULL);
pThis->tVars.linklist.pDeqRoot = NULL;
pThis->tVars.linklist.pDelRoot = NULL;
pThis->tVars.linklist.pLast = NULL;
qqueueChkIsDA(pThis);
RETiRet;
}
static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) * pThis) {
DEFiRet;
queueDrain(pThis); /* discard any remaining queue entries */
/* with the linked list type, there is nothing left to do here. The
* reason is that there are no dynamic elements for the list itself.
*/
RETiRet;
}
static rsRetVal qAddLinkedList(qqueue_t *pThis, smsg_t *pMsg) {
qLinkedList_t *pEntry;
DEFiRet;
CHKmalloc((pEntry = (qLinkedList_t *)malloc(sizeof(qLinkedList_t))));
pEntry->pNext = NULL;
pEntry->pMsg = pMsg;
if (pThis->tVars.linklist.pDelRoot == NULL) {
pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry;
} else {
pThis->tVars.linklist.pLast->pNext = pEntry;
pThis->tVars.linklist.pLast = pEntry;
}
if (pThis->tVars.linklist.pDeqRoot == NULL) {
pThis->tVars.linklist.pDeqRoot = pEntry;
}
finalize_it:
RETiRet;
}
static rsRetVal qDeqLinkedList(qqueue_t *pThis, smsg_t **ppMsg) {
qLinkedList_t *pEntry;
DEFiRet;
pEntry = pThis->tVars.linklist.pDeqRoot;
if (pEntry != NULL) {
*ppMsg = pEntry->pMsg;
pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
} else {
/* queue is empty - return NULL to signal this to the caller */
dbgprintf("qDeqLinkedList: pDeqRoot is NULL!\n");
*ppMsg = NULL;
pThis->tVars.linklist.pDeqRoot = NULL;
}
RETiRet;
}
static rsRetVal qDelLinkedList(qqueue_t *pThis) {
qLinkedList_t *pEntry;
DEFiRet;
pEntry = pThis->tVars.linklist.pDelRoot;
if (pThis->tVars.linklist.pDelRoot == pThis->tVars.linklist.pLast) {
pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = NULL;
} else {
pThis->tVars.linklist.pDelRoot = pEntry->pNext;
}
free(pEntry);
RETiRet;
}
/* -------------------- disk -------------------- */
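/* The Disk type operates three stream objects over the same set of queue
* files: pWrite (enqueue side), pReadDeq (dequeue side, which must never
* delete files) and pReadDel (deletion side, which trails the dequeue
* pointer and unlinks segment files once they are fully consumed).
*/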
/* The following function is used to "save" ourself from being killed by
* a fatally failed disk queue. A fatal failure is, for example, if no
* data can be read or written. In that case, the disk support is disabled,
* with all on-disk structures kept as-is as much as possible. However,
* we do not really stop or destruct the in-memory disk queue object.
* Practice has shown that this may cause races during destruction which
* themselves can lead to segfaults. So we prefer to waste some resources by
* keeping the queue active.
* Instead, the queue is switched to direct mode, so that at least
* some processing can happen. Of course, this may still have lots of
* undesired side-effects, but is probably better than aborting the
* syslogd. Note that this function *must* succeed in one way or another, as
* we can not recover from failure here. But it may emit different return
* states, which can trigger different processing in the higher layers.
* rgerhards, 2011-05-03
*/
static rsRetVal queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError) {
pThis->iQueueSize = 0;
pThis->nLogDeq = 0;
pThis->qType = QUEUETYPE_DIRECT;
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
/* these entry points shall not be used in direct mode
* To catch program errors, make us abort if that happens!
* rgerhards, 2013-11-05
*/
pThis->qAdd = qAddDirect;
pThis->MultiEnq = qqueueMultiEnqObjDirect;
pThis->qDel = NULL;
if (pThis->pqParent != NULL) {
DBGOPRINT((obj_t *)pThis, "DA queue is in emergency mode, disabling DA in parent\n");
pThis->pqParent->bIsDA = 0;
pThis->pqParent->pqDA = NULL;
/* This may have undesired side effects, not sure if I really evaluated
* all. So you know where to look at if you come to this point during
* troubleshooting ;) -- rgerhards, 2011-05-03
*/
}
LogError(0, initiatingError,
"fatal error on disk queue '%s', "
"emergency switch to direct mode",
obj.GetName((obj_t *)pThis));
return RS_RET_ERR_QUEUE_EMERGENCY;
}
static rsRetVal qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) * pThis) {
DEFiRet;
ISOBJ_TYPE_assert(pStrm, strm);
ISOBJ_TYPE_assert(pThis, qqueue);
CHKiRet(strm.SetDir(pStrm, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(strm.SetbSync(pStrm, pThis->bSyncQueueFiles));
finalize_it:
RETiRet;
}
/* The method loads the persistent queue information.
* rgerhards, 2008-01-11
*/
static rsRetVal qqueueTryLoadPersistedInfo(qqueue_t *pThis) {
DEFiRet;
strm_t *psQIF = NULL;
struct stat stat_buf;
ISOBJ_TYPE_assert(pThis, qqueue);
/* check if the file exists */
if (stat((char *)pThis->pszQIFNam, &stat_buf) == -1) {
if (errno == ENOENT) {
DBGOPRINT((obj_t *)pThis, "clean startup, no .qi file found\n");
ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
} else {
LogError(errno, RS_RET_IO_ERROR, "queue: %s: error %d could not access .qi file",
obj.GetName((obj_t *)pThis), errno);
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
}
/* If we reach this point, we have a .qi file */
CHKiRet(strm.Construct(&psQIF));
CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_READ));
CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE));
CHKiRet(strm.SetFName(psQIF, pThis->pszQIFNam, pThis->lenQIFNam));
CHKiRet(strm.ConstructFinalize(psQIF));
/* first, we try to read the property bag for ourselves */
CHKiRet(obj.DeserializePropBag((obj_t *)pThis, psQIF));
/* then the stream objects (same order as when persisted!) */
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar *)"strm", psQIF,
(rsRetVal(*)(obj_t *, void *))qqueueLoadPersStrmInfoFixup, pThis));
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar *)"strm", psQIF,
(rsRetVal(*)(obj_t *, void *))qqueueLoadPersStrmInfoFixup, pThis));
/* create a duplicate for the read "pointer". */
CHKiRet(strm.Dup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq));
/* if we use a crypto provider, we need to amend the objects with its info */
if (pThis->useCryprov) {
CHKiRet(strm.Setcryprov(pThis->tVars.disk.pWrite, &pThis->cryprov));
CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pWrite, pThis->cryprovData));
CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDeq, &pThis->cryprov));
CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDeq, pThis->cryprovData));
CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDel, &pThis->cryprov));
CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDel, pThis->cryprovData));
}
CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pWrite));
CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pReadDel));
CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pReadDeq));
/* OK, we could successfully read the file, so we now can request that it be
* deleted when we are done with the persisted information.
*/
pThis->bNeedDelQIF = 1;
LogMsg(0, RS_RET_OK, LOG_INFO,
"%s: queue files exist on disk, re-starting with "
"%d messages. This will keep the disk queue file open, details: "
"https://rainer.gerhards.net/2013/07/rsyslog-why-disk-assisted-queues-keep-a-file-open.html",
objGetName((obj_t *)pThis), getLogicalQueueSize(pThis));
finalize_it:
if (psQIF != NULL) strm.Destruct(&psQIF);
if (iRet != RS_RET_OK) {
DBGOPRINT((obj_t *)pThis, "state %d reading .qi file - can not read persisted info (if any)\n", iRet);
}
RETiRet;
}
/* disk queue constructor.
* Note that we use a file limit of 10,000,000 files. That number should never pose a
* problem. If so, I guess the user has a design issue... But of course, the code can
* always be changed (though it would probably be more appropriate to increase the
* allowed file size at this point - that should be a config setting...).
* rgerhards, 2008-01-10
*/
static rsRetVal qConstructDisk(qqueue_t *pThis) {
DEFiRet;
int bRestarted = 0;
assert(pThis != NULL);
/* and now check if there is some persistent information that needs to be read in */
iRet = qqueueTryLoadPersistedInfo(pThis);
if (iRet == RS_RET_OK)
bRestarted = 1;
else if (iRet != RS_RET_FILE_NOT_FOUND)
FINALIZE;
if (!bRestarted) { /* fresh start: construct the stream objects */
CHKiRet(strm.Construct(&pThis->tVars.disk.pWrite));
CHKiRet(strm.SetbSync(pThis->tVars.disk.pWrite, pThis->bSyncQueueFiles));
CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
CHKiRet(strm.SetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
if (pThis->useCryprov) {
CHKiRet(strm.Setcryprov(pThis->tVars.disk.pWrite, &pThis->cryprov));
CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pWrite, pThis->cryprovData));
}
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pWrite));
CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDeq));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0));
CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDeq, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ));
CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR));
if (pThis->useCryprov) {
CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDeq, &pThis->cryprov));
CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDeq, pThis->cryprovData));
}
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq));
CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDel));
CHKiRet(strm.SetbSync(pThis->tVars.disk.pReadDel, pThis->bSyncQueueFiles));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1));
CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDel, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ));
CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR));
if (pThis->useCryprov) {
CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDel, &pThis->cryprov));
CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDel, pThis->cryprovData));
}
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDel));
CHKiRet(strm.SetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix));
CHKiRet(strm.SetFName(pThis->tVars.disk.pReadDeq, pThis->pszFilePrefix, pThis->lenFilePrefix));
CHKiRet(strm.SetFName(pThis->tVars.disk.pReadDel, pThis->pszFilePrefix, pThis->lenFilePrefix));
}
/* now we set (and overwrite in case of a persisted restart) some parameters which
* should always reflect the current configuration variables. Be careful by doing so,
* for example file name generation must not be changed as that would break the
* ability to read existing queue files. -- rgerhards, 2008-01-12
*/
CHKiRet(strm.SetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize));
CHKiRet(strm.SetiMaxFileSize(pThis->tVars.disk.pReadDeq, pThis->iMaxFileSize));
CHKiRet(strm.SetiMaxFileSize(pThis->tVars.disk.pReadDel, pThis->iMaxFileSize));
finalize_it:
RETiRet;
}
static rsRetVal qDestructDisk(qqueue_t *pThis) {
DEFiRet;
assert(pThis != NULL);
free(pThis->pszQIFNam);
if (pThis->tVars.disk.pWrite != NULL) {
int64 currOffs;
strm.GetCurrOffset(pThis->tVars.disk.pWrite, &currOffs);
if (currOffs == 0) {
/* if no data is present, we can (and must!) delete this
* file. Otherwise we would leave garbage behind after termination.
*/
strm.SetbDeleteOnClose(pThis->tVars.disk.pWrite, 1);
}
strm.Destruct(&pThis->tVars.disk.pWrite);
}
if (pThis->tVars.disk.pReadDeq != NULL) strm.Destruct(&pThis->tVars.disk.pReadDeq);
if (pThis->tVars.disk.pReadDel != NULL) strm.Destruct(&pThis->tVars.disk.pReadDel);
RETiRet;
}
static rsRetVal ATTR_NONNULL(1, 2) qAddDisk(qqueue_t *const pThis, smsg_t *pMsg) {
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pMsg, msg);
number_t nWriteCount;
const int oldfile = strmGetCurrFileNum(pThis->tVars.disk.pWrite);
CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, &nWriteCount));
CHKiRet((objSerialize(pMsg))(pMsg, pThis->tVars.disk.pWrite));
CHKiRet(strm.Flush(pThis->tVars.disk.pWrite));
CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */
pThis->tVars.disk.sizeOnDisk += nWriteCount;
/* we have enqueued the user element to disk. So we now need to destruct
* the in-memory representation. The instance will be re-created upon
* dequeue. -- rgerhards, 2008-07-09
*/
msgDestruct(&pMsg);
DBGOPRINT((obj_t *)pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n",
nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly);
/* Did we have a change in the on-disk file? If so, we
* should do a "robustness sync" of the .qi file to guard
* against the most harsh consequences of kill -9 and power off.
*/
int newfile;
newfile = strmGetCurrFileNum(pThis->tVars.disk.pWrite);
if (newfile != oldfile) {
DBGOPRINT((obj_t *)pThis,
"current to-be-written-to file has changed from "
"number %d to number %d - requiring a .qi write for robustness\n",
oldfile, newfile);
pThis->tVars.disk.nForcePersist = 2;
}
finalize_it:
RETiRet;
}
static rsRetVal msgConstructFromVoid(void **ppThis) {
return msgConstructForDeserializer((smsg_t **)ppThis);
}
static rsRetVal msgDeserializeFromVoid(void *pObj, strm_t *pStrm) {
return MsgDeserialize((smsg_t *)pObj, pStrm);
}
static rsRetVal qDeqDisk(qqueue_t *pThis, smsg_t **ppMsg) {
DEFiRet;
iRet = objDeserializeWithMethods(ppMsg, (uchar *)"msg", sizeof("msg") - 1, pThis->tVars.disk.pReadDeq, NULL, NULL,
msgConstructFromVoid, NULL, msgDeserializeFromVoid);
if (iRet != RS_RET_OK) {
LogError(0, iRet, "%s: qDeqDisk error occurred around offset %lld", obj.GetName((obj_t *)pThis),
(long long)pThis->tVars.disk.pReadDeq->iCurrOffs);
}
RETiRet;
}
/* -------------------- direct (no queueing) -------------------- */
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) * pThis) {
return RS_RET_OK;
}
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) * pThis) {
return RS_RET_OK;
}
static rsRetVal qAddDirectWithWti(qqueue_t *pThis, smsg_t *pMsg, wti_t *pWti) {
batch_t singleBatch;
batch_obj_t batchObj;
batch_state_t batchState = BATCH_STATE_RDY;
DEFiRet;
// TODO: init batchObj (states _OK and new fields -- CHECK)
assert(pThis != NULL);
/* calling the consumer is quite different here than it is from a worker thread */
/* we need to provide the consumer's return value back to the caller because in direct
* mode the consumer probably has a lot to convey (which gets lost in the other modes
* because they are asynchronous). But direct mode is deliberately synchronous.
* rgerhards, 2008-02-12
* We use our knowledge about the batch_t structure below, but without that, we
* pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
memset(&batchObj, 0, sizeof(batch_obj_t));
memset(&singleBatch, 0, sizeof(batch_t));
batchObj.pMsg = pMsg;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
singleBatch.eltState = &batchState;
iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti);
msgDestruct(&pMsg);
RETiRet;
}
/* this is called if we do not have a pWti. This currently only happens
* when we are called from a main queue in direct mode. If so, we need
* to obtain a dummy pWti.
*/
static rsRetVal qAddDirect(qqueue_t *pThis, smsg_t *pMsg) {
wti_t *pWti;
DEFiRet;
pWti = wtiGetDummy();
pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
iRet = qAddDirectWithWti(pThis, pMsg, pWti);
RETiRet;
}
/* --------------- end type-specific handlers -------------------- */
/* generic code to add a queue entry
* We use some specific code to most efficiently support direct mode
* queues. This is justified by the gain, in spite of the need to do some
* things truly differently. -- rgerhards, 2008-02-12
*/
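/* A short worked example of the sampling logic below (illustrative value,
 * not a default): with iSmpInterval = 10, the static counter cycles
 * 1, 2, ..., 9, 0; the nine messages that leave a non-zero remainder are
 * destructed, and only every tenth message (counter wraps to 0) is actually
 * enqueued -- i.e., a 1-in-10 sample of the traffic.
 */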
static rsRetVal qqueueAdd(qqueue_t *pThis, smsg_t *pMsg) {
DEFiRet;
assert(pThis != NULL);
static int msgCnt = 0;
if (pThis->iSmpInterval > 0) {
msgCnt = (msgCnt + 1) % (pThis->iSmpInterval);
if (msgCnt != 0) {
msgDestruct(&pMsg);
goto finalize_it;
}
}
CHKiRet(pThis->qAdd(pThis, pMsg));
if (pThis->qType != QUEUETYPE_DIRECT) {
ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
#ifdef ENABLE_IMDIAG
#ifdef HAVE_ATOMIC_BUILTINS
/* mutex is never used due to conditional compilation */
ATOMIC_INC(&iOverallQueueSize, &NULL);
#else
++iOverallQueueSize; /* racy, but we can't wait for a mutex! */
#endif
#endif
}
finalize_it:
RETiRet;
}
/* generic code to dequeue a queue entry
*/
static rsRetVal qqueueDeq(qqueue_t *pThis, smsg_t **ppMsg) {
DEFiRet;
assert(pThis != NULL);
/* we do NOT abort if we encounter an error, because otherwise the queue
* will not be decremented, which would most probably result in an endless loop.
* If we decrement, however, we may lose a message. But that is better than
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
iRet = pThis->qDeq(pThis, ppMsg);
ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq);
DBGOPRINT((obj_t *)pThis, "entry deleted, size now log %d, phys %d entries\n", getLogicalQueueSize(pThis),
getPhysicalQueueSize(pThis));
RETiRet;
}
/* Try to shut down regular and DA queue workers, within the queue timeout
* period. That means processing continues as usual. This is the expected
* usual case, where during shutdown those messages remaining are being
* processed. At this point, it is acceptable that the queue cannot be
* fully depleted; that case is handled in the next step. During this phase,
* we first shut down the main queue's DA worker to prevent new data from arriving
* at the DA queue, and then we ask the regular workers of both the regular
* and DA queues to try to complete processing.
* rgerhards, 2009-10-14
*/
static rsRetVal ATTR_NONNULL(1) tryShutdownWorkersWithinQueueTimeout(qqueue_t *const pThis) {
struct timespec tTimeout;
rsRetVal iRetLocal;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pThis->pqParent == NULL); /* detect invalid calling sequence */
if (pThis->bIsDA) {
/* We need to lock the mutex, as otherwise we may have a race that prevents
* us from awaking the DA worker. */
d_pthread_mutex_lock(pThis->mut);
/* tell regular queue DA worker to stop shuffling messages to DA queue... */
DBGOPRINT((obj_t *)pThis, "setting EnqOnly mode for DA worker\n");
pThis->pqDA->bEnqOnly = 1;
wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE);
wtpAdviseMaxWorkers(pThis->pWtpDA, 1, DENY_WORKER_START_DURING_SHUTDOWN);
DBGOPRINT((obj_t *)pThis, "awoke DA worker, told it to shut down.\n");
/* also tell the DA queue worker to shut down, so that it already knows... */
wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN);
wtpAdviseMaxWorkers(pThis->pqDA->pWtpReg, 1, DENY_WORKER_START_DURING_SHUTDOWN);
/* awake its lone worker */
DBGOPRINT((obj_t *)pThis, "awoke DA queue regular worker, told it to shut down when done.\n");
d_pthread_mutex_unlock(pThis->mut);
}
/* first calculate absolute timeout - we need the absolute value here, because we need to coordinate
* shutdown of both the regular and DA queue on *the same* timeout.
*/
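/* Illustrative sketch (assumption: timeoutComp() converts a relative timeout
 * into an absolute timespec deadline based on the current wall clock;
 * pWtpA/pWtpB stand in for the two worker pools):
 *
 *   struct timespec t;
 *   timeoutComp(&t, pThis->toQShutdown);           // one deadline, computed once
 *   wtpShutdownAll(pWtpA, wtpState_SHUTDOWN, &t);
 *   wtpShutdownAll(pWtpB, wtpState_SHUTDOWN, &t);  // same absolute deadline
 *
 * Because both pools wait on the *same* deadline, the configured timeout is
 * consumed only once, no matter how many pools are shut down.
 */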
timeoutComp(&tTimeout, pThis->toQShutdown);
DBGOPRINT((obj_t *)pThis, "trying shutdown of regular workers\n");
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
if (iRetLocal == RS_RET_TIMED_OUT) {
LogMsg(0, RS_RET_TIMED_OUT, LOG_INFO,
"%s: regular queue shutdown timed out on primary queue "
"(this is OK, timeout was %d)",
objGetName((obj_t *)pThis), pThis->toQShutdown);
} else {
DBGOPRINT((obj_t *)pThis, "regular queue workers shut down.\n");
}
/* OK, the regular queue's workers are done; now on to the DA queue's regular worker. */
if (pThis->pqDA != NULL) {
DBGOPRINT((obj_t *)pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", qqueueGetID(pThis->pqDA));
/* we use the same absolute timeout as above, so we do not use more than the configured
* timeout interval!
*/
DBGOPRINT((obj_t *)pThis, "trying shutdown of regular worker of DA queue\n");
iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN, &tTimeout);
if (iRetLocal == RS_RET_TIMED_OUT) {
LogMsg(0, RS_RET_TIMED_OUT, LOG_INFO,
"%s: regular queue shutdown timed out on DA queue (this is OK, "
"timeout was %d)",
objGetName((obj_t *)pThis), pThis->toQShutdown);
} else {
DBGOPRINT((obj_t *)pThis, "DA queue worker shut down.\n");
}
}
RETiRet;
}
/* Try to shut down regular and DA queue workers, within the action timeout
* period. This aborts processing, but at the end of the current action, in
* a well-defined manner. During this phase, we terminate all three worker
* pools, including the regular queue's DA worker if it has not yet terminated.
* Not finishing processing all messages is OK (and expected) at this stage
* (they may be preserved later, depending on the bSaveOnShutdown setting).
* rgerhards, 2009-10-14
*/
static rsRetVal tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) {
struct timespec tTimeout;
rsRetVal iRetLocal;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pThis->pqParent == NULL); /* detect invalid calling sequence */
/* instruct workers to finish ASAP, even if work still exists */
DBGOPRINT((obj_t *)pThis, "trying to shutdown workers within Action Timeout");
DBGOPRINT((obj_t *)pThis, "setting EnqOnly mode\n");
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
/* now DA queue */
if (pThis->bIsDA) {
pThis->pqDA->bEnqOnly = 1;
pThis->pqDA->bShutdownImmediate = 1;
}
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
timeoutComp(&tTimeout, pThis->toActShutdown);
DBGOPRINT((obj_t *)pThis, "trying immediate shutdown of regular workers (if any)\n");
iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if (iRetLocal == RS_RET_TIMED_OUT) {
LogMsg(0, RS_RET_TIMED_OUT, LOG_INFO,
"%s: immediate shutdown timed out on primary queue (this is acceptable and "
"triggers cancellation)",
objGetName((obj_t *)pThis));
} else if (iRetLocal != RS_RET_OK) {
LogMsg(0, iRetLocal, LOG_WARNING,
"%s: potential internal error: unexpected return state after trying "
"immediate shutdown of the primary queue in disk save mode. "
"Continuing, but results are unpredictable",
objGetName((obj_t *)pThis));
}
if (pThis->pqDA != NULL) {
/* and now the same for the DA queue */
DBGOPRINT((obj_t *)pThis, "trying immediate shutdown of DA queue workers\n");
iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if (iRetLocal == RS_RET_TIMED_OUT) {
LogMsg(0, RS_RET_TIMED_OUT, LOG_INFO,
"%s: immediate shutdown timed out on DA queue (this is acceptable and "
"triggers cancellation)",
objGetName((obj_t *)pThis));
} else if (iRetLocal != RS_RET_OK) {
LogMsg(0, iRetLocal, LOG_WARNING,
"%s: potential internal error: unexpected return state after trying "
"immediate shutdown of the DA queue in disk save mode. "
"Continuing, but results are unpredictable",
objGetName((obj_t *)pThis));
}
/* and now we need to terminate the DA worker itself. We always grant it a 100ms timeout,
* which should be sufficient and usually not be required (it is expected to have finished
* long before while we were processing the queue timeout in shutdown phase 1).
* rgerhards, 2009-10-14
*/
timeoutComp(&tTimeout, 100);
DBGOPRINT((obj_t *)pThis, "trying regular shutdown of main queue DA worker pool\n");
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if (iRetLocal == RS_RET_TIMED_OUT) {
LogMsg(0, iRetLocal, LOG_WARNING,
"%s: shutdown timed out on main queue DA worker pool "
"(this is not good, but possibly OK)",
objGetName((obj_t *)pThis));
} else {
DBGOPRINT((obj_t *)pThis, "main queue DA worker pool shut down.\n");
}
}
RETiRet;
}
/* This function cancels all remaining regular workers for both the main and the DA
* queue.
* rgerhards, 2009-05-29
*/
static rsRetVal cancelWorkers(qqueue_t *pThis) {
rsRetVal iRetLocal;
DEFiRet;
assert(pThis->qType != QUEUETYPE_DIRECT);
/* Now queue workers should have terminated. If not, we need to cancel them as we have applied
* all timeout settings. If any worker in any queue still executes, its consumer is possibly
* long-running and cancelling is the only way to get rid of it.
*/
DBGOPRINT((obj_t *)pThis, "checking to see if we need to cancel any worker threads of the primary queue\n");
iRetLocal = wtpCancelAll(pThis->pWtpReg, objGetName((obj_t *)pThis));
/* ^-- returns immediately if all threads already have terminated */
if (iRetLocal != RS_RET_OK) {
DBGOPRINT((obj_t *)pThis,
"unexpected iRet state %d trying to cancel primary queue worker "
"threads, continuing, but results are unpredictable\n",
iRetLocal);
}
/* ... and now the DA queue, if it exists (should always be after the primary one) */
if (pThis->pqDA != NULL) {
DBGOPRINT((obj_t *)pThis,
"checking to see if we need to cancel any worker threads of "
"the DA queue\n");
iRetLocal = wtpCancelAll(pThis->pqDA->pWtpReg, objGetName((obj_t *)pThis));
/* returns immediately if all threads already have terminated */
if (iRetLocal != RS_RET_OK) {
DBGOPRINT((obj_t *)pThis,
"unexpected iRet state %d trying to cancel DA queue worker "
"threads, continuing, but results are unpredictable\n",
iRetLocal);
}
/* finally, we cancel the main queue's DA worker pool, if it still is running. It may be
* restarted later to persist the queue. But we stop it, because otherwise we get into
* big trouble when resetting the logical dequeue pointer. This operation can only be
* done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
*/
DBGOPRINT((obj_t *)pThis, "checking to see if main queue DA worker pool needs to be cancelled\n");
wtpCancelAll(pThis->pWtpDA, objGetName((obj_t *)pThis));
/* returns immediately if all threads already have terminated */
}
RETiRet;
}
/* This function shuts down all worker threads and waits until they
* have terminated. If they timeout, they are cancelled.
* rgerhards, 2008-01-24
* Please note that this function shuts down BOTH the parent AND the child queue
* in DA case. This is necessary because their timeouts are tightly coupled. Most
* importantly, the timeouts would be applied twice (or logic be extremely
* complex) if each would have its own shutdown. The function does not self check
* this condition - the caller must make sure it is not called with a parent.
* rgerhards, 2009-05-26: we no longer persist the queue here if bSaveOnShutdown
* is set. This must be handled by the caller. Not doing that cleans up the queue
* shutdown considerably. Also, older engines had a potential hang condition when
* the DA queue was already started and the DA worker configured for infinite
* retries and the action was during retry processing. This was a design issue,
* which is solved as of now. Note that the shutdown now may take a little bit
* longer, because we no longer can persist the queue in parallel to waiting
* on worker timeouts.
*/
rsRetVal ATTR_NONNULL(1) qqueueShutdownWorkers(qqueue_t *const pThis) {
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
if (pThis->qType == QUEUETYPE_DIRECT) {
FINALIZE;
}
assert(pThis->pqParent == NULL); /* detect invalid calling sequence */
DBGOPRINT((obj_t *)pThis, "initiating worker thread shutdown sequence %p\n", pThis);
CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis));
pthread_mutex_lock(pThis->mut);
int physQueueSize;
physQueueSize = getPhysicalQueueSize(pThis);
pthread_mutex_unlock(pThis->mut);
if (physQueueSize > 0) {
CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis));
}
CHKiRet(cancelWorkers(pThis));
/* ... finally ... all worker threads have terminated :-)
* Well, more precisely, they *are in termination*. Some cancel cleanup handlers
* may still be running. Note that the main queue's DA worker may still be running.
*/
DBGOPRINT((obj_t *)pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n",
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
finalize_it:
RETiRet;
}
/* Constructor for the queue object
* This constructs the data structure, but does not yet start the queue. That
* is done by queueStart(). The reason is that we want to give the caller a chance
* to modify some parameters before the queue is actually started.
*/
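/* Minimal usage sketch (myConsumer is a hypothetical callback; error
 * handling elided):
 *
 *   qqueue_t *pQ;
 *   CHKiRet(qqueueConstruct(&pQ, QUEUETYPE_LINKEDLIST, 1, 50000, myConsumer));
 *   qqueueSetDefaultsActionQueue(pQ);   // or tweak individual parameters
 *   CHKiRet(qqueueStart(ourConf, pQ));  // the ConstructionFinalizer
 */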
rsRetVal qqueueConstruct(qqueue_t **ppThis,
queueType_t qType,
int iWorkerThreads,
int iMaxQueueSize,
rsRetVal (*pConsumer)(void *, batch_t *, wti_t *)) {
DEFiRet;
qqueue_t *pThis;
const uchar *const workDir = glblGetWorkDirRaw(ourConf);
assert(ppThis != NULL);
assert(pConsumer != NULL);
assert(iWorkerThreads >= 0);
CHKmalloc(pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t)));
/* we have an object, so let's fill the properties */
objConstructSetObjInfo(pThis);
if (workDir != NULL) {
if ((pThis->pszSpoolDir = ustrdup(workDir)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir);
}
/* set some water marks so that we have useful defaults if none are set specifically */
pThis->iFullDlyMrk = -1;
pThis->iLightDlyMrk = -1;
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
pThis->nLogDeq = 0;
pThis->useCryprov = 0;
pThis->takeFlowCtlFromMsg = 0;
pThis->iMaxQueueSize = iMaxQueueSize;
pThis->pConsumer = pConsumer;
pThis->iNumWorkerThreads = iWorkerThreads;
pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
pThis->iDeqBatchSize = 8; /* conservative default, should still provide good performance */
pThis->iMinDeqBatchSize = 0; /* no minimum batch size by default */
pThis->isRunning = 0;
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
finalize_it:
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
RETiRet;
}
/* set default inside queue object suitable for action queues.
* This shall be called directly after queue construction. This function has
* been added in support of the new v6 config system. It expects properly pre-initialized
* objects, but we need to differentiate between ruleset main and action queues.
* In order to avoid unnecessary complexity, we provide the necessary defaults
* via specific function calls.
*/
void qqueueSetDefaultsActionQueue(qqueue_t *pThis) {
pThis->qType = QUEUETYPE_DIRECT; /* default type for action queues */
pThis->iMaxQueueSize = 1000; /* default maximum queue size */
pThis->iDeqBatchSize = 128; /* default batch size */
pThis->iMinDeqBatchSize = 0;
pThis->toMinDeqBatchSize = 1000;
pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */
pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */
pThis->iDiscardMrk = -1; /* begin to discard messages */
pThis->iDiscardSeverity = 8; /* turn off */
pThis->iNumWorkerThreads = 1; /* default number of worker threads */
pThis->iMaxFileSize = 1024 * 1024;
pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */
pThis->bSyncQueueFiles = 0;
pThis->toQShutdown = loadConf->globals.actq_dflt_toQShutdown; /* queue shutdown */
pThis->toActShutdown = loadConf->globals.actq_dflt_toActShutdown; /* action shutdown (in phase 2) */
pThis->toEnq = loadConf->globals.actq_dflt_toEnq; /* timeout for queue enque */
pThis->toWrkShutdown = loadConf->globals.actq_dflt_toWrkShutdown; /* timeout for worker thread shutdown */
pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */
pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
pThis->sizeOnDiskMax = 0; /* unlimited */
pThis->iDeqSlowdown = 0;
pThis->iDeqtWinFromHr = 0;
pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
pThis->iSmpInterval = 0; /* disable sampling */
}
/* set defaults inside queue object suitable for main/ruleset queues.
* See queueSetDefaultsActionQueue() for more details and background.
*/
void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) {
pThis->qType = QUEUETYPE_FIXED_ARRAY; /* default type for ruleset/main queues */
pThis->iMaxQueueSize = 50000; /* default maximum queue size */
pThis->iDeqBatchSize = 1024; /* default batch size */
pThis->iMinDeqBatchSize = 0;
pThis->toMinDeqBatchSize = 1000;
pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */
pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */
pThis->iDiscardMrk = -1; /* begin to discard messages */
pThis->iDiscardSeverity = 8; /* turn off */
pThis->iNumWorkerThreads = 1; /* default number of worker threads */
pThis->iMaxFileSize = 16 * 1024 * 1024;
pThis->iPersistUpdCnt = 0; /* persist queue info every n updates */
pThis->bSyncQueueFiles = 0;
pThis->toQShutdown = ourConf->globals.ruleset_dflt_toQShutdown;
pThis->toActShutdown = ourConf->globals.ruleset_dflt_toActShutdown;
pThis->toEnq = ourConf->globals.ruleset_dflt_toEnq;
pThis->toWrkShutdown = ourConf->globals.ruleset_dflt_toWrkShutdown;
pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */
pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */
pThis->sizeOnDiskMax = 0; /* unlimited */
pThis->iDeqSlowdown = 0;
pThis->iDeqtWinFromHr = 0;
pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
pThis->iSmpInterval = 0; /* disable sampling */
}
/* This function checks if the provided message shall be discarded and does so, if needed.
* In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
* provide real-time creation of spool files.
* Note: a cached copy of iQueueSize is provided so that no mutex locks are required.
* The caller must have obtained them while the mutex was locked. Of course, these values may no
* longer be current, but that is OK for the discard check. At worst, the message is either processed
* or discarded when it should not have been. As discarding is in itself somewhat racy and erratic,
* that is no problem for us. This function MUST NOT lock the queue mutex, as that could result in
* deadlocks!
* If the message is discarded, it can no longer be processed by the caller. So be sure to check
* the return state!
* rgerhards, 2008-01-24
*/
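/* Worked example (illustrative settings): with iDiscardMrk = 9500 and
 * iDiscardSeverity = 4 (warning), once 9500 entries are queued an incoming
 * debug message (severity 7 >= 4) is destructed and RS_RET_QUEUE_FULL is
 * returned, while an alert message (severity 1 < 4) is still enqueued.
 */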
static rsRetVal qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, smsg_t *pMsg) {
DEFiRet;
rsRetVal iRetLocal;
int iSeverity;
ISOBJ_TYPE_assert(pThis, qqueue);
if (pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) {
iRetLocal = MsgGetSeverity(pMsg, &iSeverity);
if (iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
DBGOPRINT((obj_t *)pThis, "queue nearly full (%d entries), discarded severity %d message\n", iQueueSize,
iSeverity);
STATSCOUNTER_INC(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd);
msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
DBGOPRINT((obj_t *)pThis,
"queue nearly full (%d entries), but could not drop msg "
"(iRet: %d, severity %d)\n",
iQueueSize, iRetLocal, iSeverity);
}
}
finalize_it:
RETiRet;
}
/* Finally remove n elements from the queue store.
*/
static rsRetVal ATTR_NONNULL(1) DoDeleteBatchFromQStore(qqueue_t *const pThis, const int nElem) {
int i;
off64_t bytesDel = 0; /* keep the CLANG static analyzer happy */
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
/* now send delete request to storage driver */
if (pThis->qType == QUEUETYPE_DISK) {
strmMultiFileSeek(pThis->tVars.disk.pReadDel, pThis->tVars.disk.deqFileNumOut, pThis->tVars.disk.deqOffs,
&bytesDel);
/* We need to correct the on-disk file size. This time it is a bit tricky:
* we free disk space only upon file deletion. So we need to keep track of what we
* have read until we get an out-offset that is lower than the in-offset (which
* indicates file change). Then, we can subtract the whole thing from the on-disk
* size. -- rgerhards, 2008-01-30
*/
if (bytesDel != 0) {
pThis->tVars.disk.sizeOnDisk -= bytesDel;
DBGOPRINT((obj_t *)pThis,
"doDeleteBatch: a %lld octet file has been deleted, now %lld "
"octets disk space used\n",
(long long)bytesDel, pThis->tVars.disk.sizeOnDisk);
/* awake possibly waiting enq process */
pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
}
} else { /* memory queue */
for (i = 0; i < nElem; ++i) {
pThis->qDel(pThis);
}
}
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize);
#ifdef ENABLE_IMDIAG
#ifdef HAVE_ATOMIC_BUILTINS
/* mutex is never used due to conditional compilation */
ATOMIC_SUB(&iOverallQueueSize, nElem, &NULL);
#else
iOverallQueueSize -= nElem; /* racy, but we can't wait for a mutex! */
#endif
#endif
ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq);
DBGPRINTF("doDeleteBatch: delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis),
getPhysicalQueueSize(pThis));
++pThis->deqIDDel; /* one more batch dequeued */
if ((pThis->qType == QUEUETYPE_DISK) && (bytesDel != 0)) {
qqueuePersist(pThis, QUEUE_CHECKPOINT); /* robustness persist .qi file */
}
RETiRet;
}
typedef enum tdlPhase_e { TDL_EMPTY, TDL_PROCESS_HEAD, TDL_QUEUE } tdlPhase_t;
/**
* Remove messages from the physical queue store that are fully processed.
*
* Deletion proceeds through a small state machine governed by the
* to-delete list:
* - TDL_EMPTY: list is empty, delete the current batch directly.
* - TDL_PROCESS_HEAD: pending head elements are removed first, then the
* current batch.
* - TDL_QUEUE: current batch cannot be deleted and is queued for later.
*
* The dequeue identifier advances strictly monotonically, ensuring
* deterministic order and proper resource release for both disk and
* memory queue implementations.
*/
static rsRetVal DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) {
toDeleteLst_t *pTdl;
qDeqID nextID;
tdlPhase_t phase;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
dbgprintf("rger: deleteBatchFromQStore, nElem %d\n", (int)pBatch->nElem);
pTdl = tdlPeek(pThis); /* get current head element */
if (pTdl == NULL) {
phase = TDL_EMPTY;
} else if (pBatch->deqID == pThis->deqIDDel) {
phase = TDL_PROCESS_HEAD;
} else {
phase = TDL_QUEUE;
}
switch (phase) {
case TDL_EMPTY:
DoDeleteBatchFromQStore(pThis, pBatch->nElem);
break;
case TDL_PROCESS_HEAD:
nextID = pThis->deqIDDel;
while ((pTdl = tdlPeek(pThis)) != NULL && pTdl->deqID == nextID) {
DoDeleteBatchFromQStore(pThis, pTdl->nElemDeq);
tdlPop(pThis);
++nextID;
}
assert(pThis->deqIDDel == nextID);
/* old entries deleted, now delete current ones... */
DoDeleteBatchFromQStore(pThis, pBatch->nElem);
break;
case TDL_QUEUE:
/* cannot delete, insert into to-delete list */
DBGPRINTF("not at head of to-delete list, enqueue %d\n", (int)pBatch->deqID);
CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem));
break;
default:
/* all phases should be handled above. */
assert(0 && "unhandled tdlPhase_t");
break;
}
finalize_it:
RETiRet;
}
/* Delete a batch of processed user objects from the queue, which includes
* destructing the objects themselves. Any entries not marked as finally
* processed are enqueued again. The re-enqueue is necessary because such
* entries have not been fully processed and would otherwise be lost.
* rgerhards, 2009-05-13
*/
static rsRetVal DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) {
int i;
smsg_t *pMsg;
int nEnqueued = 0;
rsRetVal localRet;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pBatch != NULL);
for (i = 0; i < pBatch->nElem; ++i) {
pMsg = pBatch->pElem[i].pMsg;
DBGPRINTF("DeleteProcessedBatch: etry %d state %d\n", i, pBatch->eltState[i]);
if (pBatch->eltState[i] == BATCH_STATE_RDY || pBatch->eltState[i] == BATCH_STATE_SUB) {
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
++nEnqueued;
if (localRet != RS_RET_OK) {
DBGPRINTF(
"DeleteProcessedBatch: error %d re-enqueuing unprocessed "
"data element - discarded\n",
localRet);
}
}
msgDestruct(&pMsg);
}
DBGPRINTF("DeleteProcessedBatch: we deleted %d objects and enqueued %d objects\n", i - nEnqueued, nEnqueued);
if (nEnqueued > 0) qqueueChkPersist(pThis, nEnqueued);
iRet = DeleteBatchFromQStore(pThis, pBatch);
pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ // TODO: more fine-grained init, new fields! 2010-06-14
RETiRet;
}
/* dequeue as many user pointers as are available, until we hit the configured
* upper limit of pointers. Note that this function also deletes all processed
* objects from the previous batch. However, it is perfectly valid that the
* previous batch contained NO objects at all. For example, this happens
* immediately after system startup or when a queue was exhausted and the queue
* worker needed to wait for new data.
* This must only be called when the queue mutex is LOCKED, otherwise serious
* malfunction will happen.
*/
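/* Worked example of the batching limits (illustrative values): with
 * iDeqBatchSize = 128 and iMinDeqBatchSize = 16, a worker that finds only
 * 10 messages waits (bounded by the toMinDeqBatchSize deadline) for 6 more;
 * if the timeout expires, it hands over the 10 it already has. With 200
 * messages queued, it returns after dequeuing the first 128.
 */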
static rsRetVal ATTR_NONNULL() DequeueConsumableElements(qqueue_t *const pThis,
wti_t *const pWti,
int *const piRemainingQueueSize,
int *const pSkippedMsgs) {
int nDequeued;
int nDiscarded;
int nDeleted;
int iQueueSize;
int keep_running = 1;
struct timespec timeout;
smsg_t *pMsg;
rsRetVal localRet;
DEFiRet;
nDeleted = pWti->batch.nElemDeq;
DeleteProcessedBatch(pThis, &pWti->batch);
nDequeued = nDiscarded = 0;
if (pThis->qType == QUEUETYPE_DISK) {
pThis->tVars.disk.deqFileNumIn = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq);
}
/* work-around clang static analyzer false positive, we need a const value */
const int iMinDeqBatchSize = pThis->iMinDeqBatchSize;
if (iMinDeqBatchSize > 0) {
timeoutComp(&timeout, pThis->toMinDeqBatchSize); /* get absolute timeout */
}
while ((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
int rd_fd = -1;
int64_t rd_offs = 0;
int wr_fd = -1;
int64_t wr_offs = 0;
if (pThis->tVars.disk.pReadDeq != NULL) {
rd_fd = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq);
rd_offs = pThis->tVars.disk.pReadDeq->iCurrOffs;
}
if (pThis->tVars.disk.pWrite != NULL) {
wr_fd = strmGetCurrFileNum(pThis->tVars.disk.pWrite);
wr_offs = pThis->tVars.disk.pWrite->iCurrOffs;
}
if (rd_fd != -1 && rd_fd == wr_fd && rd_offs == wr_offs) {
DBGPRINTF(
"problem on disk queue '%s': "
//"queue size log %d, phys %d, but rd_fd=wr_rd=%d and offs=%lld\n",
"queue size log %d, phys %d, but rd_fd=wr_rd=%d and offs=%" PRId64 "\n",
obj.GetName((obj_t *)pThis), iQueueSize, pThis->iQueueSize, rd_fd, rd_offs);
*pSkippedMsgs = iQueueSize;
#ifdef ENABLE_IMDIAG
iOverallQueueSize -= iQueueSize;
#endif
pThis->iQueueSize -= iQueueSize;
iQueueSize = 0;
break;
}
localRet = qqueueDeq(pThis, &pMsg);
if (localRet == RS_RET_FILE_NOT_FOUND) {
DBGPRINTF(
"fatal error on disk queue '%s': file '%s' "
"not found, queue size said to be %d",
obj.GetName((obj_t *)pThis), "...", iQueueSize);
}
CHKiRet(localRet);
/* check if we should discard this element */
localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg);
if (localRet == RS_RET_QUEUE_FULL) {
++nDiscarded;
continue;
} else if (localRet != RS_RET_OK) {
ABORT_FINALIZE(localRet);
}
/* all well, use this element */
pWti->batch.pElem[nDequeued].pMsg = pMsg;
pWti->batch.eltState[nDequeued] = BATCH_STATE_RDY;
++nDequeued;
if (nDequeued < iMinDeqBatchSize && getLogicalQueueSize(pThis) == 0) {
while (!pThis->bShutdownImmediate && keep_running && nDequeued < iMinDeqBatchSize &&
getLogicalQueueSize(pThis) == 0) {
dbgprintf(
"%s minDeqBatchSize doing wait, batch is %d messages, "
"queue size %d\n",
obj.GetName((obj_t *)pThis), nDequeued, getLogicalQueueSize(pThis));
if (wtiWaitNonEmpty(pWti, timeout) == 0) { /* timeout? */
DBGPRINTF("%s minDeqBatchSize timeout, batch is %d messages\n", obj.GetName((obj_t *)pThis),
nDequeued);
keep_running = 0;
}
}
}
if (keep_running) {
keep_running = (getLogicalQueueSize(pThis) > 0) && (nDequeued < pThis->iDeqBatchSize);
}
}
if (pThis->qType == QUEUETYPE_DISK) {
strm.GetCurrOffset(pThis->tVars.disk.pReadDeq, &pThis->tVars.disk.deqOffs);
pThis->tVars.disk.deqFileNumOut = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq);
}
/* it is sufficient to persist only when the bulk of work is done */
qqueueChkPersist(pThis, nDequeued + nDiscarded + nDeleted);
/* If messages were DISCARDED, we need to subtract them from the OverallQueueSize */
#ifdef ENABLE_IMDIAG
#ifdef HAVE_ATOMIC_BUILTINS
ATOMIC_SUB(&iOverallQueueSize, nDiscarded, &NULL);
#else
iOverallQueueSize -= nDiscarded; /* racy, but we can't wait for a mutex! */
#endif
DBGOPRINT((obj_t *)pThis, "dequeued %d discarded %d QueueSize %d consumable elements, szlog %d sz phys %d\n",
nDequeued, nDiscarded, iOverallQueueSize, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
#else
DBGOPRINT((obj_t *)pThis, "dequeued %d discarded %d consumable elements, szlog %d sz phys %d\n", nDequeued,
nDiscarded, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
#endif
pWti->batch.nElem = nDequeued;
pWti->batch.nElemDeq = nDequeued + nDiscarded;
pWti->batch.deqID = getNextDeqID(pThis);
*piRemainingQueueSize = iQueueSize;
finalize_it:
RETiRet;
}
/* dequeue the queued object for the queue consumers.
* rgerhards, 2008-10-21
* I made a radical change - we now dequeue multiple elements, and store these objects in
* an array of user pointers. We expect that this increases performance.
* rgerhards, 2009-04-22
*/
static rsRetVal DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int *const pSkippedMsgs) {
DEFiRet;
int iQueueSize = 0; /* keep the compiler happy... */
*pSkippedMsgs = 0;
/* dequeue element batch (still protected from mutex) */
iRet = DequeueConsumableElements(pThis, pWti, &iQueueSize, pSkippedMsgs);
if (*pSkippedMsgs > 0) {
LogError(0, RS_RET_ERR, "%s: lost %d messages from diskqueue (invalid .qi file)", obj.GetName((obj_t *)pThis),
*pSkippedMsgs);
}
/* awake some flow-controlled sources if we can do this right now */
/* TODO: this could be done better from a performance point of view -- do it only if
* we have someone waiting for the condition (or only when we hit the watermark right
* on the nail [exact value]) -- rgerhards, 2008-03-14
* now that we dequeue batches of pointers, this is much less an issue...
* rgerhards, 2009-04-22
*/
if (iQueueSize < pThis->iFullDlyMrk / 2 || glbl.GetGlobalInputTermState() == 1) {
pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk);
}
if (iQueueSize < pThis->iLightDlyMrk / 2) {
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
pthread_cond_signal(&pThis->notFull);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
if (iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
LogError(0, iRet,
"%s: error dequeueing element - ignoring, "
"but strange things may happen",
obj.GetName((obj_t *)pThis));
}
RETiRet;
}
/* The rate limiter
*
* IMPORTANT: the rate-limiter MUST unlock and re-lock the queue when
* it actually delays processing. Otherwise inputs are stalled.
*
* Here we may wait if a dequeue time window is defined or if we are
* rate-limited. TODO: If we do so, we should also look into the
* way new worker threads are spawned. Obviously, it doesn't make much
* sense to spawn additional worker threads when none of them can do any
* processing. However, it is deemed acceptable to allow this for an initial
* implementation of the timeframe/rate limiting feature.
* Please also note that these features could also be implemented at the action
* level. However, that would limit them to be used together with actions. We have
* taken the broader approach, moving it right into the queue. This is even
* necessary if we want to prevent spawning of multiple unnecessary worker
* threads as described above. -- rgerhards, 2008-04-02
*
*
* time window: tCurr is current time; tFrom is start time, tTo is end time (in military 24h format).
* We may have tFrom = 4, tTo = 10 --> run from 4 to 10 hrs. nice and happy
* we may also have tFrom= 22, tTo = 4 -> run from 10pm to 4am, which is actually two
* windows: 0-4; 22-23:59
* so when to run? Let's assume we have 3am
*
* if(tTo < tFrom) {
* if(tCurr < tTo [3 < 4] || tCurr > tFrom [3 > 22])
* do work
* else
* sleep for tFrom - tCurr "hours" [22 - 3 --> 19]
* } else {
* if(tCurr >= tFrom [3 >= 4] && tCurr < tTo [3 < 10])
* do work
* else
* sleep for tFrom - tCurr "hours" [4 - 3 --> 1]
* }
*
* Bottom line: we need to check which type of window we have and need to adjust our
* logic accordingly. Of course, sleep calculations need to be done up to the minute,
* but you get the idea from the code above.
*/
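/* Worked example for the wrap-around window (tFrom = 22, tTo = 4): at
 * 03:15:30 we are inside the window (3 < 4), so there is no delay; at
 * 05:10:00 we are outside, so iDelay = (22 - 5) * 3600 - 10 * 60 - 0 =
 * 60600 seconds, i.e. we sleep until exactly 22:00.
 */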
static rsRetVal RateLimiter(qqueue_t *pThis) {
DEFiRet;
int iDelay;
int iHrCurr;
time_t tCurr;
struct tm m;
ISOBJ_TYPE_assert(pThis, qqueue);
iDelay = 0;
if (pThis->iDeqtWinToHr != 25) { /* 25 means disabled */
/* time calls are expensive, so only do them when needed */
datetime.GetTime(&tCurr);
localtime_r(&tCurr, &m);
iHrCurr = m.tm_hour;
if (pThis->iDeqtWinToHr < pThis->iDeqtWinFromHr) {
if (iHrCurr < pThis->iDeqtWinToHr || iHrCurr > pThis->iDeqtWinFromHr) {
; /* do not delay */
} else {
iDelay = (pThis->iDeqtWinFromHr - iHrCurr) * 3600;
/* this time, we are already into the next hour, so we need
* to subtract our current minute and seconds.
*/
iDelay -= m.tm_min * 60;
iDelay -= m.tm_sec;
}
} else {
if (iHrCurr >= pThis->iDeqtWinFromHr && iHrCurr < pThis->iDeqtWinToHr) {
; /* do not delay */
} else {
if (iHrCurr < pThis->iDeqtWinFromHr) {
iDelay = (pThis->iDeqtWinFromHr - iHrCurr - 1) * 3600;
/* -1 as we are already in the hour */
iDelay += (60 - m.tm_min) * 60;
iDelay += 60 - m.tm_sec;
} else {
iDelay = (24 - iHrCurr + pThis->iDeqtWinFromHr) * 3600;
/* this time, we are already into the next hour, so we need
* to subtract our current minute and seconds.
*/
iDelay -= m.tm_min * 60;
iDelay -= m.tm_sec;
}
}
}
}
if (iDelay > 0) {
pthread_mutex_unlock(pThis->mut);
DBGOPRINT((obj_t *)pThis, "outside dequeue time window, delaying %d seconds\n", iDelay);
srSleep(iDelay, 0);
pthread_mutex_lock(pThis->mut);
}
RETiRet;
}
/* This dequeues the next batch. Note that this function must not be
* cancelled, else it will leave back an inconsistent state.
* rgerhards, 2009-05-20
*/
static rsRetVal DequeueForConsumer(qqueue_t *pThis, wti_t *pWti, int *const pSkippedMsgs) {
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueConsumable(pThis, pWti, pSkippedMsgs));
if (pWti->batch.nElem == 0) ABORT_FINALIZE(RS_RET_IDLE);
finalize_it:
RETiRet;
}
/* This is called when a batch is processed and the worker does not
* ask for another batch (e.g. because it is to be terminated)
* Note that we must not be terminated while we delete a processed
* batch. Otherwise, we may not complete it, and then the cancel
* handler also tries to delete the batch. But then it finds some of
* the messages already destructed. This was a bug we have seen, especially
* with disk mode, where a delete takes rather long. Anyhow, the conceptual
* problem exists in all queue modes.
* rgerhards, 2009-05-27
*/
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti) {
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
int iCancelStateSave;
/* at this spot, we must not be cancelled */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
DeleteProcessedBatch(pThis, &pWti->batch);
qqueueChkPersist(pThis, pWti->batch.nElemDeq);
pthread_setcancelstate(iCancelStateSave, NULL);
RETiRet;
}
/* This is the queue consumer in the regular (non-DA) case. It is
* protected by the queue mutex, but MUST release it as soon as possible.
* rgerhards, 2008-01-21
*/
static rsRetVal ConsumerReg(qqueue_t *pThis, wti_t *pWti) {
int iCancelStateSave;
int bNeedReLock = 0; /**< do we need to lock the mutex again? */
int skippedMsgs = 0; /**< did the queue lose any messages? (can happen with
** a disk queue if the .qi file is corrupt) */
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
iRet = DequeueForConsumer(pThis, pWti, &skippedMsgs);
if (iRet == RS_RET_FILE_NOT_FOUND) {
/* This is a fatal condition and means the queue is almost unusable */
d_pthread_mutex_unlock(pThis->mut);
DBGOPRINT((obj_t *)pThis, "got 'file not found' error %d, queue defunct\n", iRet);
iRet = queueSwitchToEmergencyMode(pThis, iRet);
// TODO: think about what to return as iRet -- keep RS_RET_FILE_NOT_FOUND?
d_pthread_mutex_lock(pThis->mut);
}
if (iRet != RS_RET_OK) {
FINALIZE;
}
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
d_pthread_mutex_unlock(pThis->mut);
bNeedReLock = 1;
/* report errors, now that we are outside of queue lock */
if (skippedMsgs > 0) {
LogError(0, 0,
"problem on disk queue '%s': "
"queue files contain %d messages fewer than specified "
"in .qi file -- we lost those messages. That's all we know.",
obj.GetName((obj_t *)pThis), skippedMsgs);
}
/* at this spot, we may be cancelled */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
*/
if (pThis->iDeqSlowdown) {
DBGOPRINT((obj_t *)pThis, "sleeping %d microseconds as requested by config params\n", pThis->iDeqSlowdown);
srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000);
}
/* but now cancellation is no longer permitted */
pthread_setcancelstate(iCancelStateSave, NULL);
finalize_it:
DBGPRINTF("regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis),
getPhysicalQueueSize(pThis));
/* now we are done, but potentially need to re-acquire the mutex */
if (bNeedReLock) d_pthread_mutex_lock(pThis->mut);
RETiRet;
}
/* This is a special consumer to feed the disk-queue in disk-assisted mode.
* When active, our own queue more or less acts as a memory buffer to the disk.
* So this consumer just needs to drain the memory queue and submit entries
* to the disk queue. The disk queue will then call the actual consumer from
* the app point of view (we chain two queues here).
* When this method is entered, the mutex is always locked and needs to be unlocked
* as part of the processing.
* rgerhards, 2008-01-14
*/
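/* Conceptual sketch of the resulting chain (not a literal call graph):
 *
 *   producers --> memory queue ==ConsumerDA==> DA disk queue ==> pConsumer
 */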
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti) {
int i;
int iCancelStateSave;
int bNeedReLock = 0; /**< do we need to lock the mutex again? */
int skippedMsgs = 0;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueForConsumer(pThis, pWti, &skippedMsgs));
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
d_pthread_mutex_unlock(pThis->mut);
bNeedReLock = 1;
/* at this spot, we may be cancelled */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
/* iterate over returned results and enqueue them in DA queue */
for (i = 0; i < pWti->batch.nElem && !pThis->bShutdownImmediate; i++) {
iRet = qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, MsgAddRef(pWti->batch.pElem[i].pMsg));
if (iRet != RS_RET_OK) {
if (iRet == RS_RET_ERR_QUEUE_EMERGENCY) {
/* Queue emergency error occurred */
DBGOPRINT((obj_t *)pThis,
"ConsumerDA:qqueueEnqMsg caught RS_RET_ERR_QUEUE_EMERGENCY,"
"aborting loop.\n");
FINALIZE;
} else {
DBGOPRINT((obj_t *)pThis,
"ConsumerDA:qqueueEnqMsg item (%d) returned "
"with error state: '%d'\n",
i, iRet);
}
}
pWti->batch.eltState[i] = BATCH_STATE_COMM; /* committed to the other queue! */
}
/* but now cancellation is no longer permitted */
pthread_setcancelstate(iCancelStateSave, NULL);
finalize_it:
/* Check the last return state of qqueueEnqMsg. If an error was returned, we acknowledge it only.
* Unless the error code is RS_RET_ERR_QUEUE_EMERGENCY, we reset the return state to RS_RET_OK.
* Otherwise the caller functions would run into an infinite loop trying to enqueue the
* same messages over and over again.
*
* However we do NOT overwrite positive return states like
* RS_RET_TERMINATE_NOW,
* RS_RET_NO_RUN,
* RS_RET_IDLE,
* RS_RET_TERMINATE_WHEN_IDLE
* These return states are important for queue handling in the upper-layer functions.
* RGer: Note that checking for iRet < 0 is a bit bold. In theory, positive iRet
* values are "OK" states, and things that the caller shall deal with. However,
* this has not been done so consistently. Andre convinced me that the current
* code is an elegant solution. However, if problems with queue workers and/or
* shutdown come up, this code here should be looked at suspiciously. In those
* cases it may work out to check all status codes explicitly, just to avoid
* a pitfall due to unexpected states being passed on to the caller.
*/
if (iRet != RS_RET_OK && iRet != RS_RET_ERR_QUEUE_EMERGENCY && iRet < 0) {
DBGOPRINT((obj_t *)pThis, "ConsumerDA:qqueueEnqMsg Resetting iRet from %d back to RS_RET_OK\n", iRet);
iRet = RS_RET_OK;
} else {
DBGOPRINT((obj_t *)pThis, "ConsumerDA:qqueueEnqMsg returns with iRet %d\n", iRet);
}
/* now we are done, but potentially need to re-acquire the mutex */
if (bNeedReLock) d_pthread_mutex_lock(pThis->mut);
RETiRet;
}
/* must only be called when the queue mutex is locked, else results
* are not stable!
*/
static rsRetVal qqueueChkStopWrkrDA(qqueue_t *pThis) {
DEFiRet;
DBGPRINTF("rger: chkStopWrkrDA called, low watermark %d, log Size %d, phys Size %d, bEnqOnly %d\n",
pThis->iLowWtrMrk, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), pThis->bEnqOnly);
if (pThis->bEnqOnly) {
iRet = RS_RET_TERMINATE_WHEN_IDLE;
}
if (getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk) {
iRet = RS_RET_TERMINATE_NOW;
}
RETiRet;
}
/* must only be called when the queue mutex is locked, else results
* are not stable!
* If we are a child, we have done our duty when the queue is empty. In that case,
* we can terminate. Version for the regular worker thread.
*/
static rsRetVal ChkStopWrkrReg(qqueue_t *pThis) {
DEFiRet;
/*DBGPRINTF("XXXX: chkStopWrkrReg called, low watermark %d, log Size %d, phys Size %d, bEnqOnly %d\n",
pThis->iLowWtrMrk, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), pThis->bEnqOnly);*/
if (pThis->bEnqOnly) {
iRet = RS_RET_TERMINATE_NOW;
} else if (pThis->pqParent != NULL) {
iRet = RS_RET_TERMINATE_WHEN_IDLE;
}
RETiRet;
}
/* return the configured "deq max at once" interval
* rgerhards, 2009-04-22
*/
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal) {
DEFiRet;
assert(pVal != NULL);
*pVal = pThis->iDeqBatchSize;
RETiRet;
}
/* start up the queue - it must have been constructed and parameters defined
* before.
*/
rsRetVal qqueueStart(rsconf_t *cnf, qqueue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
uchar pszBuf[64];
uchar pszQIFNam[MAXFNAME];
int wrk;
uchar *qName;
size_t lenBuf;
assert(pThis != NULL);
/* do not modify the queue if it is already running (happens when a dynamic config reload is invoked
* and the queue is used in the new config as well)
*/
if (pThis->isRunning) FINALIZE;
dbgoprint((obj_t *)pThis, "starting queue\n");
if (pThis->pszSpoolDir == NULL) {
/* note: we need to pick the path so late as we do not have
* the workdir during early config load
*/
if ((pThis->pszSpoolDir = (uchar *)strdup((char *)glbl.GetWorkDir(cnf))) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir);
}
/* set type-specific handlers and other very type-specific things
* (we can not totally hide it...)
*/
switch (pThis->qType) {
case QUEUETYPE_FIXED_ARRAY:
pThis->qConstruct = qConstructFixedArray;
pThis->qDestruct = qDestructFixedArray;
pThis->qAdd = qAddFixedArray;
pThis->qDeq = qDeqFixedArray;
pThis->qDel = qDelFixedArray;
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_LINKEDLIST:
pThis->qConstruct = qConstructLinkedList;
pThis->qDestruct = qDestructLinkedList;
pThis->qAdd = qAddLinkedList;
pThis->qDeq = qDeqLinkedList;
pThis->qDel = qDelLinkedList;
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
pThis->qDestruct = qDestructDisk;
pThis->qAdd = qAddDisk;
pThis->qDeq = qDeqDisk;
pThis->qDel = NULL; /* delete for disk handled via special code! */
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
/* pre-construct file name for .qi file */
pThis->lenQIFNam = snprintf((char *)pszQIFNam, sizeof(pszQIFNam), "%s/%s.qi", (char *)pThis->pszSpoolDir,
(char *)pThis->pszFilePrefix);
pThis->pszQIFNam = ustrdup(pszQIFNam);
DBGOPRINT((obj_t *)pThis, ".qi file name is '%s', len %d\n", pThis->pszQIFNam, (int)pThis->lenQIFNam);
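/* e.g. (hypothetical values) spool dir "/var/spool/rsyslog" and file
 * prefix "mainq" yield the .qi file "/var/spool/rsyslog/mainq.qi" */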
break;
case QUEUETYPE_DIRECT:
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
/* the qDeq/qDel entry points shall not be used in direct mode, so they stay
* NULL. To catch program errors, this makes us abort if they are ever called!
* rgerhards, 2013-11-05
*/
pThis->qAdd = qAddDirect;
pThis->MultiEnq = qqueueMultiEnqObjDirect;
pThis->qDel = NULL;
break;
default:
// We need to satisfy the compiler, which does not properly handle the enum
break;
}
/* finalize some initializations that could not yet be done because they are
* influenced by properties which might have been set after queueConstruct()
*/
if (pThis->pqParent == NULL) {
CHKmalloc(pThis->mut = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t)));
pthread_mutex_init(pThis->mut, NULL);
} else {
/* child queue, we need to use parent's mutex */
DBGOPRINT((obj_t *)pThis, "I am a child\n");
pThis->mut = pThis->pqParent->mut;
}
pthread_mutex_init(&pThis->mutThrdMgmt, NULL);
pthread_cond_init(&pThis->notFull, NULL);
pthread_cond_init(&pThis->belowFullDlyWtrMrk, NULL);
pthread_cond_init(&pThis->belowLightDlyWtrMrk, NULL);
/* call type-specific constructor */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
/* re-adjust some params if required */
if (pThis->bIsDA) {
/* if we are in DA mode, we must make sure full delayable messages do not
* initiate going to disk!
*/
wrk = pThis->iHighWtrMrk - (pThis->iHighWtrMrk / 100) * 50; /* 50% of high water mark */
if (wrk < pThis->iFullDlyMrk) pThis->iFullDlyMrk = wrk;
}
DBGOPRINT((obj_t *)pThis,
"params: type %d, enq-only %d, disk assisted %d, spoolDir '%s', maxFileSz %lld, "
"maxQSize %d, lqsize %d, pqsize %d, child %d, full delay %d, "
"light delay %d, deq batch size %d, min deq batch size %d, "
"high wtrmrk %d, low wtrmrk %d, "
"discardmrk %d, max wrkr %d, min msgs f. wrkr %d "
"takeFlowCtlFromMsg %d\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->pszSpoolDir, pThis->iMaxFileSize,
pThis->iMaxQueueSize, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis),
pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize,
pThis->iMinDeqBatchSize, pThis->iHighWtrMrk, pThis->iLowWtrMrk, pThis->iDiscardMrk,
(int)pThis->iNumWorkerThreads, (int)pThis->iMinMsgsPerWrkr, pThis->takeFlowCtlFromMsg);
pThis->bQueueStarted = 1;
if (pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */
/* create worker thread pools for regular and DA operation.
*/
lenBuf = snprintf((char *)pszBuf, sizeof(pszBuf), "%.*s:Reg", (int)(sizeof(pszBuf) - 16),
obj.GetName((obj_t *)pThis)); /* leave some room inside the name for suffixes */
if (lenBuf >= sizeof(pszBuf)) {
LogError(0, RS_RET_INTERNAL_ERROR,
"%s:%d debug header too long: %zd - in "
"thory this cannot happen - truncating",
__FILE__, __LINE__, lenBuf);
lenBuf = sizeof(pszBuf) - 1;
pszBuf[lenBuf] = '\0';
}
CHKiRet(wtpConstruct(&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr(pThis->pWtpReg, pszBuf, lenBuf));
CHKiRet(wtpSetpfRateLimiter(pThis->pWtpReg, (rsRetVal(*)(void *pUsr))RateLimiter));
CHKiRet(wtpSetpfChkStopWrkr(pThis->pWtpReg, (rsRetVal(*)(void *pUsr, int))ChkStopWrkrReg));
CHKiRet(wtpSetpfGetDeqBatchSize(pThis->pWtpReg, (rsRetVal(*)(void *pUsr, int *))GetDeqBatchSize));
CHKiRet(wtpSetpfDoWork(pThis->pWtpReg, (rsRetVal(*)(void *pUsr, void *pWti))ConsumerReg));
CHKiRet(wtpSetpfObjProcessed(pThis->pWtpReg, (rsRetVal(*)(void *pUsr, wti_t *pWti))batchProcessed));
CHKiRet(wtpSetpmutUsr(pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetiNumWorkerThreads(pThis->pWtpReg, pThis->iNumWorkerThreads));
CHKiRet(wtpSettoWrkShutdown(pThis->pWtpReg, pThis->toWrkShutdown));
CHKiRet(wtpSetpUsr(pThis->pWtpReg, pThis));
CHKiRet(wtpConstructFinalize(pThis->pWtpReg));
/* Validate queue configuration before starting */
if (pThis->qType == QUEUETYPE_DISK || pThis->bIsDA) {
/* Check that maxDiskSpace is not smaller than maxFileSize */
if (pThis->sizeOnDiskMax > 0 && pThis->iMaxFileSize > 0 && pThis->sizeOnDiskMax < pThis->iMaxFileSize) {
LogError(0, RS_RET_CONF_PARAM_INVLD,
"queue.maxDiskSpace (%lld) must be larger than queue.maxFileSize (%lld) - "
"setting queue.maxDiskSpace to %lld",
pThis->sizeOnDiskMax, pThis->iMaxFileSize, pThis->iMaxFileSize);
pThis->sizeOnDiskMax = pThis->iMaxFileSize;
}
}
/* set up DA system if we have a disk-assisted queue */
if (pThis->bIsDA) InitDA(pThis, LOCK_MUTEX); /* initiate DA mode */
DBGOPRINT((obj_t *)pThis, "queue finished initialization\n");
/* if the queue already contains data, we need to start the correct number of worker threads. This can be
* the case when a disk queue has been loaded. If we did not start it here, it would never start.
*/
qqueueAdviseMaxWorkers(pThis);
/* support statistics gathering */
qName = obj.GetName((obj_t *)pThis);
CHKiRet(statsobj.Construct(&pThis->statsobj));
CHKiRet(statsobj.SetName(pThis->statsobj, qName));
CHKiRet(statsobj.SetOrigin(pThis->statsobj, (uchar *)"core.queue"));
/* we need to save the queue size, as the stats module initializes it to 0! */
/* iQueueSize is a dual-use counter: no init, no mutex! */
CHKiRet(
statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("size"), ctrType_Int, CTR_FLAG_NONE, &pThis->iQueueSize));
STATSCOUNTER_INIT(pThis->ctrEnqueued, pThis->mutCtrEnqueued);
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("enqueued"), ctrType_IntCtr, CTR_FLAG_RESETTABLE,
&pThis->ctrEnqueued));
STATSCOUNTER_INIT(pThis->ctrFull, pThis->mutCtrFull);
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("full"), ctrType_IntCtr, CTR_FLAG_RESETTABLE,
&pThis->ctrFull));
STATSCOUNTER_INIT(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.full"), ctrType_IntCtr, CTR_FLAG_RESETTABLE,
&pThis->ctrFDscrd));
STATSCOUNTER_INIT(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd);
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.nf"), ctrType_IntCtr, CTR_FLAG_RESETTABLE,
&pThis->ctrNFDscrd));
pThis->ctrMaxqsize = 0; /* no mutex needed, thus no init call */
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("maxqsize"), ctrType_Int, CTR_FLAG_NONE,
&pThis->ctrMaxqsize));
CHKiRet(statsobj.ConstructFinalize(pThis->statsobj));
finalize_it:
if (iRet != RS_RET_OK) {
/* note: a child uses its parent's mutex, so do not delete it! */
if (pThis->pqParent == NULL && pThis->mut != NULL) free(pThis->mut);
} else {
pThis->isRunning = 1;
}
RETiRet;
}
/* persist the queue to disk (write the .qi file). If we have something to persist, we first
* save the information on the queue properties itself and then we call
* the queue-type specific drivers.
* Variable bIsCheckpoint is set to 1 if the persist is for a checkpoint,
* and 0 otherwise.
* rgerhards, 2008-01-10
*/
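/* Illustrative sketch of the crash-safe update pattern used below (file
 * names hypothetical; the real code goes through the strm object):
 *
 *   write_qi_content("q.qi.tmp");   // write the complete new state
 *   rename("q.qi.tmp", "q.qi");     // POSIX rename() replaces atomically
 *
 * A crash before the rename leaves the old .qi intact; a crash after it
 * leaves the complete new one - there is no window with a half-written file.
 */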
static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) {
DEFiRet;
char *tmpQIFName = NULL;
strm_t *psQIF = NULL; /* Queue Info File */
char errStr[1024];
assert(pThis != NULL);
if (pThis->qType != QUEUETYPE_DISK) {
if (getPhysicalQueueSize(pThis) > 0) {
/* This error code is OK, but we will probably not implement this any time soon.
* The reason is that persistence happens via DA queues. But I would like to
* leave the code as is, as we so have a hook in case we need one.
* -- rgerhards, 2008-01-28
*/
ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED);
} else
FINALIZE; /* if the queue is empty, we are happy and done... */
}
DBGOPRINT((obj_t *)pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis));
if ((bIsCheckpoint != QUEUE_CHECKPOINT) && (getPhysicalQueueSize(pThis) == 0)) {
if (pThis->bNeedDelQIF) {
unlink((char *)pThis->pszQIFNam);
pThis->bNeedDelQIF = 0;
}
/* indicate spool file needs to be deleted */
if (pThis->tVars.disk.pReadDel != NULL) /* may be NULL if we had a startup failure! */
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1));
FINALIZE; /* nothing left to do, so be happy */
}
int lentmpQIFName;
#ifdef _AIX
lentmpQIFName = strlen(pThis->pszQIFNam) + strlen(".tmp") + 1;
tmpQIFName = malloc(sizeof(char) * lentmpQIFName);
if (tmpQIFName == NULL) tmpQIFName = (char *)pThis->pszQIFNam;
snprintf(tmpQIFName, lentmpQIFName, "%s.tmp", pThis->pszQIFNam);
#else
lentmpQIFName = asprintf((char **)&tmpQIFName, "%s.tmp", pThis->pszQIFNam);
if (tmpQIFName == NULL) tmpQIFName = (char *)pThis->pszQIFNam;
#endif
CHKiRet(strm.Construct(&psQIF));
CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE_TRUNC));
CHKiRet(strm.SetbSync(psQIF, pThis->bSyncQueueFiles));
CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE));
CHKiRet(strm.SetFName(psQIF, (uchar *)tmpQIFName, lentmpQIFName));
CHKiRet(strm.ConstructFinalize(psQIF));
/* first, write the property bag for ourselves.
* And, surprisingly enough, we currently need to persist only the size of the
* queue. All the rest is re-created with then-current config parameters when the
* queue is re-created. Well, we'll also save the current queue type, just so that
* we know when somebody has changed the queue type... -- rgerhards, 2008-01-11
*/
CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t *)pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64);
CHKiRet(obj.EndSerialize(psQIF));
/* now persist the stream info */
if (pThis->tVars.disk.pWrite != NULL) CHKiRet(strm.Serialize(pThis->tVars.disk.pWrite, psQIF));
if (pThis->tVars.disk.pReadDel != NULL) CHKiRet(strm.Serialize(pThis->tVars.disk.pReadDel, psQIF));
strm.Destruct(&psQIF);
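/* Note: writing the new state to a ".tmp" file first and then rename()ing it
 * over the real .qi file (below) makes the update atomic on POSIX file systems:
 * a crash in between leaves either the complete old state or the complete new
 * state on disk, never a partially written file.
 */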
if (tmpQIFName != (char *)pThis->pszQIFNam) { /* pointer, not string comparison! */
if (rename(tmpQIFName, (char *)pThis->pszQIFNam) != 0) {
rs_strerror_r(errno, errStr, sizeof(errStr));
DBGOPRINT((obj_t *)pThis, "FATAL error: renaming temporary .qi file failed: %s\n", errStr);
ABORT_FINALIZE(RS_RET_RENAME_TMP_QI_ERROR);
}
}
/* tell the input file object that it must not delete the file on close if the queue
 * is non-empty - but only if this is not a simple checkpoint
 */
if (bIsCheckpoint != QUEUE_CHECKPOINT && pThis->tVars.disk.pReadDel != NULL) {
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 0));
}
/* we have persisted the queue object. So whenever the queue becomes empty,
 * we need to delete the QIF. Thus, we indicate that need.
 */
pThis->bNeedDelQIF = 1;
finalize_it:
if (tmpQIFName != (char *)pThis->pszQIFNam) /* pointer, not string comparison! */
free(tmpQIFName);
if (psQIF != NULL) strm.Destruct(&psQIF);
RETiRet;
}
/* check if we need to persist the current queue info. If an
* error occurs, this should be ignored by the caller (but we still
* abide by our regular call interface)...
* rgerhards, 2008-01-13
* nUpdates is the number of updates since the last call to this function.
* It may be > 1 due to batches. -- rgerhards, 2009-05-12
*/
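/* Illustrative example: with queue.checkpointInterval="1000" configured,
 * iPersistUpdCnt is 1000 and qqueuePersist() rewrites the .qi file after
 * every 1000 enqueue/dequeue updates. With iPersistUpdCnt at 0, the check
 * below never fires and the queue is persisted only at shutdown.
 */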
static rsRetVal qqueueChkPersist(qqueue_t *const pThis, const int nUpdates) {
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(nUpdates >= 0);
if (nUpdates == 0) FINALIZE;
pThis->iUpdsSincePersist += nUpdates;
if (pThis->iPersistUpdCnt && pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
qqueuePersist(pThis, QUEUE_CHECKPOINT);
pThis->iUpdsSincePersist = 0;
}
finalize_it:
RETiRet;
}
/* persist a queue with all data elements to disk - this is used to handle
* bSaveOnShutdown. We utilize the DA worker to do this. This must only
* be called after all workers have been shut down and if bSaveOnShutdown
* is actually set. Note that this function may potentially run long,
* depending on the queue configuration (e.g. store on remote machine).
* rgerhards, 2009-05-26
*/
static rsRetVal DoSaveOnShutdown(qqueue_t *pThis) {
struct timespec tTimeout;
rsRetVal iRetLocal;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
/* we reduce the low water mark, otherwise the DA worker would terminate when
* it is reached.
*/
DBGOPRINT((obj_t *)pThis, "bSaveOnShutdown set, restarting DA worker...\n");
pThis->bShutdownImmediate = 0; /* it would otherwise terminate the DA worker! */
pThis->iLowWtrMrk = 0;
wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN); /* shutdown worker (only) when done (was _IMMEDIATE!) */
wtpAdviseMaxWorkers(pThis->pWtpDA, 1, PERMIT_WORKER_START_DURING_SHUTDOWN); /* restart DA worker */
DBGOPRINT((obj_t *)pThis, "waiting for DA worker to terminate...\n");
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
/* and run the primary queue's DA worker to drain the queue */
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
DBGOPRINT((obj_t *)pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n", iRetLocal,
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
if (iRetLocal != RS_RET_OK) {
DBGOPRINT((obj_t *)pThis,
"unexpected iRet state %d after trying to shut down primary "
"queue in disk save mode, continuing, but results are unpredictable\n",
iRetLocal);
}
RETiRet;
}
/* destructor for the queue object */
BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(qqueue);
DBGOPRINT((obj_t *)pThis, "shutdown: begin to destruct queue\n");
if (ourConf->globals.shutdownQueueDoubleSize) {
pThis->iHighWtrMrk *= 2;
pThis->iMaxQueueSize *= 2;
}
if (pThis->bQueueStarted) {
/* shut down all workers
 * We do not need to shut down workers when we are in enqueue-only mode or we are a
 * direct queue - because in both cases we have none... ;) -- rgerhards, 2008-01-28
 */
if (pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL && pThis->pWtpReg != NULL)
qqueueShutdownWorkers(pThis);
if (pThis->bIsDA && getPhysicalQueueSize(pThis) > 0) {
if (pThis->bSaveOnShutdown) {
LogMsg(0, RS_RET_TIMED_OUT, LOG_INFO,
"%s: queue holds %d messages after shutdown of workers. "
"queue.saveonshutdown is set, so data will now be spooled to disk",
objGetName((obj_t *)pThis), getPhysicalQueueSize(pThis));
CHKiRet(DoSaveOnShutdown(pThis));
} else {
LogMsg(0, RS_RET_TIMED_OUT, LOG_WARNING,
"%s: queue holds %d messages after shutdown of workers. "
"queue.saveonshutdown is NOT set, so data will be discarded.",
objGetName((obj_t *)pThis), getPhysicalQueueSize(pThis));
}
}
/* finally destruct our (regular) worker thread pool
* Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen,
* e.g. when it is not created in enqueue-only mode. We already check the condition
* as this may otherwise be very hard to find once we optimize (and have long forgotten
* about this condition here ;)
* rgerhards, 2008-01-25
*/
if (pThis->qType != QUEUETYPE_DIRECT && pThis->pWtpReg != NULL) {
wtpDestruct(&pThis->pWtpReg);
}
/* Now check if we actually have a DA queue and, if so, destruct it.
* Note that the wtp must be destructed first, as it may be in its cancel cleanup handler
* *right now* and actually *need* to access the queue object to persist some final
* data (re-queueing case). So we need to destruct the wtp first, which will make
* sure all workers have terminated. Please note that this also generates a situation
* where it is possible that the DA queue has a parent pointer but the parent has
* no WtpDA associated with it - which is perfectly legal thanks to this code here.
*/
if (pThis->pWtpDA != NULL) {
wtpDestruct(&pThis->pWtpDA);
}
if (pThis->pqDA != NULL) {
qqueueDestruct(&pThis->pqDA);
}
/* persist the queue (we always do that - qqueuePersist() does cleanup if the queue is empty)
 * This handler is most important for disk queues; it will finally persist the necessary
 * on-disk structures. In theory, other queueing modes may implement their own (non-DA)
 * methods of persisting a queue between runs, but in practice all of this is done via
 * disk queues and DA mode. Anyhow, it doesn't hurt to know that we could extend it here
 * if need arises (which I doubt...) -- rgerhards, 2008-01-25
 */
CHKiRet_Hdlr(qqueuePersist(pThis, QUEUE_NO_CHECKPOINT)) {
DBGOPRINT((obj_t *)pThis, "error %d persisting queue - data lost!\n", iRet);
}
/* finally, clean up some simple things... */
if (pThis->pqParent == NULL) {
/* if we are not a child, we allocated our own mutex, which we now need to destroy */
pthread_mutex_destroy(pThis->mut);
free(pThis->mut);
}
pthread_mutex_destroy(&pThis->mutThrdMgmt);
pthread_cond_destroy(&pThis->notFull);
pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
DESTROY_ATOMIC_HELPER_MUT(pThis->mutLogDeq);
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
}
free(pThis->pszFilePrefix);
free(pThis->pszSpoolDir);
if (pThis->useCryprov) {
pThis->cryprov.Destruct(&pThis->cryprovData);
obj.ReleaseObj(__FILE__, pThis->cryprovNameFull + 2, pThis->cryprovNameFull, (void *)&pThis->cryprov);
free(pThis->cryprovName);
free(pThis->cryprovNameFull);
}
/* some queues do not provide stats and thus have no statsobj! */
if (pThis->statsobj != NULL) statsobj.Destruct(&pThis->statsobj);
ENDobjDestruct(qqueue)
/* set the queue's spool directory. The directory MUST NOT be NULL.
* The passed-in string is duplicated. So if the caller does not need
* it any longer, it must free it.
*/
rsRetVal qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir) {
DEFiRet;
free(pThis->pszSpoolDir);
CHKmalloc(pThis->pszSpoolDir = ustrdup(pszSpoolDir));
pThis->lenSpoolDir = lenSpoolDir;
finalize_it:
RETiRet;
}
/* set the queue's file prefix
* The passed-in string is duplicated. So if the caller does not need
* it any longer, it must free it.
* rgerhards, 2008-01-09
*/
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) {
DEFiRet;
free(pThis->pszFilePrefix);
pThis->pszFilePrefix = NULL;
if (pszPrefix == NULL) /* just unset the prefix! */
ABORT_FINALIZE(RS_RET_OK);
if ((pThis->pszFilePrefix = malloc(iLenPrefix + 1)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1);
pThis->lenFilePrefix = iLenPrefix;
finalize_it:
RETiRet;
}
/* set the queue's maximum file size
* rgerhards, 2008-01-09
*/
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize) {
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
if (iMaxFileSize < 1024) {
ABORT_FINALIZE(RS_RET_VALUE_TOO_LOW);
}
pThis->iMaxFileSize = iMaxFileSize;
finalize_it:
RETiRet;
}
/* enqueue a single data object.
* Note that the queue mutex MUST already be locked when this function is called.
* rgerhards, 2009-06-16
*/
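/* Within this file, that locking contract is honored by the two entry points:
 * qqueueEnqMsg() and qqueueMultiEnqObjNonDirect() both disable thread
 * cancellation and take pThis->mut before calling in here.
 */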
static rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, smsg_t *pMsg) {
DEFiRet;
int err;
struct timespec t;
STATSCOUNTER_INC(pThis->ctrEnqueued, pThis->mutCtrEnqueued);
/* first check if we need to discard this message (which will cause CHKiRet() to exit)
*/
CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg));
/* handle flow control
 * There are two different flow control mechanisms: basic and advanced flow control.
 * Basic flow control has always been implemented and protects the queue structures
 * in that it makes sure no more data is enqueued than the queue is configured to
 * support. Advanced flow control is being added today. There are some sources which
 * can easily be stopped, e.g. a file reader. This is the case because it is unlikely
 * that blocking those sources will have negative effects (after all, the file
 * continues to be written). Other sources can somewhat be blocked (e.g. the kernel
 * log reader or the local log stream reader): in general, nothing is lost if messages
 * from these sources are not picked up immediately. HOWEVER, they cannot block for
 * an extended period of time, as this either causes message loss or - even worse - some
 * other bad effects (e.g. an unresponsive system with respect to the main system log socket).
 * Finally, there are some (few) sources which cannot be blocked at all. UDP syslog is
 * a prime example. If a UDP message is not received, it is simply lost. So we can't
 * do anything about UDP messages that come in too fast. The core idea of advanced
 * flow control is that we take into account the different natures of the sources and
 * select flow control mechanisms that fit these needs. This also means, in the end
 * result, that non-blockable sources like UDP syslog receive priority in the system.
 * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14
 */
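/* In code terms, the source classes map to the branches below:
 *   eFLOWCTL_FULL_DELAY  - block (with ~1s wakeups to check for termination)
 *                          while the queue is at or above iFullDlyMrk
 *   eFLOWCTL_LIGHT_DELAY - wait at most about one second if the queue is at
 *                          or above iLightDlyMrk
 *   anything else (e.g. eFLOWCTL_NO_DELAY, used for UDP) - no delay here;
 *                          only the basic queue-full handling further below applies
 */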
if (unlikely(pThis->takeFlowCtlFromMsg)) { /* recommendation is NOT to use this option */
flowCtlType = pMsg->flowCtlType;
}
if (flowCtlType == eFLOWCTL_FULL_DELAY) {
while (pThis->iQueueSize >= pThis->iFullDlyMrk && !glbl.GetGlobalInputTermState()) {
/* We have a problem during shutdown if we block eternally. In that
 * case, the input thread cannot be terminated. So we wake up
 * from time to time to check for termination.
 * TODO/v6(at earliest): check if we could signal the condition during
 * shutdown. However, this requires new queue registries and thus is
 * far too much change for a stable version (and I am still not sure it
 * is worth the effort, given how seldom this situation occurs and how
 * few resources the wakeups need). -- rgerhards, 2012-05-03
 * In any case, this was the old code (if we do the TODO):
 * pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut);
 */
DBGOPRINT((obj_t *)pThis,
"doEnqSingleObject: FullDelay mark reached for full "
"delayable message - blocking, queue size is %d.\n",
pThis->iQueueSize);
timeoutComp(&t, 1000);
err = pthread_cond_timedwait(&pThis->belowFullDlyWtrMrk, pThis->mut, &t);
if (err != 0 && err != ETIMEDOUT) {
/* Something is really wrong now. Report to debug log and abort the
* wait. That keeps us running, even though we may lose messages.
*/
DBGOPRINT((obj_t *)pThis,
"potential program bug: pthread_cond_timedwait()"
"/fulldelay returned %d\n",
err);
break;
}
DBGPRINTF("wti worker in full delay timed out, checking termination...\n");
}
} else if (flowCtlType == eFLOWCTL_LIGHT_DELAY && !glbl.GetGlobalInputTermState()) {
if (pThis->iQueueSize >= pThis->iLightDlyMrk) {
DBGOPRINT((obj_t *)pThis,
"doEnqSingleObject: LightDelay mark reached for light "
"delayable message - blocking a bit.\n");
timeoutComp(&t, 1000); /* 1000 milliseconds = 1 second; TODO: make configurable */
err = pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t);
if (err != 0 && err != ETIMEDOUT) {
/* Something is really wrong now. Report to debug log */
DBGOPRINT((obj_t *)pThis,
"potential program bug: pthread_cond_timedwait()"
"/lightdelay returned %d\n",
err);
}
}
}
/* having applied the regular flow control settings, we are now ready to enqueue the object.
 * However, we still need to check whether the queue permits adding more data. If it
 * does not, basic flow control enters the field, which means we wait for
 * the queue to become ready or drop the new message. -- rgerhards, 2008-03-14
 */
while ((pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) ||
((pThis->qType == QUEUETYPE_DISK || pThis->bIsDA) && pThis->sizeOnDiskMax != 0 &&
pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull);
if (pThis->toEnq == 0 || pThis->bEnqOnly) {
DBGOPRINT((obj_t *)pThis,
"doEnqSingleObject: queue FULL - configured for immediate "
"discarding QueueSize=%d MaxQueueSize=%d sizeOnDisk=%lld "
"sizeOnDiskMax=%lld\n",
pThis->iQueueSize, pThis->iMaxQueueSize, pThis->tVars.disk.sizeOnDisk, pThis->sizeOnDiskMax);
STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else {
DBGOPRINT((obj_t *)pThis, "doEnqSingleObject: queue FULL - waiting %dms to drain.\n", pThis->toEnq);
if (glbl.GetGlobalInputTermState()) {
DBGOPRINT((obj_t *)pThis,
"doEnqSingleObject: queue FULL, discard due to "
"FORCE_TERM.\n");
ABORT_FINALIZE(RS_RET_FORCE_TERM);
}
timeoutComp(&t, pThis->toEnq);
const int r = pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t);
if (dbgTimeoutToStderr && r != 0) {
fprintf(stderr,
"%lld: queue timeout(%dms), error %d%s, "
"lost message %s\n",
(long long)time(NULL), pThis->toEnq, r, (r == ETIMEDOUT) ? "[ETIMEDOUT]" : "", pMsg->pszRawMsg);
}
if (r == ETIMEDOUT) {
DBGOPRINT((obj_t *)pThis, "doEnqSingleObject: cond timeout, dropping message!\n");
STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
} else if (r != 0) {
DBGOPRINT((obj_t *)pThis, "doEnqSingleObject: cond error %d, dropping message!\n", r);
STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
msgDestruct(&pMsg);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
DBGOPRINT((obj_t *)pThis, "doEnqSingleObject: wait solved queue full condition, enqueueing\n");
}
}
/* and finally enqueue the message */
CHKiRet(qqueueAdd(pThis, pMsg));
STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize);
/* check if we had a file rollover and need to persist
* the .qi file for robustness reasons.
* Note: the n=2 write is required for closing the old file and
* the n=1 write is required after opening and writing to the new
* file.
*/
if (pThis->tVars.disk.nForcePersist > 0) {
DBGOPRINT((obj_t *)pThis, ".qi file write required for robustness reasons (n=%d)\n",
pThis->tVars.disk.nForcePersist);
pThis->tVars.disk.nForcePersist--;
qqueuePersist(pThis, QUEUE_CHECKPOINT);
}
finalize_it:
RETiRet;
}
/* ------------------------------ multi-enqueue functions ------------------------------ */
/* enqueue multiple user data elements at once. The aim is to provide a faster interface
* for object submission. Uses the multi_submit_t helper object.
* Please note that this function is not cancel-safe and consequently
* sets the calling thread's cancelability state to PTHREAD_CANCEL_DISABLE
* during its execution. If that is not done, race conditions occur if the
* thread is canceled (the most important use case is input module termination).
* rgerhards, 2009-06-16
* Note: there now exist multiple functions implementing specially
* optimized algorithms for different config cases. -- rgerhards, 2010-06-09
*/
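/* Hypothetical caller sketch (not part of this file; real inputs typically
 * reach this code through the queue's MultiEnq interface pointer): an input
 * module batches messages into a multi_submit_t and submits the whole batch
 * inside one locked section:
 *
 *   smsg_t *msgs[CONF_NUM_MULTISUB];
 *   multi_submit_t multiSub;
 *   multiSub.ppMsgs = msgs;
 *   multiSub.nElem = 0;
 *   // ... fill msgs[0..n-1], set multiSub.nElem = n ...
 *   qqueueMultiEnqObjNonDirect(pQueue, &multiSub);
 *
 * This amortizes the mutex acquisition, the persistence check and the worker
 * advice over all nElem messages.
 */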
/* now the function for all modes but direct */
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) {
int iCancelStateSave;
int i;
rsRetVal localRet;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pMultiSub != NULL);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(pThis->mut);
for (i = 0; i < pMultiSub->nElem; ++i) {
localRet = doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void *)pMultiSub->ppMsgs[i]);
if (localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL) ABORT_FINALIZE(localRet);
}
qqueueChkPersist(pThis, pMultiSub->nElem);
finalize_it:
/* make sure at least one worker is running. */
qqueueAdviseMaxWorkers(pThis);
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
DBGOPRINT((obj_t *)pThis, "MultiEnqObj advised worker start\n");
RETiRet;
}
/* now, the same function, but for direct mode */
static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) {
int i;
wti_t *pWti;
DEFiRet;
pWti = wtiGetDummy();
pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
for (i = 0; i < pMultiSub->nElem; ++i) {
CHKiRet(qAddDirectWithWti(pThis, (void *)pMultiSub->ppMsgs[i], pWti));
}
finalize_it:
RETiRet;
}
/* ------------------------------ END multi-enqueue functions ------------------------------ */
/* enqueue a new user data element
* Enqueues the new element and awakes worker thread.
*/
rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, smsg_t *pMsg) {
DEFiRet;
int iCancelStateSave;
ISOBJ_TYPE_assert(pThis, qqueue);
const int isNonDirectQ = pThis->qType != QUEUETYPE_DIRECT;
if (isNonDirectQ) {
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(pThis->mut);
}
CHKiRet(doEnqSingleObj(pThis, flowCtlType, pMsg));
qqueueChkPersist(pThis, 1);
finalize_it:
if (isNonDirectQ) {
/* make sure at least one worker is running. */
qqueueAdviseMaxWorkers(pThis);
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
DBGOPRINT((obj_t *)pThis, "EnqueueMsg advised worker start\n");
}
RETiRet;
}
/* are any queue params set at all? 1 - yes, 0 - no
* We need to evaluate the param block for this function, which is somewhat
* inefficient. HOWEVER, this is only done during config load, so we really
* don't care... -- rgerhards, 2013-05-10
*/
int queueCnfParamsSet(struct nvlst *lst) {
int r;
struct cnfparamvals *pvals;
pvals = nvlstGetParams(lst, &pblk, NULL);
r = cnfparamvalsIsSet(&pblk, pvals);
cnfparamvalsDestruct(pvals, &pblk);
return r;
}
static rsRetVal initCryprov(qqueue_t *pThis, struct nvlst *lst) {
uchar szDrvrName[1024];
DEFiRet;
if (snprintf((char *)szDrvrName, sizeof(szDrvrName), "lmcry_%s", pThis->cryprovName) >= (int)sizeof(szDrvrName)) {
LogError(0, RS_RET_ERR,
"queue: crypto provider "
"name is too long: '%s' - encryption disabled",
pThis->cryprovName);
ABORT_FINALIZE(RS_RET_ERR);
}
pThis->cryprovNameFull = ustrdup(szDrvrName);
pThis->cryprov.ifVersion = cryprovCURR_IF_VERSION;
/* The szDrvrName+2 below is a hack to obtain the object name. It
 * saves us from having yet another variable with the name without "lm" in
 * front of it. If we change the module load interface, we may rethink
 * this hack, but for the time being it is efficient and clean enough.
 */
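/* Example: queue.cry.provider="gcry" yields szDrvrName "lmcry_gcry"; the
 * object name passed below is then "cry_gcry" (assuming the provider module
 * follows the usual "lm" naming convention).
 */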
if (obj.UseObj(__FILE__, szDrvrName + 2, szDrvrName, (void *)&pThis->cryprov) != RS_RET_OK) {
LogError(0, RS_RET_LOAD_ERROR,
"queue: could not load "
"crypto provider '%s' - encryption disabled",
szDrvrName);
ABORT_FINALIZE(RS_RET_CRYPROV_ERR);
}
if (pThis->cryprov.Construct(&pThis->cryprovData) != RS_RET_OK) {
LogError(0, RS_RET_CRYPROV_ERR,
"queue: error constructing "
"crypto provider %s dataset - encryption disabled",
szDrvrName);
ABORT_FINALIZE(RS_RET_CRYPROV_ERR);
}
CHKiRet(pThis->cryprov.SetCnfParam(pThis->cryprovData, lst, CRYPROV_PARAMTYPE_DISK));
dbgprintf("loaded crypto provider %s, data instance at %p\n", szDrvrName, pThis->cryprovData);
pThis->useCryprov = 1;
finalize_it:
RETiRet;
}
/* check that the queue file name is unique. */
static rsRetVal ATTR_NONNULL() checkUniqueDiskFile(qqueue_t *const pThis) {
DEFiRet;
struct queue_filename *queue_fn_curr = queue_filename_root;
struct queue_filename *newetry = NULL;
const char *const curr_dirname = (pThis->pszSpoolDir == NULL) ? "" : (char *)pThis->pszSpoolDir;
if (pThis->pszFilePrefix == NULL) {
FINALIZE; /* no disk queue! */
}
while (queue_fn_curr != NULL) {
if (!strcmp((const char *)pThis->pszFilePrefix, queue_fn_curr->filename) &&
!strcmp(curr_dirname, queue_fn_curr->dirname)) {
parser_errmsg(
"queue directory '%s' and file name prefix '%s' already used. "
"This is not possible. Please make it unique.",
curr_dirname, pThis->pszFilePrefix);
ABORT_FINALIZE(RS_RET_ERR_QUEUE_FN_DUP);
}
queue_fn_curr = queue_fn_curr->next;
}
/* name ok, so let's add it to the list */
CHKmalloc(newetry = calloc(1, sizeof(struct queue_filename)));
CHKmalloc(newetry->filename = strdup((char *)pThis->pszFilePrefix));
CHKmalloc(newetry->dirname = strdup(curr_dirname));
newetry->next = queue_filename_root;
queue_filename_root = newetry;
finalize_it:
if (iRet != RS_RET_OK) {
if (newetry != NULL) {
free((void *)newetry->filename);
free((void *)newetry);
}
}
RETiRet;
}
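/* Example: two disk or disk-assisted queues that both set
 * queue.filename="fwdq" and use the same spool directory make the check
 * above fail for the second queue, which emits the parser error and returns
 * RS_RET_ERR_QUEUE_FN_DUP instead of letting both queues share (and corrupt)
 * the same on-disk files.
 */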
void qqueueCorrectParams(qqueue_t *pThis) {
int goodval; /* a "good value" to use for comparisons (differs per check) */
if (pThis->iMaxQueueSize < 100 && (pThis->qType == QUEUETYPE_LINKEDLIST || pThis->qType == QUEUETYPE_FIXED_ARRAY)) {
LogMsg(0, RS_RET_OK_WARN, LOG_WARNING,
"Note: queue.size=\"%d\" is very "
"low and can lead to unpredictable results. See also "
"https://www.rsyslog.com/lower-bound-for-queue-sizes/",
pThis->iMaxQueueSize);
}
/* we need to do a quick check if our water marks are set plausible. If not,
* we correct the most important shortcomings.
*/
goodval = (pThis->iMaxQueueSize / 100) * 60;
if (pThis->iHighWtrMrk != -1 && pThis->iHighWtrMrk < goodval) {
LogMsg(0, RS_RET_CONF_PARSE_WARNING, LOG_WARNING,
"queue \"%s\": high water mark "
"is set quite low at %d. You should only set it below "
"60%% (%d) if you have a good reason for this.",
obj.GetName((obj_t *)pThis), pThis->iHighWtrMrk, goodval);
}
if (pThis->iNumWorkerThreads > 1) {
goodval = (pThis->iMaxQueueSize / 100) * 10;
if (pThis->iMinMsgsPerWrkr != -1 && pThis->iMinMsgsPerWrkr < goodval) {
LogMsg(0, RS_RET_CONF_PARSE_WARNING, LOG_WARNING,
"queue \"%s\": "
"queue.workerThreadMinimumMessage "
"is set quite low at %d. You should only set it below "
"10%% (%d) if you have a good reason for this.",
obj.GetName((obj_t *)pThis), pThis->iMinMsgsPerWrkr, goodval);
}
}
if (pThis->iDiscardMrk > pThis->iMaxQueueSize) {
LogError(0, RS_RET_PARAM_ERROR,
"error: queue \"%s\": "
"queue.discardMark %d is set larger than queue.size",
obj.GetName((obj_t *)pThis), pThis->iDiscardMrk);
}
goodval = (pThis->iMaxQueueSize / 100) * 80;
if (pThis->iDiscardMrk != -1 && pThis->iDiscardMrk < goodval) {
LogMsg(0, RS_RET_CONF_PARSE_WARNING, LOG_WARNING,
"queue \"%s\": queue.discardMark "
"is set quite low at %d. You should only set it below "
"80%% (%d) if you have a good reason for this.",
obj.GetName((obj_t *)pThis), pThis->iDiscardMrk, goodval);
}
if (pThis->pszFilePrefix != NULL) { /* This means we have a potential DA queue */
if (pThis->iFullDlyMrk != -1 && pThis->iFullDlyMrk < pThis->iHighWtrMrk) {
LogMsg(0, RS_RET_CONF_WRN_FULLDLY_BELOW_HIGHWTR, LOG_WARNING,
"queue \"%s\": queue.fullDelayMark "
"is set below high water mark. This will result in DA mode "
" NOT being activated for full delayable messages: In many "
"cases this is a configuration error, please check if this "
"is really what you want",
obj.GetName((obj_t *)pThis));
}
}
/* now come parameter corrections and defaults */
if (pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) {
pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90;
if (pThis->iHighWtrMrk == 0) { /* guard against very low max queue sizes! */
pThis->iHighWtrMrk = pThis->iMaxQueueSize;
}
}
if (pThis->iLowWtrMrk < 2 || pThis->iLowWtrMrk > pThis->iMaxQueueSize || pThis->iLowWtrMrk > pThis->iHighWtrMrk) {
pThis->iLowWtrMrk = (pThis->iMaxQueueSize / 100) * 70;
if (pThis->iLowWtrMrk == 0) {
pThis->iLowWtrMrk = 1;
}
}
if ((pThis->iMinMsgsPerWrkr < 1 || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize)) {
pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads;
}
if (pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) {
pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97;
if (pThis->iFullDlyMrk == 0) {
pThis->iFullDlyMrk = (pThis->iMaxQueueSize == 1) ? 1 : pThis->iMaxQueueSize - 1;
}
}
if (pThis->iLightDlyMrk == 0) {
pThis->iLightDlyMrk = pThis->iMaxQueueSize;
}
if (pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) {
pThis->iLightDlyMrk = (pThis->iMaxQueueSize / 100) * 70;
if (pThis->iLightDlyMrk == 0) {
pThis->iLightDlyMrk = (pThis->iMaxQueueSize == 1) ? 1 : pThis->iMaxQueueSize - 1;
}
}
if (pThis->iDiscardMrk < 1 || pThis->iDiscardMrk > pThis->iMaxQueueSize) {
pThis->iDiscardMrk = (pThis->iMaxQueueSize / 100) * 98;
if (pThis->iDiscardMrk == 0) {
/* for very small queues, we disable this by default */
pThis->iDiscardMrk = pThis->iMaxQueueSize;
}
}
if (pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) {
pThis->iDeqBatchSize = pThis->iMaxQueueSize;
}
}
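/* Worked example of the defaulting above (illustrative; assumes all marks
 * start out unset at -1): an in-memory queue with queue.size="10000" and one
 * worker thread ends up with iHighWtrMrk=9000 (90%), iLowWtrMrk=7000 (70%),
 * iMinMsgsPerWrkr=10000, iFullDlyMrk=9700 (97%), iLightDlyMrk=7000 (70%) and
 * iDiscardMrk=9800 (98%).
 */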
/* apply all params from param block to queue. Must be called before
* finalizing. This supports the v6 config system. Defaults were already
* set during queue creation. The pvals object is destructed by this
* function.
*/
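/* Illustrative rainerscript snippet that feeds this function (values are
 * examples only):
 *
 *   action(type="omfwd" target="central.example.net"
 *          queue.type="LinkedList" queue.size="50000"
 *          queue.filename="fwdq" queue.saveonshutdown="on"
 *          queue.highwatermark="40000" queue.lowwatermark="10000")
 *
 * Each queue.* parameter is matched by name in the loop below and copied into
 * the corresponding qqueue_t field.
 */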
rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst) {
int i;
struct cnfparamvals *pvals;
int n_params_set = 0;
DEFiRet;
pvals = nvlstGetParams(lst, &pblk, NULL);
if (pvals == NULL) {
parser_errmsg("error processing queue config parameters");
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
if (Debug) {
dbgprintf("queue param blk:\n");
cnfparamsPrint(&pblk, pvals);
}
for (i = 0; i < pblk.nParams; ++i) {
if (!pvals[i].bUsed) continue;
n_params_set++;
if (!strcmp(pblk.descr[i].name, "queue.filename")) {
pThis->pszFilePrefix = (uchar *)es_str2cstr(pvals[i].val.d.estr, NULL);
pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr);
} else if (!strcmp(pblk.descr[i].name, "queue.cry.provider")) {
pThis->cryprovName = (uchar *)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if (!strcmp(pblk.descr[i].name, "queue.spooldirectory")) {
free(pThis->pszSpoolDir);
pThis->pszSpoolDir = (uchar *)es_str2cstr(pvals[i].val.d.estr, NULL);
pThis->lenSpoolDir = es_strlen(pvals[i].val.d.estr);
if (pThis->pszSpoolDir[pThis->lenSpoolDir - 1] == '/') {
pThis->pszSpoolDir[pThis->lenSpoolDir - 1] = '\0';
--pThis->lenSpoolDir;
parser_errmsg(
"queue.spooldirectory must not end with '/', "
"corrected to '%s'",
pThis->pszSpoolDir);
}
} else if (!strcmp(pblk.descr[i].name, "queue.size")) {
if (pvals[i].val.d.n > 0x7fffffff) {
parser_warnmsg(
"queue.size higher than maximum (2147483647) - "
"corrected to maximum");
pvals[i].val.d.n = 0x7fffffff;
} else if (pvals[i].val.d.n > OVERSIZE_QUEUE_WATERMARK) {
parser_warnmsg(
"queue.size=%d is very large - is this "
"really intended? More info at "
"https://www.rsyslog.com/avoid-overly-large-in-memory-queues/",
(int)pvals[i].val.d.n);
}
pThis->iMaxQueueSize = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) {
pThis->iDeqBatchSize = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.mindequeuebatchsize")) {
pThis->iMinDeqBatchSize = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.mindequeuebatchsize.timeout")) {
pThis->toMinDeqBatchSize = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.maxdiskspace")) {
pThis->sizeOnDiskMax = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.highwatermark")) {
pThis->iHighWtrMrk = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.lowwatermark")) {
pThis->iLowWtrMrk = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.fulldelaymark")) {
pThis->iFullDlyMrk = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.lightdelaymark")) {
pThis->iLightDlyMrk = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.discardmark")) {
pThis->iDiscardMrk = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.discardseverity")) {
pThis->iDiscardSeverity = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.checkpointinterval")) {
pThis->iPersistUpdCnt = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.syncqueuefiles")) {
pThis->bSyncQueueFiles = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.type")) {
pThis->qType = (queueType_t)pvals[i].val.d.n;
if (pThis->qType == QUEUETYPE_DIRECT) {
/* if we have a direct queue, we act as if this param had not been set.
 * Our prime intent is to make sure we detect when "real" params
 * are set on a direct queue; the type setting itself is obviously
 * not relevant here.
 */
n_params_set--;
}
} else if (!strcmp(pblk.descr[i].name, "queue.workerthreads")) {
pThis->iNumWorkerThreads = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.timeoutshutdown")) {
pThis->toQShutdown = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.timeoutactioncompletion")) {
pThis->toActShutdown = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.timeoutenqueue")) {
pThis->toEnq = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.timeoutworkerthreadshutdown")) {
pThis->toWrkShutdown = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.workerthreadminimummessages")) {
pThis->iMinMsgsPerWrkr = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.maxfilesize")) {
pThis->iMaxFileSize = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.saveonshutdown")) {
pThis->bSaveOnShutdown = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.dequeueslowdown")) {
pThis->iDeqSlowdown = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.dequeuetimebegin")) {
pThis->iDeqtWinFromHr = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.dequeuetimeend")) {
pThis->iDeqtWinToHr = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.samplinginterval")) {
pThis->iSmpInterval = pvals[i].val.d.n;
} else if (!strcmp(pblk.descr[i].name, "queue.takeflowctlfrommsg")) {
pThis->takeFlowCtlFromMsg = pvals[i].val.d.n;
} else {
DBGPRINTF(
"queue: program error, non-handled "
"param '%s'\n",
pblk.descr[i].name);
}
}
checkUniqueDiskFile(pThis);
if (pThis->qType == QUEUETYPE_DIRECT) {
if (n_params_set > 0) {
LogMsg(0, RS_RET_OK, LOG_WARNING,
"warning on queue '%s': "
"queue is in direct mode, but parameters have been set. "
"These PARAMETERS cannot be applied and WILL BE IGNORED.",
obj.GetName((obj_t *)pThis));
}
} else if (pThis->qType == QUEUETYPE_DISK) {
if (pThis->pszFilePrefix == NULL) {
LogError(0, RS_RET_QUEUE_DISK_NO_FN,
"error on queue '%s', disk mode selected, but "
"no queue file name given; queue type changed to 'linkedList'",
obj.GetName((obj_t *)pThis));
pThis->qType = QUEUETYPE_LINKEDLIST;
}
}
if (pThis->pszFilePrefix == NULL && pThis->cryprovName != NULL) {
LogError(0, RS_RET_QUEUE_CRY_DISK_ONLY,
"error on queue '%s', crypto provider can "
"only be set for disk or disk assisted queue - ignored",
obj.GetName((obj_t *)pThis));
free(pThis->cryprovName);
pThis->cryprovName = NULL;
}
if (pThis->cryprovName != NULL) {
initCryprov(pThis, lst);
}
cnfparamvalsDestruct(pvals, &pblk);
finalize_it:
RETiRet;
}
/* return 1 if the contents of two qqueue_t structs are equal */
int queuesEqual(qqueue_t *pOld, qqueue_t *pNew) {
return (NUM_EQUALS(qType) && NUM_EQUALS(iMaxQueueSize) && NUM_EQUALS(iDeqBatchSize) &&
NUM_EQUALS(iMinDeqBatchSize) && NUM_EQUALS(toMinDeqBatchSize) && NUM_EQUALS(sizeOnDiskMax) &&
NUM_EQUALS(iHighWtrMrk) && NUM_EQUALS(iLowWtrMrk) && NUM_EQUALS(iFullDlyMrk) && NUM_EQUALS(iLightDlyMrk) &&
NUM_EQUALS(iDiscardMrk) && NUM_EQUALS(iDiscardSeverity) && NUM_EQUALS(iPersistUpdCnt) &&
NUM_EQUALS(bSyncQueueFiles) && NUM_EQUALS(iNumWorkerThreads) && NUM_EQUALS(toQShutdown) &&
NUM_EQUALS(toActShutdown) && NUM_EQUALS(toEnq) && NUM_EQUALS(toWrkShutdown) &&
NUM_EQUALS(iMinMsgsPerWrkr) && NUM_EQUALS(iMaxFileSize) && NUM_EQUALS(bSaveOnShutdown) &&
NUM_EQUALS(iDeqSlowdown) && NUM_EQUALS(iDeqtWinFromHr) && NUM_EQUALS(iDeqtWinToHr) &&
NUM_EQUALS(iSmpInterval) && NUM_EQUALS(takeFlowCtlFromMsg) && USTR_EQUALS(pszFilePrefix) &&
USTR_EQUALS(cryprovName));
}
/* some simple object access methods
* Note: the semicolons behind the macros are actually empty declarations. This is
* a work-around for clang-format's missing understanding of generative macros.
* Some compilers may flag these empty declarations with a warning. If so, we need
* to disable that warning. Alternatively, we could exclude this code from being
* reformatted by clang-format.
*/
DEFpropSetMeth(qqueue, bSyncQueueFiles, int);
DEFpropSetMeth(qqueue, iPersistUpdCnt, int);
DEFpropSetMeth(qqueue, iDeqtWinFromHr, int);
DEFpropSetMeth(qqueue, iDeqtWinToHr, int);
DEFpropSetMeth(qqueue, toQShutdown, long);
DEFpropSetMeth(qqueue, toActShutdown, long);
DEFpropSetMeth(qqueue, toWrkShutdown, long);
DEFpropSetMeth(qqueue, toEnq, long);
DEFpropSetMeth(qqueue, iHighWtrMrk, int);
DEFpropSetMeth(qqueue, iLowWtrMrk, int);
DEFpropSetMeth(qqueue, iDiscardMrk, int);
DEFpropSetMeth(qqueue, iDiscardSeverity, int);
DEFpropSetMeth(qqueue, iLightDlyMrk, int);
DEFpropSetMeth(qqueue, iNumWorkerThreads, int);
DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int);
DEFpropSetMeth(qqueue, bSaveOnShutdown, int);
DEFpropSetMeth(qqueue, pAction, action_t *);
DEFpropSetMeth(qqueue, iDeqSlowdown, int);
DEFpropSetMeth(qqueue, iDeqBatchSize, int);
DEFpropSetMeth(qqueue, iMinDeqBatchSize, int);
DEFpropSetMeth(qqueue, sizeOnDiskMax, int64);
DEFpropSetMeth(qqueue, iSmpInterval, int);
/* This function can be used as a generic way to set properties. Only the subset
* of properties required to read persisted property bags is supported. This
* function shall only be called by the property bag reader, thus it is static.
* rgerhards, 2008-01-11
*/
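/* Note: the property names handled here correspond to what qqueuePersist()
 * writes into the .qi property bag (iQueueSize, tVars.disk.sizeOnDisk), plus
 * the queue type check; this reader and that writer must be kept in sync.
 */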
#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar *)name, sizeof(name) - 1)
static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp) {
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
assert(pProp != NULL);
if (isProp("iQueueSize")) {
pThis->iQueueSize = pProp->val.num;
#ifdef ENABLE_IMDIAG
iOverallQueueSize += pThis->iQueueSize;
#endif
} else if (isProp("tVars.disk.sizeOnDisk")) {
pThis->tVars.disk.sizeOnDisk = pProp->val.num;
} else if (isProp("qType")) {
if (pThis->qType != pProp->val.num) ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH);
}
finalize_it:
RETiRet;
}
#undef isProp
/* dummy */
static rsRetVal qqueueQueryInterface(interface_t __attribute__((unused)) * i) {
return RS_RET_NOT_IMPLEMENTED;
}
/* Initialize the queue class. Must be called as the very first method
* before anything else is called inside this class.
* rgerhards, 2008-01-09
*/
BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(strm, CORE_COMPONENT));
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(statsobj, CORE_COMPONENT));
/* now set our own handlers */
OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty);
ENDObjClassInit(qqueue)