// Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
// SPDX-FileCopyrightText: 2024 Redict Contributors
// SPDX-FileCopyrightText: 2024 Salvatore Sanfilippo <antirez at gmail dot com>
//
// SPDX-License-Identifier: BSD-3-Clause
// SPDX-License-Identifier: LGPL-3.0-only

#include "server.h"
#include "cluster.h"
#include "atomicvar.h"
#include "latency.h"
#include "script.h"
#include "functions.h"

#include <signal.h>
#include <ctype.h>

/*-----------------------------------------------------------------------------
 * C-level DB API
 *----------------------------------------------------------------------------*/

/* Flags for expireIfNeeded */
#define EXPIRE_FORCE_DELETE_EXPIRED 1
#define EXPIRE_AVOID_DELETE_EXPIRED 2

/* Return values for expireIfNeeded */
typedef enum {
    KEY_VALID = 0, /* Could be volatile and not yet expired, non-volatile, or even non-existing key. */
    KEY_EXPIRED, /* Logically expired but not yet deleted. */
    KEY_DELETED /* The key was deleted now. */
} keyStatus;

keyStatus expireIfNeeded(redictDb *db, robj *key, int flags);
int keyIsExpired(redictDb *db, robj *key);
static void dbSetValue(redictDb *db, robj *key, robj *val, int overwrite, dictEntry *de);

/* Update LFU when an object is accessed.
 * First, decrement the counter if the decrement time is reached.
 * Then logarithmically increment the counter, and update the access time. */
void updateLFU(robj *val) {
    unsigned long counter = LFUDecrAndReturn(val);
    counter = LFULogIncr(counter);
    val->lru = (LFUGetTimeInMinutes()<<8) | counter;
}
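
/* Layout of the 'lru' field when an LFU maxmemory policy is active, as
 * written by updateLFU() above. This is an illustrative note; it assumes the
 * usual 24-bit LRU_BITS field split into 16 bits of time and 8 bits of
 * counter, which is what the shift by 8 above implies:
 *
 *           16 bits                   8 bits
 *     +--------------------------+-----------+
 *     | last decr time (minutes) |  counter  |
 *     +--------------------------+-----------+
 *
 * Reading the two fields back is a shift and a mask:
 *
 *     unsigned long ldt     = val->lru >> 8;   // minutes, wraps at 65536
 *     unsigned long counter = val->lru & 255;  // logarithmic, saturates at 255
 */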

/* Lookup a key for read or write operations, or return NULL if the key is not
 * found in the specified DB. This function implements the functionality of
 * lookupKeyRead(), lookupKeyWrite() and their ...WithFlags() variants.
 *
 * Side-effects of calling this function:
 *
 * 1. A key gets expired if it has reached its TTL.
 * 2. The key's last access time is updated.
 * 3. The global keys hits/misses stats are updated (reported in INFO).
 * 4. If keyspace notifications are enabled, a "keymiss" notification is fired.
 *
 * Flags change the behavior of this function:
 *
 * LOOKUP_NONE (or zero): No special flags are passed.
 * LOOKUP_NOTOUCH: Don't alter the last access time of the key.
 * LOOKUP_NONOTIFY: Don't trigger keyspace event on key miss.
 * LOOKUP_NOSTATS: Don't increment key hits/misses counters.
 * LOOKUP_WRITE: Prepare the key for writing (delete expired keys even on
 *               writable replicas, use separate keyspace stats and events (TODO)).
 * LOOKUP_NOEXPIRE: Perform expiration check, but avoid deleting the key,
 *                  so that we don't have to propagate the deletion.
 *
 * Note: this function also returns NULL if the key is logically expired but
 * still exists, in case this is a replica and LOOKUP_WRITE is not set.
 * Even though key expiry is master-driven, this lets a replica correctly
 * report a key as expired while the master is still lagging in propagating
 * the DEL for it over the replication link. */
robj *lookupKey(redictDb *db, robj *key, int flags) {
    dictEntry *de = dbFind(db, key->ptr);
    robj *val = NULL;
    if (de) {
        val = dictGetVal(de);
        /* Forcing deletion of expired keys on a replica makes the replica
         * inconsistent with the master. We forbid it on readonly replicas, but
         * we have to allow it on writable replicas to make write commands
         * behave consistently.
         *
         * It's possible that the WRITE flag is set even during a readonly
         * command, since the command may trigger events that cause modules to
         * perform additional writes. */
        int is_ro_replica = server.masterhost && server.repl_slave_ro;
        int expire_flags = 0;
        if (flags & LOOKUP_WRITE && !is_ro_replica)
            expire_flags |= EXPIRE_FORCE_DELETE_EXPIRED;
        if (flags & LOOKUP_NOEXPIRE)
            expire_flags |= EXPIRE_AVOID_DELETE_EXPIRED;
        if (expireIfNeeded(db, key, expire_flags) != KEY_VALID) {
            /* The key is no longer valid. */
            val = NULL;
        }
    }

    if (val) {
        /* Update the access time for the ageing algorithm, unless the client
         * is in NO-TOUCH mode (the TOUCH command itself still updates it).
         * Don't do it either while a child process (e.g. a saving child) is
         * active, as this would trigger excessive copy-on-write. */
        if (server.current_client && server.current_client->flags & CLIENT_NO_TOUCH &&
            server.current_client->cmd->proc != touchCommand)
            flags |= LOOKUP_NOTOUCH;
        if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)) {
            if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
                updateLFU(val);
            } else {
                val->lru = LRU_CLOCK();
            }
        }

        if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
            server.stat_keyspace_hits++;
        /* TODO: Use separate hits stats for WRITE */
    } else {
        if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE)))
            notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
        if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
            server.stat_keyspace_misses++;
        /* TODO: Use separate misses stats and notify event for WRITE */
    }

    return val;
}

/* Lookup a key for read operations, or return NULL if the key is not found
 * in the specified DB.
 *
 * This API should not be used when we write to the key after obtaining
 * the object linked to the key, but only for read-only operations.
 *
 * This function is equivalent to lookupKey(). The point of using this function
 * rather than lookupKey() directly is to indicate that the purpose is to read
 * the key. */
robj *lookupKeyReadWithFlags(redictDb *db, robj *key, int flags) {
    serverAssert(!(flags & LOOKUP_WRITE));
    return lookupKey(db, key, flags);
}
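
/* Usage sketch (illustrative only; 'c' is assumed to be the client running
 * the current command and c->argv[1] its key argument). An introspection
 * style command can read a key without updating its access time and
 * without firing a "keymiss" event:
 *
 *     robj *o = lookupKeyReadWithFlags(c->db, c->argv[1],
 *                                      LOOKUP_NOTOUCH|LOOKUP_NONOTIFY);
 *     if (o == NULL) {
 *         addReplyNull(c);
 *         return;
 *     }
 *
 * Passing LOOKUP_WRITE here trips the assertion above; write paths should
 * call lookupKeyWriteWithFlags() instead. */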

/* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
 * common case. */
robj *lookupKeyRead(redictDb *db, robj *key) {
    return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
}

/* Lookup a key for write operations, and as a side effect, if needed, expires
 * the key if its TTL is reached. It's equivalent to lookupKey() with the
 * LOOKUP_WRITE flag added.
 *
 * Returns the linked value object if the key exists or NULL if the key
 * does not exist in the specified DB. */
robj *lookupKeyWriteWithFlags(redictDb *db, robj *key, int flags) {
    return lookupKey(db, key, flags | LOOKUP_WRITE);
}

robj *lookupKeyWrite(redictDb *db, robj *key) {
    return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);
}

/* Like lookupKeyRead() / lookupKeyWrite(), but if the key does not exist,
 * 'reply' is sent to the client 'c' and NULL is returned. */
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
    robj *o = lookupKeyRead(c->db, key);
    if (!o) addReplyOrErrorObject(c, reply);
    return o;
}

robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
    robj *o = lookupKeyWrite(c->db, key);
    if (!o) addReplyOrErrorObject(c, reply);
    return o;
}
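
/* Usage sketch (illustrative only, modeled on a GET-style command; it
 * assumes the key is c->argv[1] and that shared.null[] holds the null reply
 * for each protocol version):
 *
 *     robj *o = lookupKeyReadOrReply(c, c->argv[1], shared.null[c->resp]);
 *     if (o == NULL) return;                    // null reply already sent
 *     if (checkType(c, o, OBJ_STRING)) return;  // WRONGTYPE already sent
 *     addReplyBulk(c, o);
 */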

/* Add the key to the DB. It's up to the caller to increment the reference
 * counter of the value if needed.
 *
 * If the update_if_existing argument is false, the program is aborted
 * if the key already exists; otherwise, the existing value is overwritten
 * via dbSetValue(). */
static void dbAddInternal(redictDb *db, robj *key, robj *val, int update_if_existing) {
    dictEntry *existing;
    int slot = getKeySlot(key->ptr);
    dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key->ptr, &existing);
    if (update_if_existing && existing) {
        dbSetValue(db, key, val, 1, existing);
        return;
    }
    serverAssertWithInfo(NULL, key, de != NULL);
    kvstoreDictSetKey(db->keys, slot, de, sdsdup(key->ptr));
    initObjectLRUOrLFU(val);
    kvstoreDictSetVal(db->keys, slot, de, val);
    signalKeyAsReady(db, key, val->type);
    notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id);
}

void dbAdd(redictDb *db, robj *key, robj *val) {
    dbAddInternal(db, key, val, 0);
}

/* Return the key's hash slot when cluster mode is enabled, or 0 when disabled.
 * The only difference between this function and getKeySlot() is that it does
 * not use the key slot cached on current_client and always computes the CRC
 * hash. This is useful when the slot needs to be calculated for a key that
 * the user didn't request, such as during eviction. */
int calculateKeySlot(sds key) {
    return server.cluster_enabled ? keyHashSlot(key, (int) sdslen(key)) : 0;
}

/* Return the key's hash slot (the index of its slot-specific dictionary)
 * when cluster mode is enabled, else 0. */
int getKeySlot(sds key) {
    /* This is a performance optimization that uses the pre-set slot id from
     * the current command, in order to avoid calculating the key hash.
     * This optimization is only used when the current_client flag
     * `CLIENT_EXECUTING_COMMAND` is set, which only happens during the
     * execution of a command under the `call` method. Other flows requesting
     * the key slot fall back to calculateKeySlot().
     */
    if (server.current_client && server.current_client->slot >= 0 && server.current_client->flags & CLIENT_EXECUTING_COMMAND) {
        debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key)==server.current_client->slot);
        return server.current_client->slot;
    }
    return calculateKeySlot(key);
}
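
/* Illustration (cluster mode only; this follows the Redis Cluster hash tag
 * rule that keyHashSlot() is assumed to implement): when a key contains a
 * non-empty substring between its first '{' and the following '}', only that
 * substring is hashed. Keys sharing a hash tag therefore land in the same
 * slot, and so in the same per-slot dict of db->keys:
 *
 *     sds a = sdsnew("{user1000}.following");
 *     sds b = sdsnew("{user1000}.followers");
 *     serverAssert(calculateKeySlot(a) == calculateKeySlot(b));
 *     sdsfree(a);
 *     sdsfree(b);
 *
 * Outside cluster mode both calls simply return 0. */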

/* This is a special version of dbAdd() that is used only when loading
 * keys from the RDB file: the key is passed as an SDS string that is
 * retained by the function (and not freed by the caller).
 *
 * Moreover this function will not abort if the key is already busy, to
 * give more control to the caller, nor will it signal the key as ready
 * since it is not useful in this context.
 *
 * The function returns 1 if the key was added to the database, taking
 * ownership of the SDS string, otherwise 0 is returned, and it is up to the
 * caller to free the SDS string. */
int dbAddRDBLoad(redictDb *db, sds key, robj *val) {
    int slot = getKeySlot(key);
    dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key, NULL);
    if (de == NULL) return 0;
    initObjectLRUOrLFU(val);
    kvstoreDictSetVal(db->keys, slot, de, val);
    return 1;
}

/* Overwrite an existing key with a new value. Incrementing the reference
 * count of the new value is up to the caller.
 * This function does not modify the expire time of the existing key.
 *
 * The 'overwrite' flag indicates whether this is done as part of a
 * complete replacement of the key, which can be thought of as a deletion
 * followed by an addition (in which case we need to emit deletion signals),
 * or just an update of the value of an existing key (when false).
 *
 * The dictEntry input is optional; pass it if the caller already has it.
 *
 * The program is aborted if the key was not already present. */
static void dbSetValue(redictDb *db, robj *key, robj *val, int overwrite, dictEntry *de) {
    int slot = getKeySlot(key->ptr);
    if (!de) de = kvstoreDictFind(db->keys, slot, key->ptr);
    serverAssertWithInfo(NULL,key,de != NULL);
    robj *old = dictGetVal(de);

    val->lru = old->lru;

    if (overwrite) {
        /* RM_StringDMA may call dbUnshareStringValue which may free 'old',
         * so we need to incr to retain old */
        incrRefCount(old);
        /* Although the key is not really deleted from the database, we regard
         * overwrite as two steps of unlink+add, so we still need to call the unlink
         * callback of the module. */
        moduleNotifyKeyUnlink(key,old,db->id,DB_FLAG_KEY_OVERWRITE);
        /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
        signalDeletedKeyAsReady(db,key,old->type);
        decrRefCount(old);
        /* Because of RM_StringDMA, old may be changed, so we need to get old again */
        old = dictGetVal(de);
    }
    kvstoreDictSetVal(db->keys, slot, de, val);

    if (server.lazyfree_lazy_server_del) {
        freeObjAsync(key,old,db->id);
    } else {
        decrRefCount(old);
    }
}

/* Replace the value of an existing key with a new value. Only the value is
 * replaced; no events are emitted. */
void dbReplaceValue(redictDb *db, robj *key, robj *val) {
    dbSetValue(db, key, val, 0, NULL);
}

/* High level Set operation. This function can be used in order to set
 * a key, whether or not it already exists, to a new object.
 *
 * 1) The ref count of the value object is incremented.
 * 2) Clients WATCHing the destination key are notified (unless
 *    'SETKEY_NO_SIGNAL' is set in flags).
 * 3) The expire time of the key is reset (the key is made persistent),
 *    unless 'SETKEY_KEEPTTL' is enabled in flags.
 * 4) If the caller already looked up the key, it can pass the outcome with
 *    'SETKEY_ALREADY_EXIST' or 'SETKEY_DOESNT_EXIST' to skip the lookup
 *    here; 'SETKEY_ADD_OR_UPDATE' adds or overwrites the key without a
 *    prior lookup.
 *
 * All the new keys in the database should be created via this interface.
 * The client 'c' argument may be set to NULL if the operation is performed
 * in a context where there is no clear client performing the operation. */
void setKey(client *c, redictDb *db, robj *key, robj *val, int flags) {
    int keyfound = 0;

    if (flags & SETKEY_ALREADY_EXIST)
        keyfound = 1;
    else if (flags & SETKEY_ADD_OR_UPDATE)
        keyfound = -1;
    else if (!(flags & SETKEY_DOESNT_EXIST))
        keyfound = (lookupKeyWrite(db,key) != NULL);

    if (!keyfound) {
        dbAdd(db,key,val);
    } else if (keyfound<0) {
        dbAddInternal(db,key,val,1);
    } else {
        dbSetValue(db,key,val,1,NULL);
    }
    incrRefCount(val);
    if (!(flags & SETKEY_KEEPTTL)) removeExpire(db,key);
    if (!(flags & SETKEY_NO_SIGNAL)) signalModifiedKey(c,db,key);
}
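
/* Usage sketch (illustrative only; 'c' is assumed to be the calling client
 * and c->argv[1] the destination key). Since setKey() takes its own
 * reference to 'val', a caller that created the object drops its own
 * reference afterwards:
 *
 *     robj *val = createStringObject("hello", 5);
 *     setKey(c, c->db, c->argv[1], val, SETKEY_KEEPTTL);
 *     decrRefCount(val);  // the DB keeps the reference setKey() took
 */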

/* Return a random key, in form of a Redict object.
 * If there are no keys, NULL is returned.
 *
 * The function makes sure to return keys not already expired. The returned
 * object is newly created, so the caller must release it with
 * decrRefCount(). */
robj *dbRandomKey(redictDb *db) {
    dictEntry *de;
    int maxtries = 100;
    int allvolatile = kvstoreSize(db->keys) == kvstoreSize(db->expires);

    while(1) {
        sds key;
        robj *keyobj;
        int randomSlot = kvstoreGetFairRandomDictIndex(db->keys);
        de = kvstoreDictGetFairRandomKey(db->keys, randomSlot);
        if (de == NULL) return NULL;

        key = dictGetKey(de);
        keyobj = createStringObject(key,sdslen(key));
        if (dbFindExpires(db, key)) {
            if (allvolatile && server.masterhost && --maxtries == 0) {
                /* If the DB is composed only of keys with an expire set,
                 * it could happen that all the keys are already logically
                 * expired in the slave. The loop would then never stop:
                 * expireIfNeeded() keeps reporting the keys as expired
                 * without deleting them, and kvstoreDictGetFairRandomKey()
                 * never returns NULL (there are keys to return).
                 * To prevent the infinite loop we do some tries, but if there
                 * are the conditions for an infinite loop, eventually we
                 * return a key name that may be already expired. */
                return keyobj;
            }
            if (expireIfNeeded(db,keyobj,0) != KEY_VALID) {
                decrRefCount(keyobj);
                continue; /* search for another key. This expired. */
            }
        }
        return keyobj;
    }
}
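
/* Usage sketch (illustrative only, modeled on a RANDOMKEY-style command;
 * 'c' is assumed to be the calling client):
 *
 *     robj *key = dbRandomKey(c->db);
 *     if (key == NULL) {
 *         addReplyNull(c);
 *         return;
 *     }
 *     addReplyBulk(c, key);
 *     decrRefCount(key);  // dbRandomKey() returned a new object
 */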

/* Helper for sync and async delete. Returns 1 if the key was found and
 * deleted, 0 otherwise. */
int dbGenericDelete(redictDb *db, robj *key, int async, int flags) {
    dictEntry **plink;
    int table;
    int slot = getKeySlot(key->ptr);
    dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, slot, key->ptr, &plink, &table);
    if (de) {
        robj *val = dictGetVal(de);
        /* RM_StringDMA may call dbUnshareStringValue which may free val, so we
         * need to incr to retain val */
        incrRefCount(val);
        /* Tells the module that the key has been unlinked from the database. */
        moduleNotifyKeyUnlink(key,val,db->id,flags);
        /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
        signalDeletedKeyAsReady(db,key,val->type);
        /* We should call decr before freeObjAsync. If not, the refcount may be
         * greater than 1, so freeObjAsync doesn't work */
        decrRefCount(val);
        if (async) {
            /* Because of dbUnshareStringValue, the val in de may change. */
            freeObjAsync(key, dictGetVal(de), db->id);
            kvstoreDictSetVal(db->keys, slot, de, NULL);
        }
        /* Deleting an entry from the expires dict will not free the sds of
         * the key, because it is shared with the main dictionary. */
        kvstoreDictDelete(db->expires, slot, key->ptr);
        kvstoreDictTwoPhaseUnlinkFree(db->keys, slot, de, plink, table);
        return 1;
    } else {
        return 0;
    }
}

/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redictDb *db, robj *key) {
    return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED);
}

/* Delete a key, value, and associated expiration entry if any, from the DB. If
 * the value consists of many allocations, it may be freed asynchronously. */
int dbAsyncDelete(redictDb *db, robj *key) {
    return dbGenericDelete(db, key, 1, DB_FLAG_KEY_DELETED);
}

/* This is a wrapper whose behavior depends on the Redict lazy free
 * configuration. Deletes the key synchronously or asynchronously. */
int dbDelete(redictDb *db, robj *key) {
    return dbGenericDelete(db, key, server.lazyfree_lazy_server_del, DB_FLAG_KEY_DELETED);
}
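
/* Usage sketch (illustrative only, modeled on a DEL-style command; 'c' is
 * assumed to be the calling client and c->argv[1] the key). The delete
 * helpers above neither signal WATCHers nor fire the "del" keyspace event,
 * so the caller does both and bumps the dirty counter itself:
 *
 *     if (dbDelete(c->db, c->argv[1])) {
 *         signalModifiedKey(c, c->db, c->argv[1]);
 *         notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
 *         server.dirty++;
 *     }
 */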

/* Prepare the string object stored at 'key' to be modified destructively
 * to implement commands like SETBIT or APPEND.
 *
 * An object is usually ready to be modified unless one of the two conditions
 * is true:
 *
 * 1) The object 'o' is shared (refcount > 1), we don't want to affect
 *    other users.
 * 2) The object encoding is not "RAW".
 *
 * If the object is found in one of the above conditions (or both) by the
 * function, an unshared / not-encoded copy of the string object is stored
 * at 'key' in the specified 'db' and returned. Otherwise the object 'o'
 * itself is returned.
 *
 * USAGE:
 *
 * The object 'o' is what the caller already obtained by looking up 'key'
 * in 'db', the usage pattern looks like this:
 *
 *     o = lookupKeyWrite(db,key);
 *     if (checkType(c,o,OBJ_STRING)) return;
 *     o = dbUnshareStringValue(db,key,o);
 *
 * At this point the caller is ready to modify the object, for example
 * using an sdscat() call to append some data, or anything else.
 */
robj *dbUnshareStringValue(redictDb *db, robj *key, robj *o) {
    serverAssert(o->type == OBJ_STRING);
    if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
        robj *decoded = getDecodedObject(o);
        o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
        decrRefCount(decoded);
        dbReplaceValue(db,key,o);
    }
    return o;
}

/* Remove all keys from the database(s) structure. The dbarray argument
 * need not be the server's main DBs (it could be a temporary DB).
 *
 * The dbnum can be -1 if all the DBs should be emptied, or the specified
 * DB index if we want to empty only a single database.
 * The function returns the number of keys removed from the database(s). */
long long emptyDbStructure(redictDb *dbarray, int dbnum, int async,
                           void(callback)(dict*))
{
    long long removed = 0;
    int startdb, enddb;

    if (dbnum == -1) {
        startdb = 0;
        enddb = server.dbnum-1;
    } else {
        startdb = enddb = dbnum;
    }

    for (int j = startdb; j <= enddb; j++) {
        removed += kvstoreSize(dbarray[j].keys);
        if (async) {
            emptyDbAsync(&dbarray[j]);
        } else {
            kvstoreEmpty(dbarray[j].keys, callback);
            kvstoreEmpty(dbarray[j].expires, callback);
        }
        /* Because all keys of database are removed, reset average ttl. */
        dbarray[j].avg_ttl = 0;
        dbarray[j].expires_cursor = 0;
    }

    return removed;
}

/* Remove all data (keys and functions) from all the databases in a
 * Redict server. If callback is given the function is called from
 * time to time to signal that work is in progress.
 *
 * The dbnum can be -1 if all the DBs should be flushed, or the specified
 * DB number if we want to flush only a single Redict database number.
 *
 * Flags are EMPTYDB_NO_FLAGS if no special flags are specified or
 * EMPTYDB_ASYNC if we want the memory to be freed in a different thread
 * and the function to return ASAP. EMPTYDB_NOFUNCTIONS can also be set
 * to specify that we do not want to delete the functions. Since functions
 * are not per-DB, flushing a single DB (dbnum != -1) requires
 * EMPTYDB_NOFUNCTIONS; otherwise an assertion fails.
 *
 * On success the function returns the number of keys removed from the
 * database(s). Otherwise -1 is returned in the specific case the
 * DB number is out of range, and errno is set to EINVAL. */
long long emptyData(int dbnum, int flags, void(callback)(dict*)) {
    int async = (flags & EMPTYDB_ASYNC);
    int with_functions = !(flags & EMPTYDB_NOFUNCTIONS);
    RedictModuleFlushInfoV1 fi = {REDICTMODULE_FLUSHINFO_VERSION,!async,dbnum};
    long long removed = 0;

    if (dbnum < -1 || dbnum >= server.dbnum) {
        errno = EINVAL;
        return -1;
    }

    /* Fire the flushdb modules event. */
    moduleFireServerEvent(REDICTMODULE_EVENT_FLUSHDB,
                          REDICTMODULE_SUBEVENT_FLUSHDB_START,
                          &fi);

    /* Make sure the WATCHed keys are affected by the FLUSH* commands.
     * Note that we need to call the function while the keys are still
     * there. */
    signalFlushedDb(dbnum, async);

    /* Empty redict database structure. */
    removed = emptyDbStructure(server.db, dbnum, async, callback);

    if (dbnum == -1) flushSlaveKeysWithExpireList();

    if (with_functions) {
        serverAssert(dbnum == -1);
        functionsLibCtxClearCurrent(async);
    }

    /* Also fire the end event. Note that this event will fire almost
     * immediately after the start event if the flush is asynchronous. */
    moduleFireServerEvent(REDICTMODULE_EVENT_FLUSHDB,
                          REDICTMODULE_SUBEVENT_FLUSHDB_END,
                          &fi);

    return removed;
}
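
/* Usage sketch (illustrative only; 'id' is a hypothetical DB index held by
 * the caller). Flushing every DB together with the functions library uses
 * dbnum -1; flushing a single DB must also pass EMPTYDB_NOFUNCTIONS because
 * of the assertion above:
 *
 *     long long removed = emptyData(-1, EMPTYDB_NO_FLAGS, NULL);
 *
 *     if (emptyData(id, EMPTYDB_ASYNC|EMPTYDB_NOFUNCTIONS, NULL) == -1) {
 *         // 'id' was out of range and errno is EINVAL
 *     }
 */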

/* Initialize temporary db on replica for use during diskless replication. */
redictDb *initTempDb(void) {
    int slot_count_bits = 0;
    int flags = KVSTORE_ALLOCATE_DICTS_ON_DEMAND;
    if (server.cluster_enabled) {
        slot_count_bits = CLUSTER_SLOT_MASK_BITS;
        flags |= KVSTORE_FREE_EMPTY_DICTS;
    }
    redictDb *tempDb = zcalloc(sizeof(redictDb)*server.dbnum);
    for (int i=0; i<server.dbnum; i++) {
        tempDb[i].id = i;
        tempDb[i].keys = kvstoreCreate(&dbDictType, slot_count_bits, flags);
        tempDb[i].expires = kvstoreCreate(&dbExpiresDictType, slot_count_bits, flags);
    }

    return tempDb;
}
/* Discard tempDb. This can be slow (similar to FLUSHALL), but it is always done asynchronously. */
void discardTempDb(redictDb *tempDb, void(callback)(dict*)) {
int async = 1;
/* Release temp DBs. */
emptyDbStructure(tempDb, -1, async, callback);
for (int i=0; i<server.dbnum; i++) {
kvstoreRelease(tempDb[i].keys);
kvstoreRelease(tempDb[i].expires);
}
zfree(tempDb);
}
int selectDb(client *c, int id) {
if (id < 0 || id >= server.dbnum)
return C_ERR;
c->db = &server.db[id];
return C_OK;
}
long long dbTotalServerKeyCount(void) {
long long total = 0;
int j;
for (j = 0; j < server.dbnum; j++) {
total += kvstoreSize(server.db[j].keys);
}
return total;
}
/*-----------------------------------------------------------------------------
* Hooks for key space changes.
*
* Every time a key in the database is modified the function
* signalModifiedKey() is called.
*
* Every time a DB is flushed the function signalFlushedDb() is called.
*----------------------------------------------------------------------------*/
/* Note that the 'c' argument may be NULL if the key was modified outside
* the context of a client. */
void signalModifiedKey(client *c, redictDb *db, robj *key) {
touchWatchedKey(db,key);
trackingInvalidateKey(c,key,1);
}
void signalFlushedDb(int dbid, int async) {
int startdb, enddb;
if (dbid == -1) {
startdb = 0;
enddb = server.dbnum-1;
} else {
startdb = enddb = dbid;
}
for (int j = startdb; j <= enddb; j++) {
scanDatabaseForDeletedKeys(&server.db[j], NULL);
touchAllWatchedKeysInDb(&server.db[j], NULL);
}
trackingInvalidateKeysOnFlush(async);
/* Changes in this method may take place in swapMainDbWithTempDb as well,
* where we execute similar calls, but with subtle differences as it's
* not simply flushing db. */
}
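/* Illustrative sketch (not used by the server): how signalFlushedDb() turns
 * its 'dbid' argument into an inclusive range of DB indexes, where -1 means
 * "every DB". exampleFlushedDbRange() is a hypothetical helper and 'ndbs'
 * stands in for server.dbnum; unlike the code above it also rejects
 * out-of-range ids, a check that signalFlushedDb() leaves to its callers
 * (emptyData() validates dbnum before calling it). */
static int exampleFlushedDbRange(int dbid, int ndbs, int *startdb, int *enddb) {
    if (dbid < -1 || dbid >= ndbs) return -1;
    if (dbid == -1) {
        *startdb = 0;
        *enddb = ndbs - 1;
    } else {
        *startdb = *enddb = dbid;
    }
    return 0;
}

/* Usage: with ndbs == 16, dbid -1 yields the range [0, 15] and dbid 5
 * yields [5, 5]. */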
/*-----------------------------------------------------------------------------
* Type agnostic commands operating on the key space
*----------------------------------------------------------------------------*/
/* Return the set of flags to use for the emptyData() call for FLUSHALL
* and FLUSHDB commands.
*
* sync: flushes the database synchronously.
* async: flushes the database asynchronously.
* no option: determine sync or async according to the value of lazyfree-lazy-user-flush.
*
* On success C_OK is returned and the flags are stored in *flags, otherwise
* C_ERR is returned and the function sends an error to the client. */
int getFlushCommandFlags(client *c, int *flags) {
/* Parse the optional SYNC or ASYNC option. */
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"sync")) {
*flags = EMPTYDB_NO_FLAGS;
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"async")) {
*flags = EMPTYDB_ASYNC;
} else if (c->argc == 1) {
*flags = server.lazyfree_lazy_user_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return C_ERR;
}
return C_OK;
}
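/* Illustrative sketch (not used by the server): the decision table of
 * getFlushCommandFlags() over plain C strings. 'argc'/'argv' mimic
 * c->argc/c->argv and 'lazy_user_flush' mimics
 * server.lazyfree_lazy_user_flush; exampleFlushIsAsync() and its return
 * values (1 = async, 0 = sync, -1 = syntax error) are hypothetical encodings
 * chosen for this sketch. */
static int exampleFlushIsAsync(int argc, const char **argv, int lazy_user_flush) {
    if (argc == 2 && !strcasecmp(argv[1], "sync")) return 0;
    if (argc == 2 && !strcasecmp(argv[1], "async")) return 1;
    if (argc == 1) return lazy_user_flush ? 1 : 0;
    return -1; /* Unknown option or too many arguments. */
}

/* Usage: {"FLUSHALL", "ASYNC"} gives 1 regardless of lazy_user_flush, while
 * a bare {"FLUSHDB"} follows the configured default. */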
/* Flushes the whole server data set. */
void flushAllDataAndResetRDB(int flags) {
server.dirty += emptyData(-1,flags,NULL);
if (server.child_type == CHILD_TYPE_RDB) killRDBChild();
if (server.saveparamslen > 0) {
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE);
}
#if defined(USE_JEMALLOC)
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
* For large databases, flushdb blocks for a long time anyway, so a bit more
* won't harm, and this way the flush and purge will be synchronous. */
if (!(flags & EMPTYDB_ASYNC))
jemalloc_purge();
#endif
}
/* FLUSHDB [ASYNC|SYNC]
*
* Flushes the currently SELECTed Redict DB. */
void flushdbCommand(client *c) {
int flags;
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
/* flushdb should not flush the functions */
server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL);
/* Without the forceCommandPropagation, when DB was already empty,
* FLUSHDB will not be replicated nor put into the AOF. */
forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);
addReply(c,shared.ok);
#if defined(USE_JEMALLOC)
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
* For large databases, flushdb blocks for a long time anyway, so a bit more
* won't harm, and this way the flush and purge will be synchronous. */
if (!(flags & EMPTYDB_ASYNC))
jemalloc_purge();
#endif
}
/* FLUSHALL [ASYNC|SYNC]
*
* Flushes the whole server data set. */
void flushallCommand(client *c) {
int flags;
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
/* flushall should not flush the functions */
flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS);
/* Without the forceCommandPropagation, when DBs were already empty,
* FLUSHALL will not be replicated nor put into the AOF. */
forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);
addReply(c,shared.ok);
}
/* This command implements DEL and UNLINK. */
void delGenericCommand(client *c, int lazy) {
int numdel = 0, j;
for (j = 1; j < c->argc; j++) {
if (expireIfNeeded(c->db,c->argv[j],0) == KEY_DELETED)
continue;
int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
dbSyncDelete(c->db,c->argv[j]);
if (deleted) {
signalModifiedKey(c,c->db,c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,
"del",c->argv[j],c->db->id);
server.dirty++;
numdel++;
}
}
addReplyLongLong(c,numdel);
}
void delCommand(client *c) {
delGenericCommand(c,server.lazyfree_lazy_user_del);
}
void unlinkCommand(client *c) {
delGenericCommand(c,1);
}
/* EXISTS key1 key2 ... key_N.
* Returns the number of the given keys that exist. A key that is
* specified multiple times is counted multiple times. */
void existsCommand(client *c) {
long long count = 0;
int j;
for (j = 1; j < c->argc; j++) {
if (lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH)) count++;
}
addReplyLongLong(c,count);
}
void selectCommand(client *c) {
int id;
if (getIntFromObjectOrReply(c, c->argv[1], &id, NULL) != C_OK)
return;
if (server.cluster_enabled && id != 0) {
addReplyError(c,"SELECT is not allowed in cluster mode");
return;
}
if (selectDb(c,id) == C_ERR) {
addReplyError(c,"DB index is out of range");
} else {
addReply(c,shared.ok);
}
}
void randomkeyCommand(client *c) {
robj *key;
if ((key = dbRandomKey(c->db)) == NULL) {
addReplyNull(c);
return;
}
addReplyBulk(c,key);
decrRefCount(key);
}
void keysCommand(client *c) {
dictEntry *de;
sds pattern = c->argv[1]->ptr;
int plen = sdslen(pattern), allkeys, pslot = -1;
unsigned long numkeys = 0;
void *replylen = addReplyDeferredLen(c);
allkeys = (pattern[0] == '*' && plen == 1);
if (server.cluster_enabled && !allkeys) {
pslot = patternHashSlot(pattern, plen);
}
kvstoreDictIterator *kvs_di = NULL;
kvstoreIterator *kvs_it = NULL;
if (pslot != -1) {
if (!kvstoreDictSize(c->db->keys, pslot)) {
/* Requested slot is empty */
setDeferredArrayLen(c,replylen,0);
return;
}
kvs_di = kvstoreGetDictSafeIterator(c->db->keys, pslot);
} else {
kvs_it = kvstoreIteratorInit(c->db->keys);
}
robj keyobj;
while ((de = kvs_di ? kvstoreDictIteratorNext(kvs_di) : kvstoreIteratorNext(kvs_it)) != NULL) {
sds key = dictGetKey(de);
if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
initStaticStringObject(keyobj, key);
if (!keyIsExpired(c->db, &keyobj)) {
addReplyBulkCBuffer(c, key, sdslen(key));
numkeys++;
}
}
if (c->flags & CLIENT_CLOSE_ASAP)
break;
}
if (kvs_di)
kvstoreReleaseDictIterator(kvs_di);
if (kvs_it)
kvstoreIteratorRelease(kvs_it);
setDeferredArrayLen(c,replylen,numkeys);
}
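/* Illustrative sketch (not used by the server): a deliberately simplified
 * glob matcher supporting only '*' and '?', to show the kind of test that
 * keysCommand() applies to each key. The real matching is done by
 * stringmatchlen(), which also handles character classes and escaping and
 * works on explicit lengths instead of NUL-terminated strings;
 * exampleGlobMatch() is hypothetical. */
static int exampleGlobMatch(const char *pattern, const char *str) {
    if (*pattern == '\0') return *str == '\0';
    if (*pattern == '*') {
        while (*pattern == '*') pattern++;  /* Collapse runs of stars. */
        if (*pattern == '\0') return 1;     /* A trailing star matches the rest. */
        for (; *str; str++)                 /* Try every possible split point. */
            if (exampleGlobMatch(pattern, str)) return 1;
        return 0;
    }
    if (*str == '\0') return 0;
    if (*pattern == '?' || *pattern == *str)
        return exampleGlobMatch(pattern + 1, str + 1);
    return 0;
}

/* Usage: exampleGlobMatch("user:??", "user:42") is 1 and
 * exampleGlobMatch("user:?", "user:42") is 0. A lone star matches every key,
 * which is why keysCommand() sets 'allkeys' and skips matching entirely in
 * that case. */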
/* Data used by the dict scan callback. */
typedef struct {
list *keys; /* elements collected from the dict */
robj *o; /* o must be a hash/set/zset object, NULL means current db */
long long type; /* the type to filter on when scanning the db, LLONG_MAX means any type */
sds pattern; /* pattern string, NULL means no pattern */
long sampled; /* cumulative number of keys sampled */
int no_values; /* set to 1 means to return keys only */
} scanData;
/* Helper function to compare key type in scan commands */
int objectTypeCompare(robj *o, long long target) {
if (o->type != OBJ_MODULE) {
if (o->type != target)
return 0;
else
return 1;
}
/* module type compare */
long long mt = (long long)REDICTMODULE_TYPE_SIGN(((moduleValue *)o->ptr)->type->id);
if (target != -mt)
return 0;
else
return 1;
}
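/* Illustrative sketch (not used by the server): the encoding that
 * objectTypeCompare() relies on. A built-in type is matched by its numeric
 * type id, while a module type is matched against the negated signature of
 * its module type, which is how getObjectTypeByName() encodes module types.
 * exampleTypedValue, EXAMPLE_OBJ_MODULE (mirroring the NULL slot at index 5
 * of obj_type_name) and exampleTypeMatches() are hypothetical models. */
enum { EXAMPLE_OBJ_MODULE = 5 };

typedef struct {
    int type;              /* Built-in type id, or EXAMPLE_OBJ_MODULE. */
    long long module_sig;  /* Module type signature; used only for modules. */
} exampleTypedValue;

/* Returns 1 if 'v' has the type encoded in 'target', 0 otherwise. */
static int exampleTypeMatches(const exampleTypedValue *v, long long target) {
    if (v->type != EXAMPLE_OBJ_MODULE) return v->type == target;
    return target == -v->module_sig;
}

/* Usage: a module value with signature 12345 matches only target -12345,
 * while a value of type 0 matches only target 0. */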
/* This callback is used by scanGenericCommand in order to collect elements
* returned by the dictionary iterator into a list. */
void scanCallback(void *privdata, const dictEntry *de) {
scanData *data = (scanData *)privdata;
list *keys = data->keys;
robj *o = data->o;
sds val = NULL;
sds key = NULL;
data->sampled++;
/* A non-NULL o and a TYPE filter cannot both be set. */
serverAssert(!((data->type != LLONG_MAX) && o));
/* Filter an element if it isn't the type we want. */
/* TODO: uncomment in redis 8.0
if (!o && data->type != LLONG_MAX) {
robj *rval = dictGetVal(de);
if (!objectTypeCompare(rval, data->type)) return;
}*/
/* Filter element if it does not match the pattern. */
sds keysds = dictGetKey(de);
if (data->pattern) {
if (!stringmatchlen(data->pattern, sdslen(data->pattern), keysds, sdslen(keysds), 0)) {
return;
}
}
if (o == NULL) {
key = keysds;
} else if (o->type == OBJ_SET) {
key = keysds;
} else if (o->type == OBJ_HASH) {
key = keysds;
val = dictGetVal(de);
} else if (o->type == OBJ_ZSET) {
char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf, sizeof(buf), *(double *)dictGetVal(de), LD_STR_AUTO);
key = sdsdup(keysds);
val = sdsnewlen(buf, len);
} else {
serverPanic("Type not handled in SCAN callback.");
}
listAddNodeTail(keys, key);
if (val && !data->no_values) listAddNodeTail(keys, val);
}
/* Try to parse a SCAN cursor stored at object 'o':
* if the cursor is valid, store it as an unsigned integer into *cursor and
* return C_OK. Otherwise return C_ERR and send an error to the
* client. */
int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor) {
if (!string2ull(o->ptr, cursor)) {
addReplyError(c, "invalid cursor");
return C_ERR;
}
return C_OK;
}
char *obj_type_name[OBJ_TYPE_MAX] = {
"string",
"list",
"set",
"zset",
"hash",
NULL, /* module type is special */
"stream"
};
/* Helper function to get type from a string in scan commands */
long long getObjectTypeByName(char *name) {
for (long long i = 0; i < OBJ_TYPE_MAX; i++) {
if (obj_type_name[i] && !strcasecmp(name, obj_type_name[i])) {
return i;
}
}
moduleType *mt = moduleTypeLookupModuleByNameIgnoreCase(name);
if (mt != NULL) return -(REDICTMODULE_TYPE_SIGN(mt->id));
return LLONG_MAX;
}
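/* Illustrative sketch (not used by the server): the lookup strategy of
 * getObjectTypeByName() over a self-contained table. Holes (NULL entries)
 * are skipped, matching is case-insensitive, and LLONG_MAX is the "unknown"
 * sentinel. The module-type fallback is omitted; exampleTypeNames and
 * exampleTypeByName() are hypothetical. */
static const char *exampleTypeNames[] = {
    "string", "list", "set", "zset", "hash", NULL, "stream"
};

static long long exampleTypeByName(const char *name) {
    long long n = (long long)(sizeof(exampleTypeNames) / sizeof(exampleTypeNames[0]));
    for (long long i = 0; i < n; i++) {
        if (exampleTypeNames[i] && !strcasecmp(name, exampleTypeNames[i]))
            return i;
    }
    return LLONG_MAX;
}

/* Usage: exampleTypeByName("ZSet") is 3, exampleTypeByName("stream") is 6,
 * and an unknown name such as "graph" yields LLONG_MAX (the real function
 * would first try the registered module types). */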
char *getObjectTypeName(robj *o) {
if (o == NULL) {
return "none";
}
serverAssert(o->type >= 0 && o->type < OBJ_TYPE_MAX);
if (o->type == OBJ_MODULE) {
moduleValue *mv = o->ptr;
return mv->type->name;
} else {
return obj_type_name[o->type];
}
}
/* This function implements the SCAN, HSCAN, SSCAN and ZSCAN commands.
* If object 'o' is passed, then it must be a Hash, Set or Zset object, otherwise
* if 'o' is NULL the command will operate on the dictionary associated with
* the current database.
*
* When 'o' is not NULL the function assumes that the first argument in
* the client arguments vector is a key so it skips it before iterating
* in order to parse options.
*
* In the case of a Hash object the function returns both the field and value
* of every element in the Hash (only the fields if NOVALUES is given); for a
* Zset it returns every member followed by its score. */
void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
int i, j;
listNode *node;
long count = 10;
sds pat = NULL;
sds typename = NULL;
long long type = LLONG_MAX;
int patlen = 0, use_pattern = 0, no_values = 0;
dict *ht;
/* Object must be NULL (to iterate keys names), or the type of the object
* must be Set, Sorted Set, or Hash. */
serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
o->type == OBJ_ZSET);
/* Set i to the first option argument. The previous one is the cursor. */
i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */
/* Step 1: Parse options. */
while (i < c->argc) {
j = c->argc - i;
if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
!= C_OK)
{
return;
}
if (count < 1) {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
i += 2;
} else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
pat = c->argv[i+1]->ptr;
patlen = sdslen(pat);
/* The pattern always matches if it is exactly "*", so it is
* equivalent to disabling it. */
use_pattern = !(patlen == 1 && pat[0] == '*');
i += 2;
} else if (!strcasecmp(c->argv[i]->ptr, "type") && o == NULL && j >= 2) {
/* SCAN for a particular type only applies to the db dict */
typename = c->argv[i+1]->ptr;
type = getObjectTypeByName(typename);
if (type == LLONG_MAX) {
/* TODO: uncomment in redis 8.0
addReplyErrorFormat(c, "unknown type name '%s'", typename);
return; */
}
i+= 2;
} else if (!strcasecmp(c->argv[i]->ptr, "novalues")) {
if (!o || o->type != OBJ_HASH) {
addReplyError(c, "NOVALUES option can only be used in HSCAN");
return;
}
no_values = 1;
i++;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
/* Step 2: Iterate the collection.
*
* Note that if the object is encoded with a listpack, intset, or any other
* representation that is not a hash table, we are sure that it is also
* composed of a small number of elements. So, to avoid keeping state, we
* just return everything inside the object in a single call, setting the
* cursor to zero to signal the end of the iteration. */
/* Handle the case of a hash table. */
ht = NULL;
if (o == NULL) {
ht = NULL;
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
ht = o->ptr;
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
ht = o->ptr;
} else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
ht = zs->dict;
}
list *keys = listCreate();
/* Set a free callback for the contents of the collected keys list.
* For the main keyspace dict, and when we scan a key that's dict encoded
* (we have 'ht'), we don't need to define free method because the strings
* in the list are just a shallow copy from the pointer in the dictEntry.
* When scanning a key with other encodings (e.g. listpack), we need to
* free the temporary strings we add to that list.
* The exception to the above is ZSET, where we do allocate temporary
* strings even when scanning a dict. */
if (o && (!ht || o->type == OBJ_ZSET)) {
listSetFreeMethod(keys, (void (*)(void*))sdsfree);
}
/* For main dictionary scan or data structure using hashtable. */
if (!o || ht) {
/* We set the max number of iterations to ten times the specified
* COUNT, so if the hash table is in a pathological state (very
* sparsely populated) we avoid blocking for too long, at the cost
* of returning few or no elements. */
long maxiterations = count*10;
/* We pass a scanData structure with the following fields to the callback:
* 1. data.keys: the list to which it will add new elements;
* 2. data.o: the object containing the dictionary so that
* it is possible to fetch more data in a type-dependent way;
* 3. data.type: the type to filter on when scanning the db; LLONG_MAX means
* type matching is not needed;
* 4. data.pattern: the pattern string;
* 5. data.sampled: the maxiterations limit protects us when working on an
* empty dict or one with many empty buckets; when the buckets are not
* empty, we also need to limit the number of sampled elements to prevent
* a long stall caused by filtering too many keys;
* 6. data.no_values: controls whether values are returned or only keys
* are returned. */
scanData data = {
.keys = keys,
.o = o,
.type = type,
.pattern = use_pattern ? pat : NULL,
.sampled = 0,
.no_values = no_values,
};
/* A pattern may restrict all matching keys to one cluster slot. */
int onlydidx = -1;
if (o == NULL && use_pattern && server.cluster_enabled) {
onlydidx = patternHashSlot(pat, patlen);
}
do {
/* In cluster mode there is a separate dictionary for each slot.
* If cursor is empty, we should try exploring next non-empty slot. */
if (o == NULL) {
cursor = kvstoreScan(c->db->keys, cursor, onlydidx, scanCallback, NULL, &data);
} else {
cursor = dictScan(ht, cursor, scanCallback, &data);
}
} while (cursor && maxiterations-- && data.sampled < count);
} else if (o->type == OBJ_SET) {
char *str;
char buf[LONG_STR_SIZE];
size_t len;
int64_t llele;
setTypeIterator *si = setTypeInitIterator(o);
while (setTypeNext(si, &str, &len, &llele) != -1) {
if (str == NULL) {
len = ll2string(buf, sizeof(buf), llele);
}
char *key = str ? str : buf;
if (use_pattern && !stringmatchlen(pat, sdslen(pat), key, len, 0)) {
continue;
}
listAddNodeTail(keys, sdsnewlen(key, len));
}
setTypeReleaseIterator(si);
cursor = 0;
} else if ((o->type == OBJ_HASH || o->type == OBJ_ZSET) &&
o->encoding == OBJ_ENCODING_LISTPACK)
{
unsigned char *p = lpFirst(o->ptr);
unsigned char *str;
int64_t len;
unsigned char intbuf[LP_INTBUF_SIZE];
while(p) {
str = lpGet(p, &len, intbuf);
/* point to the value */
p = lpNext(o->ptr, p);
if (use_pattern && !stringmatchlen(pat, sdslen(pat), (char *)str, len, 0)) {
/* jump to the next key/val pair */
p = lpNext(o->ptr, p);
continue;
}
/* add key object */
listAddNodeTail(keys, sdsnewlen(str, len));
/* add value object */
if (!no_values) {
str = lpGet(p, &len, intbuf);
listAddNodeTail(keys, sdsnewlen(str, len));
}
p = lpNext(o->ptr, p);
}
cursor = 0;
} else {
serverPanic("Not handled encoding in SCAN.");
}
/* Step 3: Filter out expired keys (and keys not matching TYPE, if given). */
if (o == NULL && listLength(keys)) {
robj kobj;
listIter li;
listNode *ln;
listRewind(keys, &li);
while ((ln = listNext(&li))) {
sds key = listNodeValue(ln);
initStaticStringObject(kobj, key);
/* Filter an element if it isn't the type we want. */
/* TODO: remove this in redis 8.0 */
if (typename) {
robj* typecheck = lookupKeyReadWithFlags(c->db, &kobj, LOOKUP_NOTOUCH|LOOKUP_NONOTIFY);
if (!typecheck || !objectTypeCompare(typecheck, type)) {
listDelNode(keys, ln);
}
continue;
}
if (expireIfNeeded(c->db, &kobj, 0) != KEY_VALID) {
listDelNode(keys, ln);
}
}
}
/* Step 4: Reply to the client. */
addReplyArrayLen(c, 2);
addReplyBulkLongLong(c,cursor);
addReplyArrayLen(c, listLength(keys));
while ((node = listFirst(keys)) != NULL) {
sds key = listNodeValue(node);
addReplyBulkCBuffer(c, key, sdslen(key));
listDelNode(keys, node);
}
listRelease(keys);
}
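/* Illustrative sketch (not used by the server): the bounded loop from Step 2
 * of scanGenericCommand(), run against a toy table. Bucket i of the
 * hypothetical exampleScanTable holds buckets[i] elements, and
 * exampleScanStep() visits one bucket per call with a plain linear cursor.
 * The real dictScan()/kvstoreScan() visit buckets in a different cursor
 * order, so only the stopping rules are modeled here. */
typedef struct {
    const int *buckets;
    unsigned long nbuckets;
} exampleScanTable;

/* Visit the bucket at 'cursor', add its elements to *sampled and return the
 * next cursor, or 0 once the whole table has been visited. */
static unsigned long exampleScanStep(const exampleScanTable *t,
                                     unsigned long cursor, long *sampled) {
    *sampled += t->buckets[cursor];
    cursor++;
    return cursor == t->nbuckets ? 0 : cursor;
}

/* Same stopping rules as the do/while loop in scanGenericCommand(): stop
 * when the cursor wraps to 0, after count*10 additional steps, or once at
 * least 'count' elements were sampled. *steps reports the buckets visited. */
static unsigned long exampleBoundedScan(const exampleScanTable *t,
                                        unsigned long cursor, long count,
                                        long *sampled, long *steps) {
    long maxiterations = count * 10;
    *sampled = 0;
    *steps = 0;
    do {
        cursor = exampleScanStep(t, cursor, sampled);
        (*steps)++;
    } while (cursor && maxiterations-- && *sampled < count);
    return cursor;
}

/* Usage: on 100 empty buckets with count 2, the call returns after 21 steps
 * with cursor 21 and nothing sampled, so the client receives an empty batch
 * and a non-zero cursor to continue from. */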
/* The SCAN command completely relies on scanGenericCommand. */
void scanCommand(client *c) {
unsigned long long cursor;
if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
scanGenericCommand(c,NULL,cursor);
}
void dbsizeCommand(client *c) {
addReplyLongLong(c,kvstoreSize(c->db->keys));
}
void lastsaveCommand(client *c) {
addReplyLongLong(c,server.lastsave);
}
void typeCommand(client *c) {
robj *o;
o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);
addReplyStatus(c, getObjectTypeName(o));
}
void shutdownCommand(client *c) {
int flags = SHUTDOWN_NOFLAGS;
int abort = 0;
for (int i = 1; i < c->argc; i++) {
if (!strcasecmp(c->argv[i]->ptr,"nosave")) {
flags |= SHUTDOWN_NOSAVE;
} else if (!strcasecmp(c->argv[i]->ptr,"save")) {
flags |= SHUTDOWN_SAVE;
} else if (!strcasecmp(c->argv[i]->ptr, "now")) {
flags |= SHUTDOWN_NOW;
} else if (!strcasecmp(c->argv[i]->ptr, "force")) {
flags |= SHUTDOWN_FORCE;
} else if (!strcasecmp(c->argv[i]->ptr, "abort")) {
abort = 1;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
if ((abort && flags != SHUTDOWN_NOFLAGS) ||
(flags & SHUTDOWN_NOSAVE && flags & SHUTDOWN_SAVE))
{
/* Illegal combo. */
addReplyErrorObject(c,shared.syntaxerr);
return;
}
if (abort) {
if (abortShutdown() == C_OK)
addReply(c, shared.ok);
else
addReplyError(c, "No shutdown in progress.");
return;
}
if (!(flags & SHUTDOWN_NOW) && c->flags & CLIENT_DENY_BLOCKING) {
addReplyError(c, "SHUTDOWN without NOW or ABORT isn't allowed for DENY BLOCKING client");
return;
}
if (!(flags & SHUTDOWN_NOSAVE) && isInsideYieldingLongCommand()) {
/* Script timed out. Shutdown allowed only with the NOSAVE flag. See
* also processCommand where these errors are returned. */
if (server.busy_module_yield_flags && server.busy_module_yield_reply) {
addReplyErrorFormat(c, "-BUSY %s", server.busy_module_yield_reply);
} else if (server.busy_module_yield_flags) {
addReplyErrorObject(c, shared.slowmoduleerr);
} else if (scriptIsEval()) {
addReplyErrorObject(c, shared.slowevalerr);
} else {
addReplyErrorObject(c, shared.slowscripterr);
}
return;
}
blockClientShutdown(c);
if (prepareForShutdown(flags) == C_OK) exit(0);
/* If we're here, then shutdown is ongoing (the client is still blocked) or
* failed (the client has received an error). */
}
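/* Illustrative sketch (not used by the server): the option checks that
 * shutdownCommand() performs before any shutdown work starts. The
 * EXAMPLE_SHUTDOWN_* bits and exampleShutdownArgsValid() are hypothetical
 * stand-ins for the SHUTDOWN_* flags; the later checks on DENY BLOCKING
 * clients and long-running scripts are not modeled. Returns 1 when the
 * combination of options is legal, 0 on a syntax error. */
enum {
    EXAMPLE_SHUTDOWN_NOSAVE = 1 << 0,
    EXAMPLE_SHUTDOWN_SAVE = 1 << 1,
    EXAMPLE_SHUTDOWN_NOW = 1 << 2,
    EXAMPLE_SHUTDOWN_FORCE = 1 << 3
};

static int exampleShutdownArgsValid(int argc, const char **argv) {
    int flags = 0, abort_requested = 0;
    for (int i = 1; i < argc; i++) {
        if (!strcasecmp(argv[i], "nosave")) flags |= EXAMPLE_SHUTDOWN_NOSAVE;
        else if (!strcasecmp(argv[i], "save")) flags |= EXAMPLE_SHUTDOWN_SAVE;
        else if (!strcasecmp(argv[i], "now")) flags |= EXAMPLE_SHUTDOWN_NOW;
        else if (!strcasecmp(argv[i], "force")) flags |= EXAMPLE_SHUTDOWN_FORCE;
        else if (!strcasecmp(argv[i], "abort")) abort_requested = 1;
        else return 0; /* Unknown option. */
    }
    /* ABORT cannot be combined with other options, and SAVE and NOSAVE
     * exclude each other. */
    if (abort_requested && flags != 0) return 0;
    if ((flags & EXAMPLE_SHUTDOWN_NOSAVE) && (flags & EXAMPLE_SHUTDOWN_SAVE)) return 0;
    return 1;
}

/* Usage: {"SHUTDOWN", "NOSAVE", "NOW"} is accepted, while
 * {"SHUTDOWN", "ABORT", "FORCE"} and {"SHUTDOWN", "SAVE", "NOSAVE"} are
 * rejected as illegal combinations. */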
void renameGenericCommand(client *c, int nx) {
robj *o;
long long expire;
int samekey = 0;
/* When the source and destination keys are the same, no operation is
* performed if the key exists; however, we still return an error if the
* key does not exist. */
if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1;
if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
return;
if (samekey) {
addReply(c,nx ? shared.czero : shared.ok);
return;
}
incrRefCount(o);
expire = getExpire(c->db,c->argv[1]);
if (lookupKeyWrite(c->db,c->argv[2]) != NULL) {
if (nx) {
decrRefCount(o);
addReply(c,shared.czero);
return;
}
/* Overwrite: delete the old key before creating the new one
* with the same name. */
dbDelete(c->db,c->argv[2]);
}
dbAdd(c->db,c->argv[2],o);
if (expire != -1) setExpire(c,c->db,c->argv[2],expire);
dbDelete(c->db,c->argv[1]);
signalModifiedKey(c,c->db,c->argv[1]);
signalModifiedKey(c,c->db,c->argv[2]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
c->argv[1],c->db->id);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",
c->argv[2],c->db->id);
server.dirty++;
addReply(c,nx ? shared.cone : shared.ok);
}
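/* Illustrative sketch (not used by the server): the reply that
 * renameGenericCommand() chooses, reduced to four boolean inputs. The
 * exampleRenameReply enum and exampleRenameOutcome() are hypothetical;
 * EXAMPLE_RENAME_OK, _ZERO and _ONE correspond to shared.ok, shared.czero
 * and shared.cone above, and _NO_SUCH_KEY to the shared.nokeyerr error. */
typedef enum {
    EXAMPLE_RENAME_NO_SUCH_KEY, /* Error reply: the source key is missing. */
    EXAMPLE_RENAME_OK,          /* RENAME succeeded (or was a no-op). */
    EXAMPLE_RENAME_ZERO,        /* RENAMENX did nothing. */
    EXAMPLE_RENAME_ONE          /* RENAMENX renamed the key. */
} exampleRenameReply;

static exampleRenameReply exampleRenameOutcome(int nx, int src_exists,
                                               int same_key, int dst_exists) {
    if (!src_exists) return EXAMPLE_RENAME_NO_SUCH_KEY;
    if (same_key) return nx ? EXAMPLE_RENAME_ZERO : EXAMPLE_RENAME_OK;
    if (dst_exists && nx) return EXAMPLE_RENAME_ZERO;
    /* Any existing destination is overwritten; the TTL moves with the value. */
    return nx ? EXAMPLE_RENAME_ONE : EXAMPLE_RENAME_OK;
}

/* Usage: RENAMENX onto an existing key yields EXAMPLE_RENAME_ZERO, while
 * RENAME onto the same key yields EXAMPLE_RENAME_OK without touching it. */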
void renameCommand(client *c) {
renameGenericCommand(c,0);
}
void renamenxCommand(client *c) {
renameGenericCommand(c,1);
}
void moveCommand(client *c) {
robj *o;
redictDb *src, *dst;
int srcid, dbid;
long long expire;
if (server.cluster_enabled) {
addReplyError(c,"MOVE is not allowed in cluster mode");
return;
}
/* Obtain source and target DB pointers */
src = c->db;
srcid = c->db->id;
if (getIntFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK)
return;
if (selectDb(c,dbid) == C_ERR) {
addReplyError(c,"DB index is out of range");
return;
}
dst = c->db;
selectDb(c,srcid); /* Back to the source DB */
/* If the user is moving the key to the same DB it is already in,
* it is probably an error. */
if (src == dst) {
addReplyErrorObject(c,shared.sameobjecterr);
return;
}
/* Check if the element exists and get a reference */
o = lookupKeyWrite(c->db,c->argv[1]);
if (!o) {
addReply(c,shared.czero);
return;
}
expire = getExpire(c->db,c->argv[1]);
/* Return zero if the key already exists in the target DB */
if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
addReply(c,shared.czero);
return;
}
dbAdd(dst,c->argv[1],o);
if (expire != -1) setExpire(c,dst,c->argv[1],expire);
incrRefCount(o);
/* OK! key moved, free the entry in the source DB */
dbDelete(src,c->argv[1]);
signalModifiedKey(c,src,c->argv[1]);
signalModifiedKey(c,dst,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,
"move_from",c->argv[1],src->id);
notifyKeyspaceEvent(NOTIFY_GENERIC,
"move_to",c->argv[1],dst->id);
server.dirty++;
addReply(c,shared.cone);
}
void copyCommand(client *c) {
robj *o;
redictDb *src, *dst;
int srcid, dbid;
long long expire;
int j, replace = 0, delete = 0;
/* Obtain source and target DB pointers
* Default target DB is the same as the source DB
* Parse the REPLACE option and targetDB option. */
src = c->db;
dst = c->db;
srcid = c->db->id;
dbid = c->db->id;
for (j = 3; j < c->argc; j++) {
int additional = c->argc - j - 1;
if (!strcasecmp(c->argv[j]->ptr,"replace")) {
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr, "db") && additional >= 1) {
if (getIntFromObjectOrReply(c, c->argv[j+1], &dbid, NULL) != C_OK)
return;
if (selectDb(c, dbid) == C_ERR) {
addReplyError(c,"DB index is out of range");
return;
}
dst = c->db;
selectDb(c,srcid); /* Back to the source DB */
j++; /* Consume additional arg. */
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
if ((server.cluster_enabled == 1) && (srcid != 0 || dbid != 0)) {
addReplyError(c,"Copying to another database is not allowed in cluster mode");
return;
}
/* If the user selects the same DB as the source DB and uses the
* same name for newkey, it is probably an error. */
robj *key = c->argv[1];
robj *newkey = c->argv[2];
if (src == dst && (sdscmp(key->ptr, newkey->ptr) == 0)) {
addReplyErrorObject(c,shared.sameobjecterr);
return;
}
/* Check if the element exists and get a reference */
o = lookupKeyRead(c->db, key);
if (!o) {
addReply(c,shared.czero);
return;
}
expire = getExpire(c->db,key);
/* Return zero if the key already exists in the target DB.
* If REPLACE option is selected, delete newkey from targetDB. */
if (lookupKeyWrite(dst,newkey) != NULL) {
if (replace) {
delete = 1;
} else {
addReply(c,shared.czero);
return;
}
}
/* Duplicate object according to object's type. */
robj *newobj;
switch(o->type) {
case OBJ_STRING: newobj = dupStringObject(o); break;
case OBJ_LIST: newobj = listTypeDup(o); break;
case OBJ_SET: newobj = setTypeDup(o); break;
case OBJ_ZSET: newobj = zsetDup(o); break;
case OBJ_HASH: newobj = hashTypeDup(o); break;
case OBJ_STREAM: newobj = streamDup(o); break;
case OBJ_MODULE:
newobj = moduleTypeDupOrReply(c, key, newkey, dst->id, o);
if (!newobj) return;
break;
default:
addReplyError(c, "unknown type object");
return;
}
if (delete) {
dbDelete(dst,newkey);
}
dbAdd(dst,newkey,newobj);
if (expire != -1) setExpire(c, dst, newkey, expire);
/* OK! key copied */
signalModifiedKey(c,dst,c->argv[2]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"copy_to",c->argv[2],dst->id);
server.dirty++;
addReply(c,shared.cone);
}
/* Helper function for dbSwapDatabases(): scans the list of keys that have
* one or more blocked clients for B[LR]POP or other blocking commands
* and signals the keys as ready if they are of the right type. See the comment
* where the function is used for more info. */
void scanDatabaseForReadyKeys(redictDb *db) {
dictEntry *de;
dictIterator *di = dictGetSafeIterator(db->blocking_keys);
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
dictEntry *kde = dbFind(db, key->ptr);
if (kde) {
robj *value = dictGetVal(kde);
signalKeyAsReady(db, key, value->type);
}
}
dictReleaseIterator(di);
}
/* Since we are unblocking XREADGROUP clients in the event the
* key was deleted/overwritten we must do the same in case the
* database was flushed/swapped. */
void scanDatabaseForDeletedKeys(redictDb *emptied, redictDb *replaced_with) {
dictEntry *de;
dictIterator *di = dictGetSafeIterator(emptied->blocking_keys);
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
int existed = 0, exists = 0;
int original_type = -1, curr_type = -1;
dictEntry *kde = dbFind(emptied, key->ptr);
if (kde) {
robj *value = dictGetVal(kde);
original_type = value->type;
existed = 1;
}
if (replaced_with) {
kde = dbFind(replaced_with, key->ptr);
if (kde) {
robj *value = dictGetVal(kde);
curr_type = value->type;
exists = 1;
}
}
/* We want to try to unblock any client using a blocking XREADGROUP */
if ((existed && !exists) || original_type != curr_type)
signalDeletedKeyAsReady(emptied, key, original_type);
}
dictReleaseIterator(di);
}
/* Swap two databases at runtime so that all clients will magically see
* the new database even if already connected. Note that the client
* structure c->db points to a given DB, so we need to be smarter and
* swap the underlying referenced structures, otherwise we would need
* to fix all the references to the Redict DB structure.
*
* Returns C_ERR if at least one of the DB ids is out of range, otherwise
* C_OK is returned. */
int dbSwapDatabases(int id1, int id2) {
if (id1 < 0 || id1 >= server.dbnum ||
id2 < 0 || id2 >= server.dbnum) return C_ERR;
if (id1 == id2) return C_OK;
redictDb aux = server.db[id1];
redictDb *db1 = &server.db[id1], *db2 = &server.db[id2];
/* Swapdb should make transaction fail if there is any
* client watching keys */
touchAllWatchedKeysInDb(db1, db2);
touchAllWatchedKeysInDb(db2, db1);
/* Try to unblock any XREADGROUP clients if the key no longer exists. */
scanDatabaseForDeletedKeys(db1, db2);
scanDatabaseForDeletedKeys(db2, db1);
/* Swap hash tables. Note that we don't swap blocking_keys,
* ready_keys and watched_keys, since we want clients to
* remain in the same DB they were. */
db1->keys = db2->keys;
db1->expires = db2->expires;
db1->avg_ttl = db2->avg_ttl;
db1->expires_cursor = db2->expires_cursor;
db2->keys = aux.keys;
db2->expires = aux.expires;
db2->avg_ttl = aux.avg_ttl;
db2->expires_cursor = aux.expires_cursor;
/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
* X in a given DB, may now actually be unblocked if X happens
* to exist in the new version of the DB, after the swap.
*
* However normally we only do this check for efficiency reasons
* in dbAdd() when a list is created. So here we need to rescan
* the list of clients blocked on lists and signal lists as ready
* if needed. */
scanDatabaseForReadyKeys(db1);
scanDatabaseForReadyKeys(db2);
return C_OK;
}
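/* Illustrative sketch (not used by the server): why dbSwapDatabases() swaps
 * individual fields instead of whole redictDb structs. Clients hold a
 * pointer to a DB slot (c->db), and per-slot client state such as
 * blocking_keys must stay where it is, so only the data fields move.
 * exampleDb (with int placeholders for the kvstores) and exampleSwapDbData()
 * are hypothetical simplifications of redictDb and the code above. */
typedef struct {
    int keys;                     /* Stands in for the keys kvstore. */
    int expires;                  /* Stands in for the expires kvstore. */
    long long avg_ttl;
    unsigned long expires_cursor;
    int blocking_keys;            /* Client-side state: never swapped. */
} exampleDb;

static void exampleSwapDbData(exampleDb *db1, exampleDb *db2) {
    exampleDb aux = *db1;
    db1->keys = db2->keys;
    db1->expires = db2->expires;
    db1->avg_ttl = db2->avg_ttl;
    db1->expires_cursor = db2->expires_cursor;
    db2->keys = aux.keys;
    db2->expires = aux.expires;
    db2->avg_ttl = aux.avg_ttl;
    db2->expires_cursor = aux.expires_cursor;
}

/* Usage: a client whose db pointer refers to slot 0 keeps that pointer, but
 * after exampleSwapDbData(&dbs[0], &dbs[1]) it reads slot 1's former data,
 * while each slot keeps its own blocking_keys. */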
/* Logically, this discards (flushes) the old main database and applies the newly
* loaded database (temp) as the main (active) database. The actual freeing of the
* old database (which will now be placed in the temp one) is done later. */
void swapMainDbWithTempDb(redictDb *tempDb) {
for (int i=0; i<server.dbnum; i++) {
redictDb aux = server.db[i];
redictDb *activedb = &server.db[i], *newdb = &tempDb[i];
/* Swapping databases should make transaction fail if there is any
* client watching keys. */
touchAllWatchedKeysInDb(activedb, newdb);
/* Try to unblock any XREADGROUP clients if the key no longer exists. */
scanDatabaseForDeletedKeys(activedb, newdb);
/* Swap hash tables. Note that we don't swap blocking_keys,
* ready_keys and watched_keys, since clients
* remain in the same DB they were. */
activedb->keys = newdb->keys;
activedb->expires = newdb->expires;
activedb->avg_ttl = newdb->avg_ttl;
activedb->expires_cursor = newdb->expires_cursor;
newdb->keys = aux.keys;
newdb->expires = aux.expires;
newdb->avg_ttl = aux.avg_ttl;
newdb->expires_cursor = aux.expires_cursor;
/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
* X in a given DB, may now actually be unblocked if X happens
* to exist in the new version of the DB, after the swap.
*
* However normally we only do this check for efficiency reasons
* in dbAdd() when a list is created. So here we need to rescan
* the list of clients blocked on lists and signal lists as ready
* if needed. */
scanDatabaseForReadyKeys(activedb);
}
trackingInvalidateKeysOnFlush(1);
flushSlaveKeysWithExpireList();
}
/* SWAPDB db1 db2 */
void swapdbCommand(client *c) {
int id1, id2;
/* Not allowed in cluster mode: we have just DB 0 there. */
if (server.cluster_enabled) {
addReplyError(c,"SWAPDB is not allowed in cluster mode");
return;
}
/* Get the two DBs indexes. */
if (getIntFromObjectOrReply(c, c->argv[1], &id1,
"invalid first DB index") != C_OK)
return;
if (getIntFromObjectOrReply(c, c->argv[2], &id2,
"invalid second DB index") != C_OK)
return;
/* Swap... */
if (dbSwapDatabases(id1,id2) == C_ERR) {
addReplyError(c,"DB index is out of range");
return;
} else {
RedictModuleSwapDbInfo si = {REDICTMODULE_SWAPDBINFO_VERSION,id1,id2};
moduleFireServerEvent(REDICTMODULE_EVENT_SWAPDB,0,&si);
server.dirty++;
addReply(c,shared.ok);
}
}
/*-----------------------------------------------------------------------------
* Expires API
*----------------------------------------------------------------------------*/
int removeExpire(redictDb *db, robj *key) {
return kvstoreDictDelete(db->expires, getKeySlot(key->ptr), key->ptr) == DICT_OK;
}
/* Set an expire to the specified key. If the expire is set in the context
* of a user calling a command, 'c' is the client, otherwise 'c' is set
* to NULL. The 'when' parameter is the absolute unix time in milliseconds
* after which the key will no longer be considered valid. */
void setExpire(client *c, redictDb *db, robj *key, long long when) {
dictEntry *kde, *de, *existing;
/* Reuse the sds from the main dict in the expire dict */
int slot = getKeySlot(key->ptr);
kde = kvstoreDictFind(db->keys, slot, key->ptr);
serverAssertWithInfo(NULL,key,kde != NULL);
de = kvstoreDictAddRaw(db->expires, slot, dictGetKey(kde), &existing);
if (existing) {
dictSetSignedIntegerVal(existing, when);
} else {
dictSetSignedIntegerVal(de, when);
}
int writable_slave = server.masterhost && server.repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
rememberSlaveKeyWithExpire(db,key);
}
/* Return the expire time of the specified key, or -1 if no expire
* is associated with this key (i.e. the key is non-volatile). */
long long getExpire(redictDb *db, robj *key) {
dictEntry *de;
if ((de = dbFindExpires(db, key->ptr)) == NULL)
return -1;
return dictGetSignedIntegerVal(de);
}
/* Delete the specified expired key and propagate expire. */
void deleteExpiredKeyAndPropagate(redictDb *db, robj *keyobj) {
mstime_t expire_latency;
latencyStartMonitor(expire_latency);
dbGenericDelete(db,keyobj,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED);
latencyEndMonitor(expire_latency);
latencyAddSampleIfNeeded("expire-del",expire_latency);
notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
signalModifiedKey(NULL, db, keyobj);
propagateDeletion(db,keyobj,server.lazyfree_lazy_expire);
server.stat_expiredkeys++;
}
/* Propagate an implicit key deletion into replicas and the AOF file.
* When a key is deleted on the master by eviction, expiration or a similar
* mechanism, a DEL/UNLINK operation for this key is sent
* to all the replicas and the AOF file if enabled.
*
* This way the key deletion is centralized in one place, and since both
* AOF and the replication link guarantee operation ordering, everything
* will be consistent even if we allow write operations against deleted
* keys.
*
* This function may be called from:
* 1. Within call(): Example: Lazy-expire on key access.
* In this case the caller doesn't have to do anything
* because call() handles server.also_propagate(); or
* 2. Outside of call(): Example: Active-expire, eviction, slot ownership changed.
* In this case the caller must remember to call
* postExecutionUnitOperations, preferably just after a
* single deletion batch, so that DEL/UNLINK will NOT be wrapped
* in MULTI/EXEC. */
void propagateDeletion(redictDb *db, robj *key, int lazy) {
robj *argv[2];
argv[0] = lazy ? shared.unlink : shared.del;
argv[1] = key;
incrRefCount(argv[0]);
incrRefCount(argv[1]);
/* If the master decided to delete a key, we must propagate it to replicas no matter what,
* even if a module executed a command without asking for propagation. */
int prev_replication_allowed = server.replication_allowed;
server.replication_allowed = 1;
alsoPropagate(db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
server.replication_allowed = prev_replication_allowed;
decrRefCount(argv[0]);
decrRefCount(argv[1]);
}
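The UNLINK/DEL choice and the temporary override of `server.replication_allowed` in propagateDeletion() can be sketched in isolation. This is a minimal, hypothetical model: the global `replication_allowed` stands in for the server field, and `also_propagate_stub()` only records what `alsoPropagate()` would receive; neither name is part of the Redict API.

```c
#include <string.h>

/* Hypothetical stand-in for server.replication_allowed. */
static int replication_allowed = 0;

/* What the stub saw on its last call (stands in for alsoPropagate()). */
static const char *last_cmd = NULL;
static const char *last_key = NULL;
static int last_allowed = -1;

static void also_propagate_stub(const char *cmd, const char *key) {
    last_cmd = cmd;
    last_key = key;
    last_allowed = replication_allowed; /* value in effect during propagation */
}

/* Sketch of propagateDeletion(): UNLINK for lazy deletions, DEL otherwise.
 * Replication is forced on only for the duration of the call, then the
 * previous setting is restored. */
static void propagate_deletion_sketch(const char *key, int lazy) {
    const char *cmd = lazy ? "UNLINK" : "DEL";
    int prev = replication_allowed;
    replication_allowed = 1;
    also_propagate_stub(cmd, key);
    replication_allowed = prev;
}
```

Saving and restoring the flag, rather than just setting it, keeps a module's "do not replicate" choice intact for whatever that module does after the implicit deletion.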
/* Check if the key is expired. */
int keyIsExpired(redictDb *db, robj *key) {
/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;
mstime_t when = getExpire(db,key);
mstime_t now;
if (when < 0) return 0; /* No expire for this key */
now = commandTimeSnapshot();
/* The key expired if the current (virtual or real) time is greater
* than the expire time of the key. */
return now > when;
}
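Outside of the loading guard, keyIsExpired() reduces to a comparison of two millisecond timestamps. A minimal sketch, assuming a negative `when_ms` means "no expire" (as `getExpire()` returns -1) and that `now_ms` plays the role of `commandTimeSnapshot()`; `key_is_expired_at` is a hypothetical name.

```c
#include <stdbool.h>

/* Hypothetical stand-alone version of the keyIsExpired() comparison.
 * 'when_ms' is the absolute expire time in milliseconds, or negative when
 * the key has no expire; 'now_ms' is the (virtual or real) current time. */
static bool key_is_expired_at(long long when_ms, long long now_ms) {
    if (when_ms < 0) return false; /* non-volatile key */
    return now_ms > when_ms;       /* strictly after the deadline */
}
```

Because the comparison is strict, a key whose deadline equals the current time is still considered valid.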
/* This function is called when we are going to perform some operation
* in a given key, but such key may be already logically expired even if
* it still exists in the database. The main way this function is called
* is via lookupKey*() family of functions.
*
* The behavior of the function depends on the replication role of the
* instance, because by default replicas do not delete expired keys. They
* wait for DELs from the master for consistency. However, even
* replicas try to return a coherent value from this function,
* so that read commands executed on the replica side can
* behave as if the key is expired even if it is still present (because the
* master has yet to propagate the DEL).
*
* On masters, as a side effect of finding a key which is expired, the
* key is deleted from the database. This may also trigger the
* propagation of a DEL/UNLINK command in the AOF / replication stream.
*
* On replicas, this function does not delete expired keys by default, but
* it still returns KEY_EXPIRED if the key is logically expired. To force deletion
* of logically expired keys even on replicas, use the EXPIRE_FORCE_DELETE_EXPIRED
* flag. Note though that if the current client is executing
* replicated commands from the master, keys are never considered expired.
*
* On the other hand, if you just want expiration check, but need to avoid
* the actual key deletion and propagation of the deletion, use the
* EXPIRE_AVOID_DELETE_EXPIRED flag.
*
* The return value of the function is KEY_VALID if the key is still valid.
* The function returns KEY_EXPIRED if the key is expired BUT not deleted,
* or returns KEY_DELETED if the key is expired and deleted. */
keyStatus expireIfNeeded(redictDb *db, robj *key, int flags) {
if (server.lazy_expire_disabled) return KEY_VALID;
if (!keyIsExpired(db,key)) return KEY_VALID;
/* If we are running in the context of a replica, instead of
* evicting the expired key from the database, we return ASAP:
* the replica key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys. The
* exception is when write operations are performed on writable
* replicas.
*
* Still we try to return the right information to the caller,
* that is, KEY_VALID if we think the key should still be valid,
* KEY_EXPIRED if we think the key is expired but don't want to delete it at this time.
*
* When replicating commands from the master, keys are never considered
* expired. */
if (server.masterhost != NULL) {
if (server.current_client && (server.current_client->flags & CLIENT_MASTER)) return KEY_VALID;
if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED;
}
/* In some cases we're explicitly instructed to return an indication of a
* missing key without actually deleting it, even on masters. */
if (flags & EXPIRE_AVOID_DELETE_EXPIRED)
return KEY_EXPIRED;
/* If 'expire' action is paused, for whatever reason, then don't expire any key.
* Typically, at the end of the pause we will properly expire the key OR we
* will have failed over and the new primary will send us the expire. */
if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return KEY_EXPIRED;
/* The key needs to be converted from static to heap before it is deleted. */
int static_key = key->refcount == OBJ_STATIC_REFCOUNT;
if (static_key) {
key = createStringObject(key->ptr, sdslen(key->ptr));
}
/* Delete the key */
deleteExpiredKeyAndPropagate(db,key);
if (static_key) {
decrRefCount(key);
}
return KEY_DELETED;
}
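The order of the early returns in expireIfNeeded() determines which status a caller sees. The following is a hypothetical, side-effect-free model of that decision order: the enum, flag values and `sk_context` fields are illustrative stand-ins (not the real `keyStatus` or `EXPIRE_*` definitions), and the function only reports what would happen instead of deleting anything.

```c
#include <stdbool.h>

/* Illustrative mirrors of the real status values and flags. */
typedef enum { SK_KEY_VALID, SK_KEY_EXPIRED, SK_KEY_DELETED } sk_key_status;
#define SK_FORCE_DELETE_EXPIRED (1 << 0)
#define SK_AVOID_DELETE_EXPIRED (1 << 1)

typedef struct {
    bool lazy_expire_disabled; /* server.lazy_expire_disabled */
    bool is_replica;           /* server.masterhost != NULL */
    bool client_is_master;     /* current client has CLIENT_MASTER */
    bool expire_paused;        /* PAUSE_ACTION_EXPIRE is active */
} sk_context;

/* Same sequence of checks as expireIfNeeded(); 'expired' is the result
 * of the keyIsExpired() test. */
static sk_key_status expire_decision(const sk_context *ctx, bool expired, int flags) {
    if (ctx->lazy_expire_disabled) return SK_KEY_VALID;
    if (!expired) return SK_KEY_VALID;
    if (ctx->is_replica) {
        if (ctx->client_is_master) return SK_KEY_VALID; /* replicated stream */
        if (!(flags & SK_FORCE_DELETE_EXPIRED)) return SK_KEY_EXPIRED;
    }
    if (flags & SK_AVOID_DELETE_EXPIRED) return SK_KEY_EXPIRED;
    if (ctx->expire_paused) return SK_KEY_EXPIRED;
    return SK_KEY_DELETED; /* the real code deletes and propagates here */
}
```

Note that on a replica the master's own stream always sees the key as valid, even when the force-delete flag is passed.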
/* CB passed to kvstoreExpand.
* The purpose is to skip expansion of unused dicts in cluster mode (all
* dicts not mapped to *my* slots) */
static int dbExpandSkipSlot(int slot) {
return !clusterNodeCoversSlot(getMyClusterNode(), slot);
}
/*
* This function increases the size of the main/expires db to match the desired number.
* In cluster mode it resizes all individual dictionaries for slots that this node owns.
*
* Based on the parameter `try_expand`, the appropriate dict expand API is invoked:
* if try_expand is set to 1, `dictTryExpand` is used, else `dictExpand`.
* Both APIs return either `DICT_OK` or `DICT_ERR`. `DICT_OK` signifies a successful
* expansion. `DICT_ERR` signifies an allocation failure in a `dictTryExpand` call,
* and, in a `dictExpand` call, that no expansion was performed.
* This function returns C_OK on success and C_ERR on failure.
*/
static int dbExpandGeneric(kvstore *kvs, uint64_t db_size, int try_expand) {
int ret;
if (server.cluster_enabled) {
/* We don't know exact number of keys that would fall into each slot, but we can
* approximate it, assuming even distribution, divide it by the number of slots. */
int slots = getMyShardSlotCount();
if (slots == 0) return C_OK;
db_size = db_size / slots;
ret = kvstoreExpand(kvs, db_size, try_expand, dbExpandSkipSlot);
} else {
ret = kvstoreExpand(kvs, db_size, try_expand, NULL);
}
return ret? C_OK : C_ERR;
}
int dbExpand(redictDb *db, uint64_t db_size, int try_expand) {
return dbExpandGeneric(db->keys, db_size, try_expand);
}
int dbExpandExpires(redictDb *db, uint64_t db_size, int try_expand) {
return dbExpandGeneric(db->expires, db_size, try_expand);
}
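In cluster mode, dbExpandGeneric() turns the total key count into a per-dict hint by assuming keys are spread evenly over the slots this shard owns. A minimal sketch of just that arithmetic; `per_slot_size_hint` is a hypothetical helper, and returning 0 for a shard with no slots models the early `return C_OK` (no expansion).

```c
#include <stdint.h>

/* Hypothetical helper: even-distribution size hint per slot dictionary.
 * Integer division truncates, so small databases may yield 0. */
static uint64_t per_slot_size_hint(uint64_t db_size, int owned_slots) {
    if (owned_slots <= 0) return 0; /* no slots owned: nothing to expand */
    return db_size / (uint64_t)owned_slots;
}
```

For example, one million keys on a node that owns all 16384 slots gives a hint of 61 entries per slot dictionary.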
static dictEntry *dbFindGeneric(kvstore *kvs, void *key) {
return kvstoreDictFind(kvs, getKeySlot(key), key);
}
dictEntry *dbFind(redictDb *db, void *key) {
return dbFindGeneric(db->keys, key);
}
dictEntry *dbFindExpires(redictDb *db, void *key) {
return dbFindGeneric(db->expires, key);
}
unsigned long long dbSize(redictDb *db) {
return kvstoreSize(db->keys);
}
unsigned long long dbScan(redictDb *db, unsigned long long cursor, dictScanFunction *scan_cb, void *privdata) {
return kvstoreScan(db->keys, cursor, -1, scan_cb, NULL, privdata);
}
/* -----------------------------------------------------------------------------
* API to get key arguments from commands
* ---------------------------------------------------------------------------*/
/* Prepare the getKeysResult struct to hold numkeys, either by using the
* pre-allocated keysbuf or by allocating a new array on the heap.
*
* This function must be called at least once before starting to populate
* the result, and can be called repeatedly to enlarge the result array.
*/
keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys) {
/* GETKEYS_RESULT_INIT initializes keys to NULL, point it to the pre-allocated stack
* buffer here. */
if (!result->keys) {
serverAssert(!result->numkeys);
result->keys = result->keysbuf;
}
/* Resize if necessary */
if (numkeys > result->size) {
if (result->keys != result->keysbuf) {
/* We're not using a static buffer, just (re)alloc */
result->keys = zrealloc(result->keys, numkeys * sizeof(keyReference));
} else {
/* We are using a static buffer, copy its contents */
result->keys = zmalloc(numkeys * sizeof(keyReference));
if (result->numkeys)
memcpy(result->keys, result->keysbuf, result->numkeys * sizeof(keyReference));
}
result->size = numkeys;
}
return result->keys;
}
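getKeysPrepareResult() is a small-buffer optimization: results start in an inline array and move to the heap only when they outgrow it, copying whatever was already collected. The sketch below reproduces that pattern with hypothetical types (`sk_result`, `sk_keyref`) and an arbitrary inline capacity of 4; plain `realloc()` plus `abort()` on failure stands in for `zmalloc`/`zrealloc`, which also treat out-of-memory as fatal.

```c
#include <stdlib.h>
#include <string.h>

#define SK_KEYSBUF 4 /* arbitrary inline capacity for this sketch */

typedef struct { int pos; int flags; } sk_keyref;

typedef struct {
    sk_keyref keysbuf[SK_KEYSBUF]; /* inline storage */
    sk_keyref *keys;               /* NULL until the first prepare call */
    int numkeys;
    int size;
} sk_result;

#define SK_RESULT_INIT { {{0, 0}}, NULL, 0, SK_KEYSBUF }

static void *sk_xrealloc(void *p, size_t n) {
    void *q = realloc(p, n);
    if (q == NULL) abort(); /* OOM is fatal, like zrealloc */
    return q;
}

static sk_keyref *sk_prepare(sk_result *r, int numkeys) {
    if (r->keys == NULL) r->keys = r->keysbuf; /* start with inline buffer */
    if (numkeys > r->size) {
        if (r->keys != r->keysbuf) {
            /* Already on the heap: just grow. */
            r->keys = sk_xrealloc(r->keys, (size_t)numkeys * sizeof(sk_keyref));
        } else {
            /* Leaving the inline buffer: allocate and copy existing entries. */
            sk_keyref *heap = sk_xrealloc(NULL, (size_t)numkeys * sizeof(sk_keyref));
            if (r->numkeys)
                memcpy(heap, r->keysbuf, (size_t)r->numkeys * sizeof(sk_keyref));
            r->keys = heap;
        }
        r->size = numkeys;
    }
    return r->keys;
}

static void sk_free_result(sk_result *r) {
    if (r->keys != r->keysbuf) free(r->keys); /* free(NULL) is a no-op */
}
```

As with getKeysFreeResult(), freeing is only needed when the array left the inline buffer, which the pointer comparison detects.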
/* Returns a bitmask with all the flags found in any of the key specs of the command.
* The 'inv' argument means we'll return a mask with all flags that are missing in at least one spec. */
int64_t getAllKeySpecsFlags(struct redictCommand *cmd, int inv) {
int64_t flags = 0;
for (int j = 0; j < cmd->key_specs_num; j++) {
keySpec *spec = cmd->key_specs + j;
flags |= inv? ~spec->flags : spec->flags;
}
return flags;
}
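The `inv` trick in getAllKeySpecsFlags() works because OR-ing the complements sets a bit as soon as one spec lacks it. A small sketch with made-up flag values (`SK_FLAG_RO`, `SK_FLAG_NOT_KEY` are hypothetical, not the real `CMD_KEY_*` bits):

```c
#include <stdint.h>

/* Hypothetical flag bits for illustration only. */
#define SK_FLAG_RO      (1ULL << 0)
#define SK_FLAG_NOT_KEY (1ULL << 1)

/* inv == 0: a bit is set if ANY spec has it.
 * inv != 0: a bit is set if at least one spec LACKS it. */
static int64_t all_spec_flags(const int64_t *spec_flags, int n, int inv) {
    int64_t flags = 0;
    for (int j = 0; j < n; j++)
        flags |= inv ? ~spec_flags[j] : spec_flags[j];
    return flags;
}
```

This is how getKeysFromCommandWithSpecs() can ask "is there at least one spec that is not NOT_KEY?" with a single mask test.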
/* Fetch the keys based on the provided key specs. Returns the number of keys found, or -1 on error.
* There are several flags that can be used to modify how this function finds keys in a command.
*
* GET_KEYSPEC_INCLUDE_NOT_KEYS: Return 'fake' keys as if they were keys.
* GET_KEYSPEC_RETURN_PARTIAL: Skips invalid and incomplete keyspecs but returns the keys
* found in other valid keyspecs.
*/
int getKeysUsingKeySpecs(struct redictCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) {
int j, i, last, first, step;
keyReference *keys;
serverAssert(result->numkeys == 0); /* caller should initialize or reset it */
for (j = 0; j < cmd->key_specs_num; j++) {
keySpec *spec = cmd->key_specs + j;
serverAssert(spec->begin_search_type != KSPEC_BS_INVALID);
/* Skip specs that represent 'fake' keys */
if ((spec->flags & CMD_KEY_NOT_KEY) && !(search_flags & GET_KEYSPEC_INCLUDE_NOT_KEYS)) {
continue;
}
first = 0;
if (spec->begin_search_type == KSPEC_BS_INDEX) {
first = spec->bs.index.pos;
} else if (spec->begin_search_type == KSPEC_BS_KEYWORD) {
int start_index = spec->bs.keyword.startfrom > 0 ? spec->bs.keyword.startfrom : argc+spec->bs.keyword.startfrom;
int end_index = spec->bs.keyword.startfrom > 0 ? argc-1: 1;
for (i = start_index; i != end_index; i = start_index <= end_index ? i + 1 : i - 1) {
if (i >= argc || i < 1)
break;
if (!strcasecmp((char*)argv[i]->ptr,spec->bs.keyword.keyword)) {
first = i+1;
break;
}
}
/* keyword not found */
if (!first) {
continue;
}
} else {
/* unknown spec */
goto invalid_spec;
}
if (spec->find_keys_type == KSPEC_FK_RANGE) {
step = spec->fk.range.keystep;
if (spec->fk.range.lastkey >= 0) {
last = first + spec->fk.range.lastkey;
} else {
if (!spec->fk.range.limit) {
last = argc + spec->fk.range.lastkey;
} else {
serverAssert(spec->fk.range.lastkey == -1);
last = first + ((argc-first)/spec->fk.range.limit + spec->fk.range.lastkey);
}
}
} else if (spec->find_keys_type == KSPEC_FK_KEYNUM) {
step = spec->fk.keynum.keystep;
long long numkeys;
/* The numkeys argument is read from argv[first + keynumidx]. */
if (first + spec->fk.keynum.keynumidx >= argc)
goto invalid_spec;
sds keynum_str = argv[first + spec->fk.keynum.keynumidx]->ptr;
if (!string2ll(keynum_str,sdslen(keynum_str),&numkeys) || numkeys < 0) {
/* Unable to parse the numkeys argument or it was invalid */
goto invalid_spec;
}
first += spec->fk.keynum.firstkey;
last = first + (int)numkeys-1;
} else {
/* unknown spec */
goto invalid_spec;
}
int count = ((last - first)+1);
keys = getKeysPrepareResult(result, result->numkeys + count);
/* First or last is out of bounds, which indicates a syntax error */
if (last >= argc || last < first || first >= argc) {
goto invalid_spec;
}
for (i = first; i <= last; i += step) {
if (i >= argc || i < first) {
/* Module commands, and standard commands with a variable number
* of arguments (negative arity parameter), do not have dispatch-time
* arity checks, so we need to handle the case where the user
* passed an invalid number of arguments here. In this case we
* skip the position and expect the command implementation to report
* an arity or syntax error. */
if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
continue;
} else {
serverPanic("Redict built-in command declared keys positions not matching the arity requirements.");
}
}
keys[result->numkeys].pos = i;
keys[result->numkeys].flags = spec->flags;
result->numkeys++;
}
/* Handle incomplete specs (only after we added the current spec
* to `keys`, just in case GET_KEYSPEC_RETURN_PARTIAL was given) */
if (spec->flags & CMD_KEY_INCOMPLETE) {
goto invalid_spec;
}
/* Done with this spec */
continue;
invalid_spec:
if (search_flags & GET_KEYSPEC_RETURN_PARTIAL) {
continue;
} else {
result->numkeys = 0;
return -1;
}
}
return result->numkeys;
}
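The KSPEC_FK_RANGE branch above computes `last` in three different ways, depending on `lastkey` and `limit`. A hypothetical helper that isolates that arithmetic and the bounds check; the `keystep < 1` guard is an addition for the sketch, and the MSET-like spec used in the usage note (first key at 1, `lastkey` -1, `keystep` 2, no limit) is an assumed example.

```c
/* Hypothetical mirror of the KSPEC_FK_RANGE branch of getKeysUsingKeySpecs().
 * Fills 'out' (room for argc entries) with key positions and returns their
 * count, or -1 where the real code would jump to invalid_spec. */
static int range_spec_positions(int argc, int first, int lastkey, int keystep,
                                int limit, int *out) {
    int last;
    if (keystep < 1) return -1; /* guard added for this sketch */
    if (lastkey >= 0) {
        last = first + lastkey; /* fixed offset from the first key */
    } else if (limit == 0) {
        last = argc + lastkey;  /* counted back from the end of argv */
    } else {
        /* Only lastkey == -1 is used with a limit: the keys span
         * 1/limit of the remaining arguments. */
        last = first + ((argc - first) / limit + lastkey);
    }
    if (last >= argc || last < first || first >= argc) return -1;
    int n = 0;
    for (int i = first; i <= last; i += keystep) out[n++] = i;
    return n;
}
```

With `MSET k1 v1 k2 v2` (argc 5) and the assumed spec, the keys are at positions 1 and 3.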
/* Return all the arguments that are keys in the command passed via argc / argv.
* This function will eventually replace getKeysFromCommand.
*
* The positions of all the key arguments are stored in result->keys, and the
* number of keys found is returned (and also stored in result->numkeys).
* Use getKeysFreeResult() to release the result.
*
* Along with the position, this function also returns the flags that are
* associated with how Redict will access the key.
*
* 'cmd' must point to the corresponding entry in the redictCommand
* table, according to the command name in argv[0]. */
int getKeysFromCommandWithSpecs(struct redictCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) {
/* The command has at least one key-spec not marked as NOT_KEY */
int has_keyspec = (getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY);
/* The command has at least one key-spec marked as VARIABLE_FLAGS */
int has_varflags = (getAllKeySpecsFlags(cmd, 0) & CMD_KEY_VARIABLE_FLAGS);
/* We prefer key-specs if there are any, and their flags are reliable. */
if (has_keyspec && !has_varflags) {
int ret = getKeysUsingKeySpecs(cmd,argv,argc,search_flags,result);
if (ret >= 0)
return ret;
/* If the specs returned with an error (probably an INVALID or INCOMPLETE spec),
* fallback to the callback method. */
}
/* Resort to getkeys callback methods. */
if (cmd->flags & CMD_MODULE_GETKEYS)
return moduleGetCommandKeysViaAPI(cmd,argv,argc,result);
/* We use native getkeys as a last resort, since not all these native getkeys provide
* flags properly (only the ones that correspond to INVALID, INCOMPLETE or VARIABLE_FLAGS do). */
if (cmd->getkeys_proc)
return cmd->getkeys_proc(cmd,argv,argc,result);
return 0;
}
/* Sanity check: returns non-zero if the command may have keys. */
int doesCommandHaveKeys(struct redictCommand *cmd) {
return cmd->getkeys_proc || /* has getkeys_proc (non modules) */
(cmd->flags & CMD_MODULE_GETKEYS) || /* module with GETKEYS */
(getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY); /* has at least one key-spec not marked as NOT_KEY */
}
/* A simplified channel spec table that lists the Redict commands that take
* channel arguments, where those channels are and how they are accessed. */
typedef struct ChannelSpecs {
redictCommandProc *proc; /* Command procedure to match against */
uint64_t flags; /* CMD_CHANNEL_* flags for this command */
int start; /* The initial position of the first channel */
int count; /* The number of channels, or -1 if all remaining
* arguments are channels. */
} ChannelSpecs;
ChannelSpecs commands_with_channels[] = {
{subscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1},
{ssubscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1},
{unsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
{sunsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
{psubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_SUBSCRIBE, 1, -1},
{punsubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
{publishCommand, CMD_CHANNEL_PUBLISH, 1, 1},
{spublishCommand, CMD_CHANNEL_PUBLISH, 1, 1},
{NULL,0} /* Terminator. */
};
/* Returns 1 if the command may access any channels matched by the flags
* argument. */
int doesCommandHaveChannelsWithFlags(struct redictCommand *cmd, int flags) {
/* If a module declares get channels, we are just going to assume
* has channels. This API is allowed to return false positives. */
if (cmd->flags & CMD_MODULE_GETCHANNELS) {
return 1;
}
for (ChannelSpecs *spec = commands_with_channels; spec->proc != NULL; spec += 1) {
if (cmd->proc == spec->proc) {
return !!(spec->flags & flags);
}
}
return 0;
}
/* Return all the arguments that are channels in the command passed via argc / argv.
* This function behaves similarly to getKeysFromCommandWithSpecs, but with channels
* instead of keys.
*
* The positions of all the channel arguments are stored in result->keys, and the
* number of channels found is returned (and also stored in result->numkeys).
*
* Along with the position, this function also returns the flags that are
* associated with how Redict will access the channel.
*
* 'cmd' must point to the corresponding entry in the redictCommand
* table, according to the command name in argv[0]. */
int getChannelsFromCommand(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
keyReference *keys;
/* If a module declares get channels, use that. */
if (cmd->flags & CMD_MODULE_GETCHANNELS) {
return moduleGetCommandChannelsViaAPI(cmd, argv, argc, result);
}
/* Otherwise check the channel spec table */
for (ChannelSpecs *spec = commands_with_channels; spec->proc != NULL; spec += 1) {
if (cmd->proc == spec->proc) {
int start = spec->start;
int stop = (spec->count == -1) ? argc : start + spec->count;
if (stop > argc) stop = argc;
int count = 0;
keys = getKeysPrepareResult(result, stop - start);
for (int i = start; i < stop; i++ ) {
keys[count].pos = i;
keys[count++].flags = spec->flags;
}
result->numkeys = count;
return count;
}
}
return 0;
}
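The channel table is a sentinel-terminated array where a count of -1 means "every remaining argument". A hypothetical, string-keyed sketch of the same lookup (`sk_chanspec`, `sk_chan_table` and `channel_positions` are illustrative names; the real table matches on command procedures, not names) that stops at the sentinel entry:

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical analogue of ChannelSpecs keyed by command name. */
typedef struct { const char *name; int start; int count; } sk_chanspec;

static const sk_chanspec sk_chan_table[] = {
    {"subscribe", 1, -1}, /* every argument after the name is a channel */
    {"publish",   1,  1}, /* only argv[1] is a channel */
    {NULL, 0, 0}          /* terminator */
};

/* Writes channel positions into 'out' (room for argc entries) and returns
 * how many were found; 0 for commands missing from the table. */
static int channel_positions(const char *cmd, int argc, int *out) {
    for (const sk_chanspec *s = sk_chan_table; s->name != NULL; s++) {
        if (strcmp(cmd, s->name) != 0) continue;
        int stop = (s->count == -1) ? argc : s->start + s->count;
        if (stop > argc) stop = argc;
        int n = 0;
        for (int i = s->start; i < stop; i++) out[n++] = i;
        return n;
    }
    return 0;
}
```

Testing the entry's field against NULL, rather than the entry pointer itself, is what keeps the scan from running past the end of the array for commands that are not listed.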
/* The base case is to use the keys position as given in the command table
* (firstkey, lastkey, step).
* This function works only on commands with the legacy_range_key_spec;
* all other commands should be handled by getkeys_proc.
*
* If the command's keyspec is incomplete, no keys will be returned, and the provided
* keys function should be called instead.
*
* NOTE: This function does not guarantee populating the flags for
* the keys, in order to get flags you should use getKeysUsingKeySpecs. */
int getKeysUsingLegacyRangeSpec(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
int j, i = 0, last, first, step;
keyReference *keys;
UNUSED(argv);
if (cmd->legacy_range_key_spec.begin_search_type == KSPEC_BS_INVALID) {
result->numkeys = 0;
return 0;
}
first = cmd->legacy_range_key_spec.bs.index.pos;
last = cmd->legacy_range_key_spec.fk.range.lastkey;
if (last >= 0)
last += first;
step = cmd->legacy_range_key_spec.fk.range.keystep;
if (last < 0) last = argc+last;
int count = ((last - first)+1);
keys = getKeysPrepareResult(result, count);
for (j = first; j <= last; j += step) {
if (j >= argc || j < first) {
/* Module commands, and standard commands with a variable number
* of arguments (negative arity parameter), do not have dispatch-time
* arity checks, so we need to handle the case where the user
* passed an invalid number of arguments here. In this case we
* return no keys and expect the command implementation to report
* an arity or syntax error. */
if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
result->numkeys = 0;
return 0;
} else {
serverPanic("Redict built-in command declared keys positions not matching the arity requirements.");
}
}
keys[i].pos = j;
/* Flags are omitted from legacy key specs */
keys[i++].flags = 0;
}
result->numkeys = i;
return i;
}
/* Return all the arguments that are keys in the command passed via argc / argv.
*
* The positions of all the key arguments are stored in result->keys, and the
* number of keys found is returned (and also stored in result->numkeys).
*
* 'cmd' must point to the corresponding entry in the redictCommand
* table, according to the command name in argv[0].
*
* This function uses the command table if a command-specific helper function
* is not required, otherwise it calls the command-specific function. */
int getKeysFromCommand(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
if (cmd->flags & CMD_MODULE_GETKEYS) {
return moduleGetCommandKeysViaAPI(cmd,argv,argc,result);
} else if (cmd->getkeys_proc) {
return cmd->getkeys_proc(cmd,argv,argc,result);
} else {
return getKeysUsingLegacyRangeSpec(cmd,argv,argc,result);
}
}
/* Free the result of getKeysFromCommand. */
void getKeysFreeResult(getKeysResult *result) {
if (result && result->keys != result->keysbuf)
zfree(result->keys);
}
/* Helper function to extract keys from following commands:
* COMMAND [destkey] <num-keys> <key> [...] <key> [...] ... <options>
*
* eg:
* ZUNION <num-keys> <key> <key> ... <key> <options>
* ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
*
* 'storeKeyOfs': destkey index; 0 means there is no destkey.
* 'keyCountOfs': num-keys index.
* 'firstKeyOfs': firstkey index.
* 'keyStep': the interval of each key, usually this value is 1.
*
* The commands using this function have a fully defined keyspec, so returning flags isn't needed. */
int genericGetKeys(int storeKeyOfs, int keyCountOfs, int firstKeyOfs, int keyStep,
robj **argv, int argc, getKeysResult *result) {
int i, num;
keyReference *keys;
num = atoi(argv[keyCountOfs]->ptr);
/* Sanity check. Don't return any key if the command is going to
* reply with syntax error. (no input keys). */
if (num < 1 || num > (argc - firstKeyOfs)/keyStep) {
result->numkeys = 0;
return 0;
}
int numkeys = storeKeyOfs ? num + 1 : num;
keys = getKeysPrepareResult(result, numkeys);
result->numkeys = numkeys;
/* Add all key positions for argv[firstKeyOfs...n] to keys[] */
for (i = 0; i < num; i++) {
keys[i].pos = firstKeyOfs+(i*keyStep);
keys[i].flags = 0;
}
if (storeKeyOfs) {
keys[num].pos = storeKeyOfs;
keys[num].flags = 0;
}
return result->numkeys;
}
int sintercardGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 1, 2, 1, argv, argc, result);
}
int zunionInterDiffStoreGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(1, 2, 3, 1, argv, argc, result);
}
int zunionInterDiffGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 1, 2, 1, argv, argc, result);
}
int evalGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 2, 3, 1, argv, argc, result);
}
int functionGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 2, 3, 1, argv, argc, result);
}
int lmpopGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 1, 2, 1, argv, argc, result);
}
int blmpopGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 2, 3, 1, argv, argc, result);
}
int zmpopGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 1, 2, 1, argv, argc, result);
}
int bzmpopGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 2, 3, 1, argv, argc, result);
}
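The wrappers above differ only in the offsets they pass to genericGetKeys(). A hypothetical port over plain C strings (`generic_get_keys` and `gk_keyref` are illustrative names) makes the layout concrete: for `ZUNIONSTORE dest 2 a b WEIGHTS 1 2`, the offsets (1, 2, 3, 1) used by zunionInterDiffStoreGetKeys() yield the source keys at 3 and 4, followed by the destination key at 1.

```c
#include <stdlib.h>

typedef struct { int pos; int flags; } gk_keyref;

/* Hypothetical port of genericGetKeys(). 'out' needs room for num + 1
 * entries. Returns the number of keys, or 0 when the numkeys argument is
 * out of range (the command itself will then report the error). */
static int generic_get_keys(int storeKeyOfs, int keyCountOfs, int firstKeyOfs,
                            int keyStep, const char **argv, int argc,
                            gk_keyref *out) {
    int num = atoi(argv[keyCountOfs]);
    if (num < 1 || num > (argc - firstKeyOfs) / keyStep) return 0;
    for (int i = 0; i < num; i++) {
        out[i].pos = firstKeyOfs + i * keyStep;
        out[i].flags = 0;
    }
    if (storeKeyOfs) { /* the destination key is appended last */
        out[num].pos = storeKeyOfs;
        out[num].flags = 0;
    }
    return storeKeyOfs ? num + 1 : num;
}
```

Trailing options such as WEIGHTS are never inspected: the numkeys argument alone decides how many positions are returned.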
/* Helper function to extract keys from the SORT_RO command.
*
* SORT_RO <sort-key>
*
* The first argument of SORT_RO is always a key, however an arbitrary number of
* keys may be accessed while doing the sort (the BY and GET args), so the
* key-spec declares incomplete keys, which is why we have to provide a concrete
* implementation to fetch the keys.
*
* This command declares incomplete keys, so the flags are correctly set for this function */
int sortROGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
keyReference *keys;
UNUSED(cmd);
UNUSED(argv);
UNUSED(argc);
keys = getKeysPrepareResult(result, 1);
keys[0].pos = 1; /* <sort-key> is always present. */
keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
result->numkeys = 1;
return result->numkeys;
}
/* Helper function to extract keys from the SORT command.
*
* SORT <sort-key> ... STORE <store-key> ...
*
* The first argument of SORT is always a key, however a list of options
* follows in SQL-like style. Here we parse just the minimum in order to
* correctly identify keys in the "STORE" option.
*
* This command declares incomplete keys, so the flags are correctly set for this function */
int sortGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
int i, j, num, found_store = 0;
keyReference *keys;
UNUSED(cmd);
num = 0;
keys = getKeysPrepareResult(result, 2); /* Alloc 2 places for the worst case. */
keys[num].pos = 1; /* <sort-key> is always present. */
keys[num++].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
/* Search for the STORE option. By default we assume options take no
* arguments, so if we find an unknown option name we simply move on to
* the next argument. However there are options with 1 or 2 arguments, so we
* provide a list here in order to skip the right number of args. */
struct {
char *name;
int skip;
} skiplist[] = {
{"limit", 2},
{"get", 1},
{"by", 1},
{NULL, 0} /* End of elements. */
};
for (i = 2; i < argc; i++) {
for (j = 0; skiplist[j].name != NULL; j++) {
if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) {
i += skiplist[j].skip;
break;
} else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) {
/* Note: we don't increment "num" here and continue the loop
* to be sure to process the *last* "STORE" option if multiple
* ones are provided. This is the same behavior as SORT. */
found_store = 1;
keys[num].pos = i+1; /* <store-key> */
keys[num].flags = CMD_KEY_OW | CMD_KEY_UPDATE;
break;
}
}
}
result->numkeys = num + found_store;
return result->numkeys;
}
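The STORE scan in sortGetKeys() skips the arguments of LIMIT, GET and BY so that a pattern such as `GET store` is not mistaken for the option, and it keeps scanning so the last STORE wins. A hypothetical port over C strings (`sort_store_pos` is an illustrative name; `strcasecmp` comes from POSIX `<strings.h>`):

```c
#include <strings.h> /* strcasecmp (POSIX) */

/* Returns the index of the <store-key> of the last STORE option, or -1.
 * LIMIT takes 2 arguments, GET and BY take 1; anything else takes none. */
static int sort_store_pos(const char **argv, int argc) {
    static const struct { const char *name; int skip; } skip[] = {
        {"limit", 2}, {"get", 1}, {"by", 1}, {0, 0}
    };
    int store = -1;
    for (int i = 2; i < argc; i++) {
        int skipped = 0;
        for (int j = 0; skip[j].name != 0; j++) {
            if (!strcasecmp(argv[i], skip[j].name)) {
                i += skip[j].skip; /* jump over the option's arguments */
                skipped = 1;
                break;
            }
        }
        if (!skipped && !strcasecmp(argv[i], "store") && i + 1 < argc)
            store = i + 1; /* keep scanning: the last STORE wins */
    }
    return store;
}
```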
/* This command declares incomplete keys, so the flags are correctly set for this function */
int migrateGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
int i, j, num, first;
keyReference *keys;
UNUSED(cmd);
/* Assume the obvious form. */
first = 3;
num = 1;
/* But check for the extended one with the KEYS option. */
struct {
char* name;
int skip;
} skip_keywords[] = {
{"copy", 0},
{"replace", 0},
{"auth", 1},
{"auth2", 2},
{NULL, 0}
};
if (argc > 6) {
for (i = 6; i < argc; i++) {
if (!strcasecmp(argv[i]->ptr, "keys")) {
if (sdslen(argv[3]->ptr) > 0) {
/* This is a syntax error. So ignore the keys and leave
* the syntax error to be handled by migrateCommand. */
num = 0;
} else {
first = i + 1;
num = argc - first;
}
break;
}
for (j = 0; skip_keywords[j].name != NULL; j++) {
if (!strcasecmp(argv[i]->ptr, skip_keywords[j].name)) {
i += skip_keywords[j].skip;
break;
}
}
}
}
keys = getKeysPrepareResult(result, num);
for (i = 0; i < num; i++) {
keys[i].pos = first+i;
keys[i].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_DELETE;
}
result->numkeys = num;
return num;
}
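migrateGetKeys() distinguishes the single-key form (key at argv[3]) from the KEYS form, which requires an empty argv[3]; the option skip table keeps AUTH/AUTH2 credentials from being read as the KEYS keyword. A hypothetical port over C strings (`migrate_keys` is an illustrative name):

```c
#include <string.h>
#include <strings.h> /* strcasecmp (POSIX) */

/* Sets *first to the index of the first key and returns the key count.
 * A non-empty argv[3] combined with KEYS is a syntax error: 0 keys. */
static int migrate_keys(const char **argv, int argc, int *first) {
    static const struct { const char *name; int skip; } opts[] = {
        {"copy", 0}, {"replace", 0}, {"auth", 1}, {"auth2", 2}, {0, 0}
    };
    int num = 1;
    *first = 3; /* MIGRATE host port key db timeout */
    for (int i = 6; i < argc; i++) {
        if (!strcasecmp(argv[i], "keys")) {
            if (strlen(argv[3]) > 0) {
                num = 0; /* leave the error to the command itself */
            } else {
                *first = i + 1;
                num = argc - *first;
            }
            break;
        }
        for (int j = 0; opts[j].name != 0; j++) {
            if (!strcasecmp(argv[i], opts[j].name)) {
                i += opts[j].skip; /* skip option arguments, e.g. passwords */
                break;
            }
        }
    }
    return num;
}
```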
/* Helper function to extract keys from following commands:
* GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC]
* [COUNT count] [STORE key|STOREDIST key]
* GEORADIUSBYMEMBER key member radius unit ... options ...
*
* This command has a fully defined keyspec, so returning flags isn't needed. */
int georadiusGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
int i, num;
keyReference *keys;
UNUSED(cmd);
/* Check for the presence of the stored key in the command */
int stored_key = -1;
for (i = 5; i < argc; i++) {
char *arg = argv[i]->ptr;
/* For the case when user specifies both "store" and "storedist" options, the
* second key specified would override the first key. This behavior is kept
* the same as in georadiusCommand method.
*/
if ((!strcasecmp(arg, "store") || !strcasecmp(arg, "storedist")) && ((i+1) < argc)) {
stored_key = i+1;
i++;
}
}
num = 1 + (stored_key == -1 ? 0 : 1);
/* Keys in the command come from two places:
* argv[1] = key,
* argv[5...n] = stored key if present
*/
keys = getKeysPrepareResult(result, num);
/* Add all key positions to keys[] */
keys[0].pos = 1;
keys[0].flags = 0;
if (num > 1) {
keys[1].pos = stored_key;
keys[1].flags = 0;
}
result->numkeys = num;
return num;
}
/* XREAD [BLOCK <milliseconds>] [COUNT <count>]
* XREADGROUP GROUP <groupname> <consumername> [COUNT <count>] [BLOCK <milliseconds>] [NOACK]
*   STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N
*
* This command has a fully defined keyspec, so returning flags isn't needed. */
int xreadGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
int i, num = 0;
keyReference *keys;
UNUSED(cmd);
/* We need to parse the options of the command in order to seek the first
* "STREAMS" string which is actually the option. This is needed because
* "STREAMS" could also be the name of the consumer group and even the
* name of the stream key. */
int streams_pos = -1;
for (i = 1; i < argc; i++) {
char *arg = argv[i]->ptr;
if (!strcasecmp(arg, "block")) {
i++; /* Skip option argument. */
} else if (!strcasecmp(arg, "count")) {
i++; /* Skip option argument. */
} else if (!strcasecmp(arg, "group")) {
i += 2; /* Skip the group and consumer names. */
} else if (!strcasecmp(arg, "noack")) {
/* Nothing to do. */
} else if (!strcasecmp(arg, "streams")) {
streams_pos = i;
break;
} else {
break; /* Syntax error. */
}
}
if (streams_pos != -1) num = argc - streams_pos - 1;
/* Syntax error. */
if (streams_pos == -1 || num == 0 || num % 2 != 0) {
result->numkeys = 0;
return 0;
}
num /= 2; /* Only half of the arguments are keys: the other half
are the IDs, one per key. */
keys = getKeysPrepareResult(result, num);
for (i = streams_pos+1; i < argc-num; i++) {
keys[i-streams_pos-1].pos = i;
keys[i-streams_pos-1].flags = 0;
}
result->numkeys = num;
return num;
}
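Because "streams" may also be a group or key name, xreadGetKeys() walks the options and their arguments until it meets the STREAMS keyword, then takes the first half of what follows as keys. A hypothetical port over C strings (`xread_keys` is an illustrative name) that returns the index of the first key and the key count:

```c
#include <strings.h> /* strcasecmp (POSIX) */

/* Returns the index of the first key and stores the key count in *num,
 * or returns -1 on a syntax error. */
static int xread_keys(const char **argv, int argc, int *num) {
    int streams_pos = -1;
    *num = 0;
    for (int i = 1; i < argc; i++) {
        if (!strcasecmp(argv[i], "block") || !strcasecmp(argv[i], "count")) {
            i++;    /* one option argument */
        } else if (!strcasecmp(argv[i], "group")) {
            i += 2; /* group name and consumer name */
        } else if (!strcasecmp(argv[i], "noack")) {
            /* no arguments */
        } else if (!strcasecmp(argv[i], "streams")) {
            streams_pos = i;
            break;
        } else {
            break;  /* syntax error */
        }
    }
    if (streams_pos == -1) return -1;
    int rest = argc - streams_pos - 1;
    if (rest == 0 || rest % 2 != 0) return -1; /* need one ID per key */
    *num = rest / 2;
    return streams_pos + 1;
}
```

In the XREADGROUP case of the usage test, both the group and the stream are literally named "streams", and only the keyword occurrence is used as the split point.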
/* Helper function to extract keys from the SET command, which may have
* a read flag if the GET argument is passed in. */
int setGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
keyReference *keys;
UNUSED(cmd);
keys = getKeysPrepareResult(result, 1);
keys[0].pos = 1; /* We always know the position */
result->numkeys = 1;
for (int i = 3; i < argc; i++) {
char *arg = argv[i]->ptr;
if ((arg[0] == 'g' || arg[0] == 'G') &&
(arg[1] == 'e' || arg[1] == 'E') &&
(arg[2] == 't' || arg[2] == 'T') && arg[3] == '\0')
{
keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE;
return 1;
}
}
keys[0].flags = CMD_KEY_OW | CMD_KEY_UPDATE;
return 1;
}
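setGetKeys() detects a GET option with a hand-written, case-insensitive compare against the three-letter literal; short-circuit evaluation means it never reads past the terminator of a shorter argument. A hypothetical sketch of the same test and the scan that starts after the value (`is_get_token` and `set_reads_old_value` are illustrative names):

```c
#include <stdbool.h>

/* Case-insensitive match for exactly "GET". */
static bool is_get_token(const char *arg) {
    return (arg[0] == 'g' || arg[0] == 'G') &&
           (arg[1] == 'e' || arg[1] == 'E') &&
           (arg[2] == 't' || arg[2] == 'T') && arg[3] == '\0';
}

/* True when SET key value [options...] would also read the old value.
 * Options start at argv[3], so a value that happens to be "get" is ignored. */
static bool set_reads_old_value(const char **argv, int argc) {
    for (int i = 3; i < argc; i++)
        if (is_get_token(argv[i])) return true;
    return false;
}
```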
/* Helper function to extract keys from the BITFIELD command, which may be
* read-only if the BITFIELD GET subcommand is used. */
int bitfieldGetKeys(struct redictCommand *cmd, robj **argv, int argc, getKeysResult *result) {
keyReference *keys;
int readonly = 1;
UNUSED(cmd);
keys = getKeysPrepareResult(result, 1);
keys[0].pos = 1; /* We always know the position */
result->numkeys = 1;
for (int i = 2; i < argc; i++) {
int remargs = argc - i - 1; /* Remaining args other than current. */
char *arg = argv[i]->ptr;
if (!strcasecmp(arg, "get") && remargs >= 2) {
i += 2;
} else if ((!strcasecmp(arg, "set") || !strcasecmp(arg, "incrby")) && remargs >= 3) {
readonly = 0;
i += 3;
break;
} else if (!strcasecmp(arg, "overflow") && remargs >= 1) {
i += 1;
} else {
readonly = 0; /* Syntax error: it is safer to assume non-RO. */
break;
}
}
if (readonly) {
keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
} else {
keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE;
}
return 1;
}
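bitfieldGetKeys() marks the key read-only only if every subcommand is GET or OVERFLOW with enough arguments; any SET/INCRBY or malformed input falls back to read-write. A hypothetical port of that scan over C strings (`bitfield_is_readonly` is an illustrative name):

```c
#include <stdbool.h>
#include <strings.h> /* strcasecmp (POSIX) */

/* True if BITFIELD key [subcommands...] only reads. Malformed input is
 * treated as a write, the conservative choice. */
static bool bitfield_is_readonly(const char **argv, int argc) {
    for (int i = 2; i < argc; i++) {
        int remargs = argc - i - 1; /* arguments after the current one */
        const char *arg = argv[i];
        if (!strcasecmp(arg, "get") && remargs >= 2) {
            i += 2;                 /* GET <encoding> <offset> */
        } else if ((!strcasecmp(arg, "set") || !strcasecmp(arg, "incrby")) && remargs >= 3) {
            return false;           /* a writing subcommand */
        } else if (!strcasecmp(arg, "overflow") && remargs >= 1) {
            i += 1;                 /* OVERFLOW <behavior> */
        } else {
            return false;           /* syntax error: assume a write */
        }
    }
    return true;
}
```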