# Author: Martin D. Smith
"""
Classes representing DataSets of various types.
"""
from __future__ import absolute_import
from __future__ import division
import copy
import errno
import hashlib
import itertools
import logging
import os
import re
import shutil
import tempfile
import uuid
import xml.dom.minidom
import numpy as np
from urlparse import urlparse
from functools import wraps, partial
from collections import defaultdict, Counter
from pbcore.util.Process import backticks
from pbcore.chemistry.chemistry import ChemistryLookupError
from pbcore.io.align.PacBioBamIndex import PBI_FLAGS_BARCODE
from pbcore.io.FastaIO import splitFastaHeader, FastaWriter
from pbcore.io.FastqIO import FastqReader, FastqWriter, qvsFromAscii
from pbcore.io import (BaxH5Reader, FastaReader, IndexedFastaReader,
CmpH5Reader, IndexedBamReader, BamReader)
from pbcore.io.align._BamSupport import UnavailableFeature
from pbcore.io.dataset.DataSetReader import (parseStats, populateDataSet,
resolveLocation, xmlRootType,
wrapNewResource, openFofnFile,
parseMetadata, checksts)
from pbcore.io.dataset.DataSetWriter import toXml
from pbcore.io.dataset.DataSetValidator import validateString
from pbcore.io.dataset.DataSetMembers import (DataSetMetadata,
SubreadSetMetadata,
ContigSetMetadata,
BarcodeSetMetadata,
ExternalResources,
ExternalResource, Filters)
from pbcore.io.dataset.utils import (_infixFname, _pbindexBam,
_indexBam, _indexFasta, _fileCopy,
_swapPath, which, consolidateXml,
getTimeStampedName, getCreatedAt)
from pbcore.io.dataset.DataSetErrors import (InvalidDataSetIOError,
ResourceMismatchError)
from pbcore.io.dataset.DataSetMetaTypes import (DataSetMetaTypes, toDsId,
dsIdToSuffix)
from pbcore.io.dataset.DataSetUtils import fileType
from functools import reduce
log = logging.getLogger(__name__)
def openDataSet(*files, **kwargs):
"""Factory function for DataSet types as suggested by the first file"""
if files[0].endswith('xml'):
tbrType = _typeDataSet(files[0])
return tbrType(*files, **kwargs)
return openDataFile(*files, **kwargs)
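# Illustrative usage (file names are hypothetical): an XML input is opened as
# the DataSet type named by its root element, while anything else falls
# through to openDataFile, which guesses the type from the data file itself.
#   openDataSet("movie.subreadset.xml")   # -> SubreadSet
#   openDataSet("movie.subreads.bam")     # -> type inferred by openDataFile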
def _dsIdToName(dsId):
"""Translate a MetaType/ID into a class name"""
if DataSetMetaTypes.isValid(dsId):
return dsId.split('.')[-1]
else:
raise InvalidDataSetIOError("Invalid DataSet MetaType")
def _dsIdToType(dsId):
"""Translate a MetaType/ID into a type"""
if DataSetMetaTypes.isValid(dsId):
types = DataSet.castableTypes()
return types[_dsIdToName(dsId)]
else:
raise InvalidDataSetIOError("Invalid DataSet MetaType")
def _typeDataSet(dset):
"""Determine the type of a dataset from the xml file without opening it"""
xml_rt = xmlRootType(dset)
dsId = toDsId(xml_rt)
tbrType = _dsIdToType(dsId)
return tbrType
def isDataSet(xmlfile):
"""Determine if a file is a DataSet before opening it"""
try:
_typeDataSet(xmlfile)
return True
except Exception:
return False
def openDataFile(*files, **kwargs):
"""Factory function for DataSet types determined by the first data file"""
possibleTypes = [AlignmentSet, SubreadSet, ConsensusReadSet,
ConsensusAlignmentSet, ReferenceSet, HdfSubreadSet]
origFiles = files
fileMap = defaultdict(list)
for dstype in possibleTypes:
for ftype in dstype._metaTypeMapping():
fileMap[ftype].append(dstype)
ext = fileType(files[0])
if ext == 'fofn':
files = openFofnFile(files[0])
ext = fileType(files[0])
if ext == 'xml':
dsType = _typeDataSet(files[0])
return dsType(*origFiles, **kwargs)
options = fileMap[ext]
if len(options) == 1:
return options[0](*origFiles, **kwargs)
else:
# peek in the files to figure out the best match
if ReferenceSet in options:
log.warn("Fasta files aren't unambiguously reference vs contig, "
"opening as ReferenceSet")
return ReferenceSet(*origFiles, **kwargs)
elif AlignmentSet in options:
# it is a bam file
if files[0].endswith('bam'):
bam = BamReader(files[0])
else:
bam = CmpH5Reader(files[0])
if bam.isMapped:
if bam.readType == "CCS":
return ConsensusAlignmentSet(*origFiles, **kwargs)
else:
return AlignmentSet(*origFiles, **kwargs)
else:
if bam.readType == "CCS":
return ConsensusReadSet(*origFiles, **kwargs)
else:
return SubreadSet(*origFiles, **kwargs)
def filtered(generator):
"""Wrap a generator with postfiltering"""
@wraps(generator)
def wrapper(dset, *args, **kwargs):
filter_tests = dset.processFilters()
no_filter = dset.noFiltering
if no_filter:
for read in generator(dset, *args, **kwargs):
yield read
else:
for read in generator(dset, *args, **kwargs):
if any(filt(read) for filt in filter_tests):
yield read
return wrapper
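# Sketch of how this decorator is meant to be used (the generator below is
# hypothetical, not part of this module): it wraps record generators whose
# first argument is the dataset, screening each yielded record against the
# cached tests from dset.processFilters().
#
#   @filtered
#   def records(dset):
#       for reader in dset.resourceReaders():
#           for record in reader:
#               yield record
#
# With filtering enabled, a record is yielded only if any() of the filter
# tests returns True for it.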
def _checkFields(fieldNames):
"""Check for identical PBI field names"""
for field in fieldNames:
if field != fieldNames[0]:
log.error("Mismatched bam.pbi column names:\n{}".format(
'\n'.join(map(str, fieldNames))))
raise InvalidDataSetIOError(
"Mismatched bam.pbi columns. Mixing mapped/unmapped or "
"barcoded/non-barcoded bam files isn't allowed.")
def _stackRecArrays(recArrays):
"""Stack recarrays into a single larger recarray"""
empties = []
nonempties = []
for array in recArrays:
if len(array) == 0:
empties.append(array)
else:
nonempties.append(array)
if nonempties:
_checkFields([arr.dtype.names for arr in nonempties])
tbr = np.concatenate(nonempties)
tbr = tbr.view(np.recarray)
return tbr
else:
_checkFields([arr.dtype.names for arr in empties])
tbr = np.concatenate(empties)
tbr = tbr.view(np.recarray)
return tbr
def _uniqueRecords(recArray):
"""Remove duplicate records"""
unique = set()
unique_i = []
for i, rec in enumerate(recArray):
rect = tuple(rec)
if rect not in unique:
unique.add(rect)
unique_i.append(i)
return recArray[unique_i]
def _fieldsView(recArray, fields):
vdtype = np.dtype({fi:recArray.dtype.fields[fi] for fi in fields})
return np.ndarray(recArray.shape, vdtype, recArray, 0, recArray.strides)
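# Minimal sketch of what _fieldsView produces (array contents are made up):
# the result is an ndarray view over the same buffer, restricted to the
# requested fields, so no data are copied.
#   arr = np.rec.fromrecords([(1, 2.0), (3, 4.0)], names='a,b')
#   sub = _fieldsView(arr, ['a'])   # sub['a'] aliases arr['a']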
def _renameField(recArray, current, new):
ind = recArray.dtype.names.index(current)
names = list(recArray.dtype.names)
names[ind] = new
recArray.dtype.names = names
def _flatten(lol, times=1):
"""This wont do well with mixed nesting"""
for _ in range(times):
lol = np.concatenate(lol)
return lol
def _ranges_in_list(alist):
"""Takes a sorted list, finds the boundaries of runs of each value"""
unique, indices, counts = np.unique(np.array(alist), return_index=True,
return_counts=True)
return {u: (i, i + c) for u, i, c in zip(unique, indices, counts)}
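# Example of the run boundaries this produces (input values are illustrative
# and must already be sorted): half-open (start, stop) index pairs per value.
#   _ranges_in_list([0, 0, 1, 1, 1, 4])  ->  {0: (0, 2), 1: (2, 5), 4: (5, 6)}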
def divideKeys(keys, chunks):
"""Returns all of the keys in a list of lists, corresponding to evenly
sized chunks of the original keys"""
if chunks < 1:
return []
if chunks > len(keys):
chunks = len(keys)
chunksize = len(keys)//chunks
key_chunks = [keys[(i * chunksize):((i + 1) * chunksize)] for i in
range(chunks-1)]
key_chunks.append(keys[((chunks - 1) * chunksize):])
return key_chunks
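# Example (keys are illustrative): the final chunk absorbs any remainder, so
# the chunks are contiguous but only roughly equal in size.
#   divideKeys([1, 2, 3, 4, 5], 2)  ->  [[1, 2], [3, 4, 5]]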
def splitKeys(keys, chunks):
"""Returns key pairs for each chunk defining the bounds of each chunk"""
if chunks < 1:
return []
if chunks > len(keys):
chunks = len(keys)
chunksize = len(keys)//chunks
chunksizes = [chunksize] * chunks
i = 0
while sum(chunksizes) < len(keys):
chunksizes[i] += 1
i += 1
i %= chunks
key_chunks = []
start = 0
for cs in chunksizes:
key_chunks.append((keys[start], keys[start + cs - 1]))
start += cs
return key_chunks
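# Example (keys are illustrative): unlike divideKeys, the remainder is spread
# over the leading chunks and only the inclusive (first, last) bounds of each
# chunk are returned.
#   splitKeys([1, 2, 3, 4, 5], 2)  ->  [(1, 3), (4, 5)]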
def _fileExists(fname):
"""Assert that a file exists with a useful failure mode"""
if not isinstance(fname, basestring):
fname = fname.resourceId
if not os.path.exists(fname):
raise InvalidDataSetIOError("Resource {f} not found".format(f=fname))
return True
def checkAndResolve(fname, possibleRelStart=None):
"""Try and skip resolveLocation if possible"""
tbr = fname
if not fname.startswith(os.path.sep):
log.debug('Unable to assume path is already absolute')
tbr = resolveLocation(fname, possibleRelStart)
return tbr
def _pathChanger(osPathFunc, metaTypeFunc, resource):
"""Apply these two functions to the resource or ResourceId"""
resId = resource.resourceId
currentPath = urlparse(resId)
if currentPath.scheme == 'file' or not currentPath.scheme:
currentPath = currentPath.path
currentPath = osPathFunc(currentPath)
resource.resourceId = currentPath
metaTypeFunc(resource)
def _copier(dest, resource, subfolder=None):
if subfolder is None:
subfolder = [uuid.uuid4()]
resId = resource.resourceId
currentPath = urlparse(resId)
if currentPath.scheme == 'file' or not currentPath.scheme:
currentPath = currentPath.path
if resource.KEEP_WITH_PARENT:
currentPath = _fileCopy(dest, currentPath, uuid=subfolder[0])
else:
currentPath = _fileCopy(dest, currentPath, uuid=resource.uniqueId)
subfolder[0] = resource.uniqueId
resource.resourceId = currentPath
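# Note on the mutable 'subfolder' argument (see copyTo below): resources
# flagged KEEP_WITH_PARENT are copied into the subfolder of the most recently
# copied parent resource, which is why the single-element list is updated in
# place during the depth-first traversal.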
class DataSet(object):
"""The record containing the DataSet information, with possible type
specific subclasses"""
datasetType = DataSetMetaTypes.ALL
def __init__(self, *files, **kwargs):
"""DataSet constructor
Initialize representations of the ExternalResources, MetaData,
Filters, and LabeledSubsets, parse inputs if possible
Args:
:files: one or more filenames or uris to read
:strict=False: strictly require all index files
:skipCounts=False: skip updating counts for faster opening
Doctest:
>>> import os, tempfile
>>> import pbcore.data.datasets as data
>>> from pbcore.io import AlignmentSet, SubreadSet
>>> # A program like pbalign provides a .bam file:
>>> # e.g. d = AlignmentSet("aligned.bam")
>>> # Something like the test files we have:
>>> inBam = data.getBam()
>>> d = AlignmentSet(inBam)
>>> # A UniqueId is generated, despite being a BAM input
>>> bool(d.uuid)
True
>>> dOldUuid = d.uuid
>>> # They can write this BAM to an XML:
>>> # e.g. d.write("alignmentset.xml")
>>> outdir = tempfile.mkdtemp(suffix="dataset-doctest")
>>> outXml = os.path.join(outdir, 'tempfile.xml')
>>> d.write(outXml)
>>> # And then recover the same XML:
>>> d = AlignmentSet(outXml)
>>> # The UniqueId will be the same
>>> d.uuid == dOldUuid
True
>>> # Inputs can be many and varied
>>> ds1 = AlignmentSet(data.getXml(8), data.getBam(1))
>>> ds1.numExternalResources
2
>>> ds1 = AlignmentSet(data.getFofn())
>>> ds1.numExternalResources
2
>>> # Constructors should be used directly
>>> SubreadSet(data.getSubreadSet(),
... skipMissing=True) # doctest:+ELLIPSIS
<SubreadSet...
>>> # Even with untyped inputs
>>> AlignmentSet(data.getBam()) # doctest:+ELLIPSIS
<AlignmentSet...
>>> # AlignmentSets can also be manipulated after opening:
>>> # Add external Resources:
>>> ds = AlignmentSet()
>>> _ = ds.externalResources.addResources(["IdontExist.bam"])
>>> ds.externalResources[-1].resourceId == "IdontExist.bam"
True
>>> # Add an index file
>>> pbiName = "IdontExist.bam.pbi"
>>> ds.externalResources[-1].addIndices([pbiName])
>>> ds.externalResources[-1].indices[0].resourceId == pbiName
True
"""
files = [str(fn) for fn in files]
self._strict = kwargs.get('strict', False)
skipMissing = kwargs.get('skipMissing', False)
self._skipCounts = kwargs.get('skipCounts', False)
_induceIndices = kwargs.get('generateIndices', False)
# The metadata concerning the DataSet or subtype itself (e.g.
# name, version, UniqueId)
self.objMetadata = {}
# The metadata contained in the DataSet or subtype (e.g.
# NumRecords, BioSamples, Collections, etc.)
self._metadata = DataSetMetadata()
self.externalResources = ExternalResources()
self._filters = Filters()
# list of DataSet objects representing subsets
self.subdatasets = []
# Why not keep this around... (filled by file reader)
self.fileNames = files
# parse files
populateDataSet(self, files)
if not skipMissing and len(files):
log.debug("Checking that the files exist...")
self._modResources(_fileExists)
log.debug("Done checking that the files exist")
# DataSet base class shouldn't really be used. It is ok-ish for just
# basic xml manipulation. May start warning at some point, but
# openDataSet uses it, which would warn unnecessarily.
baseDataSet = False
if isinstance(self.datasetType, tuple):
baseDataSet = True
if self._strict and baseDataSet:
raise InvalidDataSetIOError("DataSet is an abstract class")
# Populate required metadata
if not self.uuid:
self.newUuid()
self.objMetadata.setdefault("Tags", "")
if not baseDataSet:
dsType = self.objMetadata.setdefault("MetaType", self.datasetType)
else:
dsType = self.objMetadata.setdefault("MetaType",
toDsId('DataSet'))
if not self.objMetadata.get('TimeStampedName'):
self.objMetadata["TimeStampedName"] = getTimeStampedName(
self.objMetadata["MetaType"])
self.objMetadata.setdefault("Name",
self.objMetadata["TimeStampedName"])
# Don't allow for creating datasets from inappropriate sources
# (XML files with mismatched types)
if not baseDataSet:
# use _castableDataSetTypes to contain good casts
if dsType not in self._castableDataSetTypes:
raise IOError(errno.EIO,
"Cannot create {c} from {f}".format(
c=self.datasetType, f=dsType),
files[0])
# Don't allow for creating datasets from inappropriate file types
# (external resources of improper types)
if not baseDataSet:
for fname in self.toExternalFiles():
# due to h5 file types, must be unpythonic:
found = False
for allowed in self._metaTypeMapping().keys():
if fname.endswith(allowed):
found = True
break
if not found:
allowed = self._metaTypeMapping().keys()
extension = fname.split('.')[-1]
raise IOError(errno.EIO,
"Cannot create {c} with resource of type "
"'{t}' ({f}), only {a}".format(c=dsType,
t=extension,
f=fname,
a=allowed))
# State tracking:
self._cachedFilters = []
self.noFiltering = False
self._openReaders = []
self._referenceInfoTable = None
self._referenceDict = {}
self._indexMap = None
self._referenceInfoTableIsStacked = None
self._readGroupTableIsRemapped = False
self._index = None
# only to be used against incorrect counts from the XML, not for
# internal accounting:
self._countsUpdated = False
# update counts
if files:
if not self.totalLength or not self.numRecords:
self.updateCounts()
elif self.totalLength <= 0 or self.numRecords <= 0:
self.updateCounts()
elif len(files) > 1:
self.updateCounts()
# generate indices if requested and needed
if _induceIndices:
self.induceIndices()
def induceIndices(self, force=False):
"""Generate indices for ExternalResources.
Not compatible with DataSet base type"""
raise NotImplementedError()
def __repr__(self):
"""Represent the dataset with an informative string:
Returns:
"<type uuid filenames>"
"""
repr_d = dict(k=self.__class__.__name__, u=self.uuid, f=self.fileNames)
return '<{k} uuid:{u} source files:{f}>'.format(**repr_d)
def __add__(self, otherDataset):
"""Merge the representations of two DataSets without modifying
the original datasets. (Fails if filters are incompatible).
Args:
:otherDataset: a DataSet to merge with self
Returns:
A new DataSet with members containing the union of the input
DataSets' members and subdatasets representing the input DataSets
Doctest:
>>> import pbcore.data.datasets as data
>>> from pbcore.io import AlignmentSet
>>> from pbcore.io.dataset.DataSetWriter import toXml
>>> # xmls with different resourceIds: success
>>> ds1 = AlignmentSet(data.getXml(no=8))
>>> ds2 = AlignmentSet(data.getXml(no=11))
>>> ds3 = ds1 + ds2
>>> expected = ds1.numExternalResources + ds2.numExternalResources
>>> ds3.numExternalResources == expected
True
>>> # xmls with different resourceIds but conflicting filters:
>>> # failure to merge
>>> ds2.filters.addRequirement(rname=[('=', 'E.faecalis.1')])
>>> ds3 = ds1 + ds2
>>> ds3
>>> # xmls with same resourceIds: ignores new inputs
>>> ds1 = AlignmentSet(data.getXml(no=8))
>>> ds2 = AlignmentSet(data.getXml(no=8))
>>> ds3 = ds1 + ds2
>>> expected = ds1.numExternalResources
>>> ds3.numExternalResources == expected
True
"""
return self.merge(otherDataset)
def merge(self, other, copyOnMerge=True, newuuid=True):
"""Merge an 'other' dataset with this dataset, same as add operator,
but can take arguments (e.g. copyOnMerge, newuuid)
"""
if (other.__class__.__name__ == self.__class__.__name__ or
other.__class__.__name__ == 'DataSet' or
self.__class__.__name__ == 'DataSet'):
# determine whether or not this is the merge that is populating a
# dataset for the first time
firstIn = len(self.externalResources) == 0
if copyOnMerge:
result = self.copy()
else:
result = self
# Block on filters?
if (not firstIn and
not self.filters.testCompatibility(other.filters)):
log.warning("Filter incompatibility has blocked the merging "
"of two datasets")
return None
elif firstIn:
result.addFilters(other.filters, underConstruction=True)
# reset the filters, just in case
result._cachedFilters = []
# block on object metadata?
result._checkObjMetadata(other.objMetadata)
# If this dataset has no subsets representing it, add self as a
# subdataset to the result
# TODO: firstIn is a stopgap to prevent spurious subdatasets when
# creating datasets from dataset xml files...
if not self.subdatasets and not firstIn:
result.addDatasets(self.copy())
# add subdatasets
if other.subdatasets or not firstIn:
result.addDatasets(other.copy())
# add in the metadata (not to be confused with obj metadata)
if firstIn:
result.metadata = other.metadata
else:
result.addMetadata(other.metadata)
# skip updating counts because other's metadata should be up to
# date
result.addExternalResources(other.externalResources,
updateCount=False)
# DataSets may only be merged if they have identical filters,
# So there is nothing new to add.
# objMetadata:
if firstIn:
result.objMetadata.update(other.objMetadata)
else:
# schemaLocation, MetaType, Version should be the same
# Name:
if (result.objMetadata.get('Name') and
other.objMetadata.get('Name')):
if result.objMetadata['Name'] != other.objMetadata['Name']:
result.objMetadata['Name'] = ' AND '.join(
[result.objMetadata['Name'],
other.objMetadata['Name']])
elif other.objMetadata.get('Name'):
result.objMetadata['Name'] = other.objMetadata['Name']
# Tags:
if (result.objMetadata.get('Tags') and
other.objMetadata.get('Tags')):
if result.objMetadata['Tags'] != other.objMetadata['Tags']:
result.objMetadata['Tags'] = ' '.join(
[result.objMetadata['Tags'],
other.objMetadata['Tags']])
elif other.objMetadata.get('Tags'):
result.objMetadata['Tags'] = other.objMetadata['Tags']
# TimeStampedName:
if result.objMetadata.get("MetaType"):
result.objMetadata["TimeStampedName"] = getTimeStampedName(
result.objMetadata["MetaType"])
# CreatedAt:
result.objMetadata['CreatedAt'] = getCreatedAt()
# UUID:
if newuuid:
result.newUuid()
return result
else:
raise TypeError('DataSets can only be merged with records of the '
'same type or of type DataSet')
def __deepcopy__(self, memo):
"""Deep copy this Dataset by recursively deep copying the members
(objMetadata, DataSet metadata, externalResources, filters and
subdatasets)
"""
tbr = type(self)(skipCounts=True)
memo[id(self)] = tbr
tbr.objMetadata = copy.deepcopy(self.objMetadata, memo)
tbr.metadata = copy.deepcopy(self._metadata, memo)
tbr.externalResources = copy.deepcopy(self.externalResources, memo)
tbr.filters = copy.deepcopy(self._filters, memo)
tbr.subdatasets = copy.deepcopy(self.subdatasets, memo)
tbr.fileNames = copy.deepcopy(self.fileNames, memo)
tbr._skipCounts = False
return tbr
def __eq__(self, other):
"""Test for DataSet equality. The method specified in the documentation
calls for md5 hashing the "Core XML" elements and comparing. This is
the same procedure for generating the Uuid, so the same method may be
used. However, since simultaneously or regularly updating the Uuid is not
specified, we opt not to set a new Uuid when checking for equality.
Args:
:other: The other DataSet to compare to this DataSet.
Returns:
T/F the Core XML elements of this and the other DataSet hash to the
same value
"""
sXml = self.newUuid(setter=False)
oXml = other.newUuid(setter=False)
return sXml == oXml
def __enter__(self):
return self
def close(self):
"""Close all of the opened resource readers"""
for reader in self._openReaders:
try:
reader.close()
except AttributeError:
if not self._strict:
log.info("Reader not opened properly, therefore not "
"closed properly.")
else:
raise
del reader
self._openReaders = []
def __exit__(self, *exec_info):
self.close()
def __len__(self):
"""Return the number of records in this DataSet"""
if self.numRecords <= 0:
if self._filters:
if isinstance(self.datasetType, tuple):
log.debug("Base class DataSet length cannot be calculated "
"when filters are present")
else:
self.updateCounts()
else:
try:
# a little cheaper:
count = 0
for reader in self.resourceReaders():
count += len(reader)
self.numRecords = count
except UnavailableFeature:
# UnavailableFeature: no .bai
self.updateCounts()
elif not self._countsUpdated:
# isn't that expensive, avoids crashes due to incorrect numRecords:
self.updateCounts()
return self.numRecords
def newRandomUuid(self):
"""Generate a new random UUID"""
return self.newUuid(setter=True, random=True)
def newUuid(self, setter=True, random=False):
"""Generate and enforce the uniqueness of an ID for a new DataSet.
While user setable fields are stripped out of the Core DataSet object
used for comparison, the previous UniqueId is not. That means that
copies will still be unique, despite having the same contents.
Args:
:setter=True: Setting to False allows MD5 hashes to be generated
(e.g. for comparison with other objects) without
modifying the object's UniqueId
:random=False: If True, the new UUID will be generated randomly. Otherwise a
hashing algorithm is applied to the "core" elements of the XML, yielding a
reproducible UUID for datasets that have the same "core" attributes/metadata.
Returns:
The new Id, a properly formatted md5 hash of the Core DataSet
Doctest:
>>> from pbcore.io import AlignmentSet
>>> ds = AlignmentSet()
>>> old = ds.uuid
>>> _ = ds.newUuid()
>>> old != ds.uuid
True
"""
if random:
newId = str(uuid.uuid4())
else:
coreXML = toXml(self, core=True)
newId = str(hashlib.md5(coreXML).hexdigest())
# Group appropriately
newId = '-'.join([newId[:8], newId[8:12], newId[12:16],
newId[16:20], newId[20:]])
if setter:
self.objMetadata['UniqueId'] = newId
return newId
def copyTo(self, dest, relative=False, subdatasets=False):
"""Doesn't resolve resource name collisions"""
ofn = None
dest = os.path.abspath(dest)
if not os.path.isdir(dest):
ofn = dest
dest = os.path.split(dest)[0]
# unfortunately file indices must have the same name as the file they
# index, so we carry around some state to store the most recent uuid
# seen. Good thing we do a depth first traversal!
state = [self.uuid]
resFunc = partial(_copier, dest, subfolder=state)
self._modResources(resFunc, subdatasets=subdatasets)
if not ofn is None:
self.write(ofn, relPaths=relative)
def copy(self, asType=None):
"""Deep copy the representation of this DataSet
Args:
:asType: The type of DataSet to return, e.g. 'AlignmentSet'
Returns:
A DataSet object that is identical but for UniqueId
Doctest:
>>> import pbcore.data.datasets as data
>>> from pbcore.io import DataSet, SubreadSet
>>> ds1 = DataSet(data.getXml(12))
>>> # Deep copying datasets is easy:
>>> ds2 = ds1.copy()
>>> # But the resulting uuid's should be different.
>>> ds1 == ds2
False
>>> ds1.uuid == ds2.uuid
False
>>> ds1 is ds2
False
>>> # Most members are identical
>>> ds1.name == ds2.name
True
>>> ds1.externalResources == ds2.externalResources
True
>>> ds1.filters == ds2.filters
True
>>> ds1.subdatasets == ds2.subdatasets
True
>>> len(ds1.subdatasets) == 2
True
>>> len(ds2.subdatasets) == 2
True
>>> # Except for the one that stores the uuid:
>>> ds1.objMetadata == ds2.objMetadata
False
>>> # And of course identical != the same object:
>>> assert not reduce(lambda x, y: x or y,
... [ds1d is ds2d for ds1d in
... ds1.subdatasets for ds2d in
... ds2.subdatasets])
>>> # But types are maintained:
>>> ds1 = SubreadSet(data.getXml(no=10), strict=True)
>>> ds1.metadata # doctest:+ELLIPSIS
<SubreadSetMetadata...
>>> ds2 = ds1.copy()
>>> ds2.metadata # doctest:+ELLIPSIS
<SubreadSetMetadata...
>>> # Lets try casting
>>> ds1 = DataSet(data.getBam())
>>> ds1 # doctest:+ELLIPSIS
<DataSet...
>>> ds1 = ds1.copy(asType='SubreadSet')
>>> ds1 # doctest:+ELLIPSIS
<SubreadSet...
>>> # Lets do some illicit casting
>>> ds1 = ds1.copy(asType='ReferenceSet')
Traceback (most recent call last):
TypeError: Cannot cast from SubreadSet to ReferenceSet
>>> # Lets try not having to cast
>>> ds1 = SubreadSet(data.getBam())
>>> ds1 # doctest:+ELLIPSIS
<SubreadSet...
"""
if asType:
try:
tbr = self.__class__.castableTypes()[asType]()
except KeyError:
raise TypeError("Cannot cast from {s} to "
"{t}".format(s=type(self).__name__,
t=asType))
tbr.merge(self)
tbr.makePathsAbsolute()
return tbr
result = copy.deepcopy(self)
result.newUuid()
return result
def split(self, chunks=0, ignoreSubDatasets=True, contigs=False,
maxChunks=0, breakContigs=False, targetSize=5000, zmws=False,
barcodes=False, byRecords=False, updateCounts=True):
"""Deep copy the DataSet into a number of new DataSets containing
roughly equal chunks of the ExternalResources or subdatasets.
Examples:
- split into exactly n datasets where each addresses a different \
piece of the collection of contigs::
dset.split(contigs=True, chunks=n)
- split into at most n datasets where each addresses a different \
piece of the collection of contigs, but contigs are kept whole::
dset.split(contigs=True, maxChunks=n)
- split into at most n datasets where each addresses a different \
piece of the collection of contigs and the number of chunks is \
in part based on the number of reads::
dset.split(contigs=True, maxChunks=n, breakContigs=True)
Args:
:chunks: the number of chunks to split the DataSet.
:ignoreSubDatasets: (True) do not split by subdatasets
:contigs: split on contigs instead of external resources
:zmws: Split by zmws instead of external resources
:barcodes: Split by barcodes instead of external resources
:maxChunks: The upper limit on the number of chunks.
:breakContigs: Whether or not to break contigs
:byRecords: Split contigs by mapped records, rather than ref length
:targetSize: The target minimum number of reads per chunk
:updateCounts: Update the count metadata in each chunk
Returns:
A list of new DataSet objects (all other information deep copied).
Doctest:
>>> import pbcore.data.datasets as data
>>> from pbcore.io import AlignmentSet
>>> # splitting is pretty intuitive:
>>> ds1 = AlignmentSet(data.getXml(12))
>>> # but divides up extRes's, so have more than one:
>>> ds1.numExternalResources > 1
True
>>> # the default is one AlignmentSet per ExtRes:
>>> dss = ds1.split()
>>> len(dss) == ds1.numExternalResources
True
>>> # but you can specify a number of AlignmentSets to produce:
>>> dss = ds1.split(chunks=1)
>>> len(dss) == 1
True
>>> dss = ds1.split(chunks=2, ignoreSubDatasets=True)
>>> len(dss) == 2
True
>>> # The resulting objects are similar:
>>> dss[0].uuid == dss[1].uuid
False
>>> dss[0].name == dss[1].name
True
>>> # Previously merged datasets are 'unmerged' upon split, unless
>>> # otherwise specified.
>>> # Lets try merging and splitting on subdatasets:
>>> ds1 = AlignmentSet(data.getXml(8))
>>> ds1.totalLength
123588
>>> ds1tl = ds1.totalLength
>>> ds2 = AlignmentSet(data.getXml(11))
>>> ds2.totalLength
117086
>>> ds2tl = ds2.totalLength
>>> # merge:
>>> dss = ds1 + ds2
>>> dss.totalLength == (ds1tl + ds2tl)
True
>>> # unmerge:
>>> ds1, ds2 = sorted(
... dss.split(2, ignoreSubDatasets=False),
... key=lambda x: x.totalLength, reverse=True)
>>> ds1.totalLength == ds1tl
True
>>> ds2.totalLength == ds2tl
True
"""
# File must have pbi index to be splittable:
if len(self) == 0:
return [self.copy()]
if contigs:
return self._split_contigs(chunks, maxChunks, breakContigs,
targetSize=targetSize,
byRecords=byRecords,
updateCounts=updateCounts)
elif zmws:
if chunks == 0:
if maxChunks:
chunks = maxChunks
elif targetSize:
chunks = max(1,
int(round(np.true_divide(
len(set(self.index.holeNumber)),
targetSize))))
return self._split_zmws(chunks, targetSize=targetSize)
elif barcodes:
if maxChunks and not chunks:
chunks = maxChunks
return self._split_barcodes(chunks)
# Lets only split on datasets if actual splitting will occur,
# And if all subdatasets have the required balancing key (totalLength)
if (len(self.subdatasets) > 1
and not ignoreSubDatasets):
return self._split_subdatasets(chunks)
atoms = self.externalResources.resources
balanceKey = len
# If chunks not specified, split to one DataSet per
# ExternalResource, but no fewer than one ExternalResource per
# Dataset
if not chunks or chunks > len(atoms):
chunks = len(atoms)
# Nothing to split!
if chunks == 1:
tbr = [self.copy()]
return tbr
# duplicate
results = [self.copy() for _ in range(chunks)]
# replace default (complete) ExternalResource lists
log.debug("Starting chunking")
chunks = self._chunkList(atoms, chunks, balanceKey)
log.debug("Done chunking")
log.debug("Modifying filters or resources")
for result, chunk in zip(results, chunks):
result.externalResources.resources = copy.deepcopy(chunk)
# UniqueId was regenerated when the ExternalResource list was
# whole, therefore we need to regenerate it again here
log.debug("Generating new UUID")
for result in results:
if updateCounts:
result.updateCounts()
result.newUuid()
# Update the basic metadata for the new DataSets from external
# resources, or at least mark as dirty
# TODO
return results
def _split_contigs(self, chunks, maxChunks=0, breakContigs=False,
targetSize=5000, byRecords=False, updateCounts=False):
raise TypeError("Only AlignmentSets may be split by contigs")
def _split_barcodes(self, chunks):
raise TypeError("Only ReadSets may be split by contigs")
def _split_zmws(self, chunks, targetSize=None):
raise TypeError("Only ReadSets may be split by ZMWs")
def _split_atoms(self, atoms, num_chunks):
"""Divide up atomic units (e.g. contigs) into chunks (refId, size,
segments)
"""
for _ in range(num_chunks - len(atoms)):
largest = max(atoms, key=lambda x: x[1]//x[2])
largest[2] += 1
return atoms
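# Example (atoms here are hypothetical [refId, size, nSegments] lists): extra
# chunks are created by repeatedly incrementing the segment count of the atom
# with the largest size-per-segment, so large contigs are subdivided first.
#   _split_atoms([['chr1', 1000, 1], ['chr2', 400, 1]], 4)
#   ->  [['chr1', 1000, 3], ['chr2', 400, 1]]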
def _split_subdatasets(self, chunks):
"""Split on subdatasets
Args:
chunks: Split into <= chunks pieces. If chunks > subdatasets,
fewer chunks will be produced.
"""
if not chunks or chunks > len(self.subdatasets):
chunks = len(self.subdatasets)
if (chunks == len(self.subdatasets)
and all([sds.externalResources.resourceIds
for sds in self.subdatasets])):
return self.subdatasets
# Chunk subdatasets into sets of subdatasets of roughly equal
# lengths
chunks = self._chunkList(
self.subdatasets, chunks,
balanceKey=lambda x: x.metadata.numRecords)
# Merge the lists of datasets into single datasets to return
results = []
for subDatasets in chunks:
newCopy = self.copy()
newCopy.subdatasets = subDatasets
newCopy.newUuid()
results.append(newCopy)
return results
def write(self, outFile, validate=True, modPaths=None,
relPaths=None, pretty=True):
"""Write to disk as an XML file
Args:
:outFile: The filename of the xml file to be created
:validate: T/F (True) validate the ExternalResource ResourceIds
:relPaths: T/F (None/no change) make the ExternalResource
ResourceIds relative instead of absolute filenames
:modPaths: DEPRECATED (T/F) allow paths to be modified
Doctest:
>>> import pbcore.data.datasets as data
>>> from pbcore.io import DataSet
>>> import tempfile, os
>>> outdir = tempfile.mkdtemp(suffix="dataset-doctest")
>>> outfile = os.path.join(outdir, 'tempfile.xml')
>>> ds1 = DataSet(data.getXml(), skipMissing=True)
>>> ds1.write(outfile, validate=False)
>>> ds2 = DataSet(outfile, skipMissing=True)
>>> ds1 == ds2
True
"""
log.debug("Writing DataSet...")
if not modPaths is None:
log.info("modPaths as a write argument is deprecated. Paths "
"aren't modified unless relPaths is explicitly set "
"to True or False. Will be removed in future versions.")
# make sure we keep the same effect for now, in case someone has
# something odd like modPaths=False, relPaths=True
if not modPaths:
relPaths = None
# fix paths if validate:
if validate and not relPaths is None:
if relPaths and not isinstance(outFile, basestring):
raise InvalidDataSetIOError("Cannot write relative "
"pathnames without a filename")
elif relPaths:
self.makePathsRelative(os.path.dirname(outFile))
else:
self.makePathsAbsolute()
log.debug('Serializing XML...')
xml_string = toXml(self)
log.debug('Done serializing XML')
if pretty:
log.debug('Making pretty...')
xml_string = xml.dom.minidom.parseString(xml_string).toprettyxml(
encoding="UTF-8")
log.debug('Done making pretty...')
# not useful yet as a list, but nice to keep the options open:
validation_errors = []
if validate:
log.debug('Validating...')
try:
if isinstance(outFile, basestring):
validateString(xml_string, relTo=os.path.dirname(outFile))
else:
validateString(xml_string)
except Exception as e:
validation_errors.append(e)
log.debug('Done validating')
if isinstance(outFile, basestring):
fileName = urlparse(outFile).path.strip()
if self._strict and not isinstance(self.datasetType, tuple):
if not fileName.endswith(dsIdToSuffix(self.datasetType)):
raise IOError(errno.EIO,
"Given filename does not meet standards, "
"should end with {s}".format(
s=dsIdToSuffix(self.datasetType)),
fileName)
outFile = open(fileName, 'w')
log.debug('Writing...')
outFile.write(xml_string)
outFile.flush()
log.debug('Done writing')
for e in validation_errors:
log.error("Invalid file produced: {f}".format(f=fileName))
raise e
log.debug("Done writing DataSet")
def loadStats(self, filename=None):
"""Load pipeline statistics from a <moviename>.sts.xml file. The subset
of these data that are defined in the DataSet XSD become available
through DataSet.metadata.summaryStats.<...> and will be written out
to the DataSet XML format according to the DataSet XML XSD.
Args:
:filename: the filename of a <moviename>.sts.xml file. If None:
load all stats from sts.xml files, including for
subdatasets.
Doctest:
>>> import pbcore.data.datasets as data
>>> from pbcore.io import AlignmentSet
>>> ds1 = AlignmentSet(data.getXml(8))
>>> ds1.loadStats(data.getStats())
>>> ds2 = AlignmentSet(data.getXml(11))
>>> ds2.loadStats(data.getStats())
>>> ds3 = ds1 + ds2
>>> ds1.metadata.summaryStats.prodDist.bins
[1576, 901, 399, 0]
>>> ds2.metadata.summaryStats.prodDist.bins
[1576, 901, 399, 0]
>>> ds3.metadata.summaryStats.prodDist.bins
[3152, 1802, 798, 0]
"""
if filename is None:
checksts(self, subdatasets=True)
else:
if isinstance(filename, basestring):
statsMetadata = parseStats(str(filename))
else:
statsMetadata = filename
if self.metadata.summaryStats:
self.metadata.summaryStats.merge(statsMetadata)
else:
self.metadata.append(statsMetadata)
def readsByName(self, query):
reads = _flatten([rr.readsByName(query)
for rr in self.resourceReaders()])
return sorted(reads, key=lambda a: a.readStart)
def loadMetadata(self, filename):
"""Load pipeline metadata from a <moviename>.metadata.xml file (or
other DataSet)
Args:
:filename: the filename of a <moviename>.metadata.xml file
"""
if isinstance(filename, basestring):
if isDataSet(filename):
# path to DataSet
metadata = openDataSet(filename).metadata
else:
# path to metadata.xml
metadata = parseMetadata(str(filename))
else:
# DataSetMetadata object
metadata = filename
self.addMetadata(metadata)
self.updateCounts()
def processFilters(self):
"""Generate a list of functions to apply to a read, all of which return
T/F. Each function is an OR filter, so any() true passes the read.
These functions are the AND filters, and will likely check all() of
other functions. These filtration functions are cached so that they are
not regenerated from the base filters for every read"""
# Allows us to not process all of the filters each time. This is marked
# as dirty (= []) by addFilters etc. Filtration can be turned off by
# setting this to [lambda x: True], which can be reversed by marking
# the cache dirty. See disableFilters/enableFilters
if self._cachedFilters:
return self._cachedFilters
filters = self.filters.tests(readType=self._filterType())
# Having no filters means no opportunity to pass. Fix by filling with
# always-true (similar to disableFilters())
if not filters:
self._cachedFilters = [lambda x: True]
return self._cachedFilters
self._cachedFilters = filters
return filters
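# Illustrative sketch (not part of the API surface): how the cached tests
# returned by processFilters() are typically consumed. The names 'ds' and
# 'candidate_records' are hypothetical; the key point is that the per-filter
# tests are OR'd, so a record passes if any single filter accepts it.
#   tests = ds.processFilters()
#   passing = [rec for rec in candidate_records
#              if any(test(rec) for test in tests)]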
def _filterType(self):
"""A key that maps to a set of filtration keywords specific to this
DataSet's ExternalResource type"""
raise NotImplementedError()
def makePathsAbsolute(self, curStart="."):
"""As part of the validation process, make all ResourceIds absolute
URIs rather than relative paths. Generally not called by API users.
Args:
:curStart: The location from which relative paths should emanate.
"""
log.debug("Making paths absolute...")
self._changePaths(
lambda x, s=curStart: checkAndResolve(x, s))
log.debug("Done making paths absolute")
def makePathsRelative(self, outDir=False):
"""Make things easier for writing test cases: make all
ResourceIds relative paths rather than absolute paths.
A less common use case for API consumers.
Args:
:outDir: The location from which relative paths should originate
"""
log.debug("Making paths relative")
if outDir:
self._changePaths(lambda x, s=outDir: os.path.relpath(x, s))
else:
self._changePaths(os.path.relpath)
def _modResources(self, func, subdatasets=True):
"""Execute some function 'func' on each external resource in the
dataset and each subdataset"""
# check all ExternalResources
stack = list(self.externalResources)
while stack:
item = stack.pop()
resId = item.resourceId
if not resId:
continue
func(item)
try:
stack.extend(list(item.indices))
except AttributeError:
# some members have no indices
pass
try:
stack.extend(list(item.externalResources))
except AttributeError:
# some members have no externalResources
pass
if subdatasets:
# check all SubDatasets
for dataset in self.subdatasets:
dataset._modResources(func)
def _changePaths(self, osPathFunc, checkMetaType=True):
"""Change all resourceId paths in this DataSet according to the
osPathFunc provided.
Args:
:osPathFunc: A function for modifying paths (e.g. os.path.abspath)
:checkMetaType: Update the metatype of externalResources if needed
"""
metaTypeFunc = self._updateMetaType if checkMetaType else lambda x: x
resFunc = partial(_pathChanger, osPathFunc, metaTypeFunc)
self._modResources(resFunc)
def _populateMetaTypes(self):
"""Add metatypes to those ExternalResources that currently are
without"""
log.debug("Updating metatypes...")
self._modResources(self._updateMetaType)
log.debug("Done updating metatypes")
def _updateMetaType(self, resource):
"""Infer and set the metatype of 'resource' if it doesn't already have
one."""
if not resource.metaType:
file_type = fileType(resource.resourceId)
resource.metaType = self._metaTypeMapping().get(file_type, "")
if not resource.timeStampedName:
mtype = resource.metaType
tsName = getTimeStampedName(mtype)
resource.timeStampedName = tsName
@staticmethod
def _metaTypeMapping():
"""The available mappings between file extension and MetaType (informed
by current class)."""
# no metatypes for generic DataSet
return {}
def copyFiles(self, outdir):
"""Copy all of the top level ExternalResources to an output
directory 'outdir'"""
backticks('cp {i} {o}'.format(i=' '.join(self.toExternalFiles()),
o=outdir))
def disableFilters(self):
"""Disable read filtering for this object"""
self.reFilter()
self.noFiltering = True
# a dummy filter:
self._cachedFilters = [lambda x: True]
def enableFilters(self):
"""Re-enable read filtering for this object"""
self.reFilter()
self.noFiltering = False
def addFilters(self, newFilters, underConstruction=False):
"""Add new or extend the current list of filters. Public because there
is already a reasonably compelling reason (the console script entry
point). Most often used by the __add__ method.
Args:
:newFilters: a Filters object or properly formatted Filters record
Doctest:
>>> from __future__ import print_function
>>> import pbcore.data.datasets as data
>>> from pbcore.io import SubreadSet
>>> from pbcore.io.dataset.DataSetMembers import Filters
>>> ds1 = SubreadSet()
>>> filt = Filters()
>>> filt.addRequirement(rq=[('>', '0.85')])
>>> ds1.addFilters(filt)
>>> print(ds1.filters)
( rq > 0.85 )
>>> # Or load with a DataSet
>>> ds2 = DataSet(data.getXml(16))
>>> print(ds2.filters)
... # doctest:+ELLIPSIS
( rname = E.faecalis...
"""
self.filters.merge(copy.deepcopy(newFilters))
self.reFilter(light=underConstruction)
def _checkObjMetadata(self, newMetadata):
"""Check new object metadata (as opposed to dataset metadata) against
the object metadata currently in this DataSet for compatibility.
Args:
:newMetadata: The object metadata of a DataSet being considered for
merging
"""
# If there is no objMetadata, this is a new dataset being populated
if self.objMetadata:
# if there isn't a Version in each, that will fail eventually
if 'Version' in self.objMetadata and 'Version' in newMetadata:
if newMetadata['Version'] == self.objMetadata['Version']:
return True
# We'll make an exception for now: major version number passes
elif (newMetadata['Version'].split('.')[0] ==
self.objMetadata['Version'].split('.')[0]):
log.warn("Future warning: merging datasets that don't "
"share a version number will fail.")
return True
raise ValueError("Wrong dataset version for merging {v1} vs "
"{v2}".format(
v1=newMetadata.get('Version'),
v2=self.objMetadata.get('Version')))
log.warn("Future warning: merging will require Version "
"numbers for both DataSets")
return True
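# Hedged illustration of the version-compatibility rules above (all version
# strings here are hypothetical):
#   '3.0.1' vs '3.0.1' -> OK (exact match)
#   '3.0.1' vs '3.1.0' -> OK, with a future warning (major versions match)
#   '3.0.1' vs '4.0.0' -> ValueError
#   missing 'Version'  -> OK, with a future warning (for now)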
def addMetadata(self, newMetadata, **kwargs):
"""Add dataset metadata.
Currently we ignore duplicates while merging (though perhaps other
transformations are more appropriate) and plan to remove/merge
conflicting metadata with identical attribute names.
All metadata elements should be strings, deepcopy shouldn't be
necessary.
This method is most often used by the __add__ method, rather than
directly.
Args:
:newMetadata: a dictionary of object metadata from an XML file (or
carefully crafted to resemble one), or a wrapper
around said dictionary
:kwargs: new metadata fields to be piled into the current metadata
(as an attribute)
Doctest:
>>> from __future__ import print_function
>>> import pbcore.data.datasets as data
>>> from pbcore.io import DataSet
>>> ds = DataSet()
>>> # it is possible to add new metadata:
>>> ds.addMetadata(None, Name='LongReadsRock')
>>> print(ds._metadata.getV(container='attrib', tag='Name'))
LongReadsRock
>>> # but most will be loaded and modified:
>>> ds2 = DataSet(data.getXml(no=8))
>>> ds2._metadata.totalLength
123588
>>> ds2._metadata.totalLength = 100000
>>> ds2._metadata.totalLength
100000
>>> ds2._metadata.totalLength += 100000
>>> ds2._metadata.totalLength
200000
>>> ds3 = DataSet(data.getXml(no=8))
>>> ds3.loadStats(data.getStats())
>>> ds4 = DataSet(data.getXml(no=11))
>>> ds4.loadStats(data.getStats())
>>> ds5 = ds3 + ds4
"""
if newMetadata:
# if this record is not wrapped, wrap for easier merging
if not isinstance(newMetadata, DataSetMetadata):
newMetadata = DataSetMetadata(newMetadata)
# merge
if self.metadata:
self.metadata.merge(newMetadata)
# or initialize
else:
self.metadata = newMetadata
for key, value in kwargs.items():
self.metadata.addMetadata(key, value)
def updateCounts(self):
"""Update the TotalLength and NumRecords for this DataSet.
Not compatible with the base DataSet class, which has no ability to
touch ExternalResources. -1 is used as a sentinel value for failed size
determination. It should never be written out to XML in regular use.
"""
self.metadata.totalLength = -1
self.metadata.numRecords = -1
def addExternalResources(self, newExtResources, updateCount=True):
"""Add additional ExternalResource objects, ensuring no duplicate
resourceIds. Most often used by the __add__ method, rather than
directly.
Args:
:newExtResources: A list of new ExternalResource objects, either
created de novo from a raw bam input, parsed from
an xml input, or already contained in a separate
DataSet object and being merged.
Doctest:
>>> from pbcore.io.dataset.DataSetMembers import ExternalResource
>>> from pbcore.io import DataSet
>>> ds = DataSet()
>>> # it is possible to add ExtRes's as ExternalResource objects:
>>> er1 = ExternalResource()
>>> er1.resourceId = "test1.bam"
>>> er2 = ExternalResource()
>>> er2.resourceId = "test2.bam"
>>> er3 = ExternalResource()
>>> er3.resourceId = "test1.bam"
>>> ds.addExternalResources([er1], updateCount=False)
>>> len(ds.externalResources)
1
>>> # different resourceId: succeeds
>>> ds.addExternalResources([er2], updateCount=False)
>>> len(ds.externalResources)
2
>>> # same resourceId: fails
>>> ds.addExternalResources([er3], updateCount=False)
>>> len(ds.externalResources)
2
>>> # but it is probably better to add them a little deeper:
>>> ds.externalResources.addResources(
... ["test3.bam"])[0].addIndices(["test3.bam.bai"])
"""
if not isinstance(newExtResources, ExternalResources):
tmp = ExternalResources()
# have to wrap them here, as wrapNewResource does quite a bit and
# importing into members would create a circular import
tmp.addResources([wrapNewResource(res)
if not isinstance(res, ExternalResource) else res
for res in newExtResources])
newExtResources = tmp
self.externalResources.merge(newExtResources)
if updateCount:
self._openFiles()
self.updateCounts()
def addDatasets(self, otherDataSet):
"""Add subsets to a DataSet object using other DataSets.
The following method of enabling merge-based split prevents nesting of
datasets more than one deep. Nested relationships are flattened.
.. note::
Most often used by the __add__ method, rather than directly.
"""
if otherDataSet.subdatasets:
self.subdatasets.extend(otherDataSet.subdatasets)
else:
self.subdatasets.append(otherDataSet)
def _openFiles(self):
"""Open the top level ExternalResources"""
raise RuntimeError("Not defined for this type of DataSet")
def resourceReaders(self):
"""Return a list of open pbcore Reader objects for the
top level ExternalResources in this DataSet"""
raise RuntimeError("Not defined for this type of DataSet")
@property
@filtered
def records(self):
"""A generator of (usually) BamAlignment objects for the
records in one or more Bam files pointed to by the
ExternalResources in this DataSet.
Yields:
A BamAlignment object
Doctest:
>>> from __future__ import print_function
>>> import pbcore.data.datasets as data
>>> from pbcore.io import AlignmentSet
>>> ds = AlignmentSet(data.getBam())
>>> for record in ds.records:
... print('hn: %i' % record.holeNumber) # doctest:+ELLIPSIS
hn: ...
"""
for resource in self.resourceReaders():
for record in resource:
yield record
def __iter__(self):
"""Iterate over the records.
The order of yielded reads is determined by the order of the
ExternalResources and record order within each file"""
if self.isIndexed:
# this uses the index to respect filters
for i in xrange(len(self)):
yield self[i]
else:
# this uses post-filtering to respect filters
for record in self.records:
yield record
@property
def subSetNames(self):
"""The subdataset names present in this DataSet"""
subNames = []
for sds in self.subdatasets:
subNames.extend(sds.name)
return subNames
def readsInSubDatasets(self, subNames=None):
"""To be used in conjunction with self.subSetNames"""
if subNames is None:
subNames = []
if self.subdatasets:
for sds in self.subdatasets:
if subNames and sds.name not in subNames:
continue
# subdatasets often don't have the same resources
if not sds.externalResources.resourceIds:
sds.externalResources = self.externalResources
# this should enforce the subdataset filters:
for read in sds.records:
yield read
else:
for read in self.records:
yield read
# FIXME this is a workaround for the lack of support for ZMW chunking in
# pbbam, and should probably go away once that is available.
@property
def zmwRanges(self):
"""
Return the end-inclusive range of ZMWs covered by the dataset if this
was explicitly set by filters via DataSet.split(zmws=True).
"""
ranges = []
for filt in self._filters:
movies = []
starts = []
ends = []
# it is possible, e.g. if splitting a previously split dataset, to
# get filters with overlapping ranges, one of which is more
# restrictive than the other. We need to collapse these.
for param in filt:
if param.name == "movie":
movies.append(param.value)
elif param.name == "zm":
ival = int(param.value)
if param.operator == '>':
ival += 1
starts.append(ival)
elif param.operator == '<':
ival -= 1
ends.append(ival)
# this is a single filter, all parameters are AND'd. Therefore
# multiple movies cannot be satisfied. Ignore in those cases. Where
# we have multiple params for the same movie, we assume that one
# range is a subset of the other, and pick the most restrictive.
# This depends on dataset.split respecting the existing filters.
if len(set(movies)) == 1:
ranges.append((movies[0], max(starts), min(ends)))
ranges = list(set(ranges))
return ranges
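# Illustrative sketch (movie name and hole numbers are hypothetical): a
# dataset produced by DataSet.split(zmws=True) might carry the filter
#   ( movie = m54006_160504 AND zm > 9 AND zm < 101 )
# for which zmwRanges reports the end-inclusive tuple
#   [('m54006_160504', 10, 100)]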
# FIXME this is a workaround for the lack of support for barcode chunking
# in pbbam, and should probably go away once that is available.
@property
def barcodes(self):
"""Return the list of barcodes explicitly set by filters via
DataSet.split(barcodes=True).
"""
barcodes = []
for filt in self._filters:
for param in filt:
if param.name == "bc":
barcodes.append(param.value)
return barcodes
def toFofn(self, outfn=None, uri=False, relative=False):
"""Return a list of resource filenames (and write to optional outfile)
Args:
:outfn: (None) the file to which the resource filenames are to be
written. If None, the only emission is a returned list of
file names.
:uri: (t/F) write the resource filenames as URIs.
:relative: (t/F) emit paths relative to outfofn or '.' if no
outfofn
Returns:
A list of filenames or uris
Writes:
(Optional) A file containing a list of filenames or uris
Doctest:
>>> from pbcore.io import DataSet
>>> DataSet("bam1.bam", "bam2.bam", strict=False,
... skipMissing=True).toFofn(uri=False)
['bam1.bam', 'bam2.bam']
"""
lines = [er.resourceId for er in self.externalResources]
if not uri:
lines = [urlparse(line).path for line in lines]
if relative is True:
# make it relative to the fofn location
if outfn:
lines = [os.path.relpath(line, os.path.dirname(outfn))
for line in lines]
# or current location
else:
lines = [os.path.relpath(line) for line in lines]
if outfn:
with open(outfn, 'w') as outFile:
outFile.writelines(line + '\n' for line in lines)
return lines
def toExternalFiles(self):
"""Returns a list of top level external resources (no indices)."""
return self.externalResources.resourceIds
@property
def _castableDataSetTypes(self):
"""Tuple of DataSet types to which this DataSet can be cast"""
if isinstance(self.datasetType, tuple):
return self.datasetType
else:
return (toDsId('DataSet'), self.datasetType)
@classmethod
def castableTypes(cls):
"""The types to which this DataSet type may be cast. This is a property
instead of a member variable as we can enforce casting limits here (and
modify if needed by overriding them in subclasses).
Returns:
A dictionary of MetaType->Class mappings, e.g. 'DataSet': DataSet
"""
if cls.__name__ != 'DataSet':
return {'DataSet': DataSet,
cls.__name__: cls}
return {'DataSet': DataSet,
'SubreadSet': SubreadSet,
'HdfSubreadSet': HdfSubreadSet,
'AlignmentSet': AlignmentSet,
'ContigSet': ContigSet,
'ConsensusReadSet': ConsensusReadSet,
'ConsensusAlignmentSet': ConsensusAlignmentSet,
'ReferenceSet': ReferenceSet,
'GmapReferenceSet': GmapReferenceSet,
'BarcodeSet': BarcodeSet,
'TranscriptSet': TranscriptSet,
'TranscriptAlignmentSet': TranscriptAlignmentSet}
@property
def metadata(self):
"""Return the DataSet metadata as a DataSetMetadata object. Attributes
should be populated intuitively, but see DataSetMetadata documentation
for more detail."""
return self._metadata
@metadata.setter
def metadata(self, newDict):
"""Set the metadata for this object. This is reasonably dangerous, as
the argument must be a properly formatted data structure, as specified
in the DataSetMetadata documentation. This setter is primarily used
by other DataSet objects, rather than users or API consumers."""
if isinstance(newDict, DataSetMetadata):
self._metadata = newDict
else:
self._metadata = self._metadata.__class__(newDict)
@property
def filters(self):
"""Limit setting to ensure cache hygiene and filter compatibility"""
self._filters.registerCallback(self._wipeCaches)
self._filters.registerCallback(self.newUuid)
return self._filters
@filters.setter
def filters(self, value):
"""Limit setting to ensure cache hygiene and filter compatibility"""
if value is None:
self._filters = Filters()
else:
value.clearCallbacks()
self._filters = value
self.reFilter()
def reFilter(self, light=True):
"""
The filters on this dataset have changed, update DataSet state as
needed
"""
self._cachedFilters = []
self._index = None
self._indexMap = None
if not light:
self.metadata.totalLength = -1
self.metadata.numRecords = -1
if self.metadata.summaryStats:
self.metadata.removeChildren('SummaryStats')
self.updateCounts()
def _wipeCaches(self):
self.reFilter(False)
@property
def createdAt(self):
"""Return the DataSet CreatedAt timestamp"""
return self.objMetadata.get('CreatedAt')
@property
def numRecords(self):
"""The number of records in this DataSet (from the metadata)"""
return self._metadata.numRecords
@numRecords.setter
def numRecords(self, value):
"""The number of records in this DataSet (from the metadata)"""
self._metadata.numRecords = value
@property
def totalLength(self):
"""The total length of this DataSet"""
return self._metadata.totalLength
@totalLength.setter
def totalLength(self, value):
"""The total length of this DataSet"""
self._metadata.totalLength = value
@property
def uuid(self):
"""The UniqueId of this DataSet"""
return self.objMetadata.get('UniqueId')
@uuid.setter
def uuid(self, value):
"""Set the UniqueId of this DataSet"""
self.objMetadata['UniqueId'] = value
@property
def uniqueId(self):
"""The UniqueId of this DataSet"""
return self.uuid
@uniqueId.setter
def uniqueId(self, value):
"""The UniqueId of this DataSet"""
self.uuid = value
@property
def timeStampedName(self):
"""The timeStampedName of this DataSet"""
return self.objMetadata.get('TimeStampedName')
@timeStampedName.setter
def timeStampedName(self, value):
"""The timeStampedName of this DataSet"""
self.objMetadata['TimeStampedName'] = value
@property
def name(self):
"""The name of this DataSet"""
return self.objMetadata.get('Name', '')
@name.setter
def name(self, value):
"""The name of this DataSet"""
self.objMetadata['Name'] = value
@property
def tags(self):
"""The tags of this DataSet"""
return self.objMetadata.get('Tags', '')
@tags.setter
def tags(self, value):
"""The tags of this DataSet"""
self.objMetadata['Tags'] = value
@property
def description(self):
"""The description of this DataSet"""
return self.objMetadata.get('Description', '')
@description.setter
def description(self, value):
"""The description of this DataSet"""
self.objMetadata['Description'] = value
@property
def numExternalResources(self):
"""The number of ExternalResources in this DataSet"""
return len(self.externalResources)
def _pollResources(self, func):
"""Collect the responses to func on each resource (or those with reads
mapping to refName)."""
return [func(resource) for resource in self.resourceReaders()]
def _unifyResponses(self, responses, keyFunc=lambda x: x,
eqFunc=lambda x, y: x == y):
"""Make sure all of the responses from resources are the same."""
if len(responses) > 1:
# Check the rest against the first:
for res in responses[1:]:
if not eqFunc(keyFunc(responses[0]), keyFunc(res)):
raise ResourceMismatchError(responses)
return responses[0]
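# Minimal sketch of the contract (values hypothetical): every resource must
# give the same answer, otherwise a ResourceMismatchError is raised.
#   self._unifyResponses(['P6-C4', 'P6-C4'])    -> 'P6-C4'
#   self._unifyResponses(['P6-C4', 'S/P1-C1'])  -> raises ResourceMismatchError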
def hasBaseFeature(self, featureName):
responses = self._pollResources(
lambda x: x.hasBaseFeature(featureName))
return self._unifyResponses(responses)
def baseFeaturesAvailable(self):
responses = self._pollResources(lambda x: x.baseFeaturesAvailable())
return self._unifyResponses(responses)
def hasPulseFeature(self, featureName):
responses = self._pollResources(
lambda x: x.hasPulseFeature(featureName))
return self._unifyResponses(responses)
def pulseFeaturesAvailable(self):
responses = self._pollResources(lambda x: x.pulseFeaturesAvailable())
return self._unifyResponses(responses)
@property
def sequencingChemistry(self):
responses = self._pollResources(lambda x: x.sequencingChemistry)
return list(_flatten(responses))
@property
def isEmpty(self):
responses = self._pollResources(lambda x: getattr(x, 'isEmpty'))
return all(responses)
@property
def readType(self):
return self._checkIdentical('readType')
def _checkIdentical(self, key):
responses = self._pollResources(lambda x: getattr(x, key))
return self._unifyResponses(responses)
def _chunkList(self, inlist, chunknum, balanceKey=len):
"""Divide <inlist> members into <chunknum> chunks roughly evenly using
a round-robin binning system, return list of lists.
This is a method so that balanceKey functions can access self."""
chunks = [[] for _ in range(chunknum)]
# a lightweight accounting of how big (entry 0) each sublist (numbered
# by entry 1) is getting
chunkSizes = [[0, i] for i in range(chunknum)]
for i, item in enumerate(sorted(inlist, key=balanceKey, reverse=True)):
# Refresh measure of bin fullness
chunkSizes.sort()
# Add one to the emptiest bin
chunks[chunkSizes[0][1]].append(item)
mass = balanceKey(item)
if mass == 0:
mass += 1
chunkSizes[0][0] += mass
return chunks
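# Worked example of the greedy binning above (input is hypothetical): items
# are sorted largest-first and each is dropped into the currently lightest
# chunk.
#   _chunkList(['aaaaa', 'aaaa', 'aaa', 'aa', 'a'], 2, balanceKey=len)
#   -> [['aaaaa', 'aa', 'a'], ['aaaa', 'aaa']]    # masses 8 and 7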
@property
def index(self):
if self._index is None:
log.debug("Populating index")
self.assertIndexed()
self._index = self._indexRecords()
log.debug("Done populating index")
return self._index
def _indexRecords(self):
raise NotImplementedError()
def isIndexed(self):
raise NotImplementedError()
def assertIndexed(self):
raise NotImplementedError()
def _assertIndexed(self, acceptableTypes):
if not self._openReaders:
self._openFiles()
for fname, reader in zip(self.toExternalFiles(),
self.resourceReaders()):
if not isinstance(reader, acceptableTypes):
raise IOError(errno.EIO, "File not indexed", fname)
return True
def __getitem__(self, index):
"""Should respect filters for free, as _indexMap should only be
populated by filtered reads. Only pbi filters considered, however."""
if self._indexMap is None:
_ = self.index
if isinstance(index, int):
# support negatives
if index < 0:
index = len(self.index) + index
rrNo, recNo = self._indexMap[index]
return self.resourceReaders()[rrNo][recNo]
elif isinstance(index, slice):
indexTuples = self._indexMap[index]
return [self.resourceReaders()[ind[0]][ind[1]] for ind in
indexTuples]
elif isinstance(index, list):
indexTuples = [self._indexMap[ind] for ind in index]
return [self.resourceReaders()[ind[0]][ind[1]] for ind in
indexTuples]
elif isinstance(index, np.ndarray):
indexTuples = self._indexMap[index]
return [self.resourceReaders()[ind[0]][ind[1]] for ind in
indexTuples]
elif isinstance(index, basestring):
if 'id' in self.index.dtype.names:
row = np.nonzero(self.index.id == index)[0][0]
return self[row]
else:
raise NotImplementedError()
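# Illustrative summary of the supported index types (values hypothetical):
#   ds[0]          -> first (filtered) record
#   ds[-1]         -> last (filtered) record
#   ds[1:4]        -> list of records 1 through 3
#   ds[[0, 5, 7]]  -> list of the records at those positions
#   ds['read_id']  -> the record whose pbi 'id' column matches, when the
#                     index exposes an 'id' field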
class ReadSet(DataSet):
"""Base type for read sets, should probably never be used as a concrete
class"""
def __init__(self, *files, **kwargs):
super(ReadSet, self).__init__(*files, **kwargs)
self._metadata = SubreadSetMetadata(self._metadata)
def induceIndices(self, force=False):
for res in self.externalResources:
fname = res.resourceId
newInds = []
if not res.pbi or force:
iname = fname + '.pbi'
if not os.path.isfile(iname) or force:
iname = _pbindexBam(fname)
newInds.append(iname)
self.close()
if not res.bai or force:
iname = fname + '.bai'
if not os.path.isfile(iname) or force:
iname = _indexBam(fname)
newInds.append(iname)
self.close()
if newInds:
res.addIndices(newInds)
self._populateMetaTypes()
self.updateCounts()
return newInds
@property
def isMapped(self):
responses = self._pollResources(lambda x: x.isMapped)
return self._unifyResponses(responses)
@property
def isIndexed(self):
if self.hasPbi:
return True
return False
@property
def isBarcoded(self):
"""Determine whether all resources are barcoded files"""
res = self._pollResources(
lambda x: bool(x.index.pbiFlags & PBI_FLAGS_BARCODE))
# ignore empty bam files, which don't have barcoding information:
if not self.isEmpty:
res = [r for r, reader in zip(res, self.resourceReaders()) if
len(reader)]
return self._unifyResponses(res)
def assertBarcoded(self):
"""Test whether all resources are barcoded files"""
if not self.isBarcoded:
raise RuntimeError("File not barcoded")
def _openFiles(self):
"""Open the files (assert they exist, assert they are of the proper
type before accessing any file)
"""
if self._openReaders:
log.debug("Closing old readers...")
self.close()
log.debug("Opening ReadSet resources")
sharedRefs = {}
infotables = []
infodicts = []
for extRes in self.externalResources:
refFile = extRes.reference
if refFile:
if not refFile in sharedRefs:
log.debug("Using reference: {r}".format(r=refFile))
try:
sharedRefs[refFile] = IndexedFastaReader(refFile)
except IOError:
if not self._strict:
log.warn("Problem opening reference with"
"IndexedFastaReader")
sharedRefs[refFile] = None
else:
raise
location = urlparse(extRes.resourceId).path
resource = None
try:
if extRes.resourceId.endswith('bam'):
resource = IndexedBamReader(location)
if refFile:
resource.referenceFasta = sharedRefs[refFile]
else:
resource = CmpH5Reader(location)
except (IOError, ValueError):
if not self._strict and not extRes.pbi:
log.warn("pbi file missing for {f}, operating with "
"reduced speed and functionality".format(
f=location))
resource = BamReader(location)
if refFile:
resource.referenceFasta = sharedRefs[refFile]
else:
raise
# Consolidate referenceDicts
# This gets huge when there are ~90k references. If you have ~28
# chunks, each with 28 BamReaders, each with 100MB referenceDicts,
# you end up storing tens of gigs of just these (often identical)
# dicts
if not len(infotables):
infotables.append(resource._referenceInfoTable)
infodicts.append(resource._referenceDict)
else:
for ri, rd in zip(infotables, infodicts):
if np.array_equal(resource._referenceInfoTable, ri):
del resource._referenceInfoTable
del resource._referenceDict
resource._referenceInfoTable = ri
resource._referenceDict = rd
break
infotables.append(resource._referenceInfoTable)
infodicts.append(resource._referenceDict)
self._openReaders.append(resource)
try:
if resource.isEmpty:
log.debug("{f} contains no reads!".format(
f=extRes.resourceId))
except UnavailableFeature: # isEmpty requires bai
if not list(itertools.islice(resource, 1)):
log.debug("{f} contains no reads!".format(
f=extRes.resourceId))
if len(self._openReaders) == 0 and len(self.toExternalFiles()) != 0:
raise IOError("No files were openable")
log.debug("Done opening resources")
def _filterType(self):
return 'bam'
@property
def hasPbi(self):
"""Test whether all resources are opened as IndexedBamReader objects"""
try:
res = self._pollResources(lambda x: isinstance(x,
IndexedBamReader))
return self._unifyResponses(res)
except ResourceMismatchError:
if not self._strict:
log.warn("Resources inconsistently indexed")
return False
else:
raise
def _split_barcodes(self, chunks=0):
"""Split a readset into chunks by barcodes.
Args:
:chunks: The number of chunks to emit. If chunks < # barcodes,
barcodes are grouped by size. If chunks == # barcodes, one
barcode is assigned to each dataset regardless of size. If
chunks > # barcodes, only # barcodes chunks are emitted
"""
# Find all possible barcodes and counts for each
self.assertIndexed()
try:
self.assertBarcoded()
except RuntimeError:
log.info("No barcodes found in BAM file, skipping split")
return [self.copy()]
barcodes = defaultdict(int)
for bcTuple in itertools.izip(self.index.bcForward,
self.index.bcReverse):
if bcTuple != (-1, -1):
barcodes[bcTuple] += 1
log.debug("{i} barcodes found".format(i=len(barcodes.keys())))
atoms = barcodes.items()
# The number of reads per barcode is used for balancing
balanceKey = lambda x: x[1]
# Find the appropriate number of chunks
if chunks <= 0 or chunks > len(atoms):
chunks = len(atoms)
log.debug("Making copies")
results = [self.copy() for _ in range(chunks)]
log.debug("Distributing chunks")
chunks = self._chunkList(atoms, chunks, balanceKey)
log.debug("Done chunking")
log.debug("Modifying filters or resources")
for result, chunk in zip(results, chunks):
result.filters.removeRequirement('bc')
result.filters.addRequirement(
bc=[('=', list(c[0])) for c in chunk])
# UniqueId was regenerated when the ExternalResource list was
# whole, therefore we need to regenerate it again here
log.debug("Generating new UUID")
for result in results:
result.reFilter()
result.newUuid()
result.updateCounts()
# Update the basic metadata for the new DataSets from external
# resources, or at least mark as dirty
# TODO
return results
def split_movies(self, chunks):
"""Chunks requested:
0 or >= num_movies: One chunk per movie
1 to (num_movies - 1): Grouped somewhat evenly by num_records
"""
# File must have pbi index to be splittable:
if len(self) == 0:
return [self.copy()]
atoms = self.index.qId
movs = zip(*np.unique(atoms, return_counts=True))
# Zero chunks requested == 1 chunk per movie.
if chunks == 0 or chunks > len(movs):
chunks = len(movs)
if chunks == 1:
return [self.copy()]
balanceKey = lambda x: x[1]
log.debug("Starting chunking")
if chunks < len(movs):
groups = self._chunkList(movs, chunks, balanceKey)
else:
groups = [[mov] for mov in movs]
log.debug("Duplicating")
results = [self.copy() for _ in groups]
q2m = self.qid2mov
log.debug("Modifying filters or resources")
for result, group in zip(results, groups):
result.filters.addRequirement(movie=[('=', q2m[mov[0]])
for mov in group])
log.debug("Generating new UUID, updating counts")
for result in results:
result.updateCounts()
result.newUuid()
return results
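# Usage sketch (the dataset name and resulting movie names are hypothetical):
# requesting zero chunks yields one chunk per movie, each constrained by a
# movie filter.
#   parts = sset.split_movies(0)
#   # each part now carries a filter like ( movie = m54006_160504 )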
def _split_zmws(self, chunks, targetSize=None):
"""The combination of <movie>_<holenumber> is assumed to refer to a
unique ZMW"""
if chunks == 1:
return [self.copy()]
# make sure we can pull out the movie name:
rgIdMovieNameMap = {rg[0]: rg[1] for rg in self.readGroupTable}
active_holenumbers = self.index
# lower limit on the number of chunks
n_chunks = min(len(active_holenumbers), chunks)
# if we have a target size and can have two or more chunks:
if (not targetSize is None and len(active_holenumbers) > 1 and
chunks > 1):
# we want at least two if we can swing it
desired = max(2, len(active_holenumbers)//targetSize)
n_chunks = min(n_chunks, desired)
if n_chunks != chunks:
log.info("Adjusted number of chunks to %d" % n_chunks)
# sort atoms and group into chunks:
active_holenumbers.sort(order=['qId', 'holeNumber'])
view = _fieldsView(self.index, ['qId', 'holeNumber'])
keys = np.unique(view)
# Find the beginning and end keys of each chunk
ranges = splitKeys(keys, n_chunks)
# The above ranges can include hidden, unrepresented movienames that
# are sandwiched between those identified by the range bounds. In
# order to capture those, we need to find the indices of the range
# bounds, then pull out the chunks.
hn_chunks = []
for zmw_range in ranges:
if zmw_range[0][0] == zmw_range[1][0]:
# The start and end movienames match, grab the appropriate
# record indices in one query:
hn_chunks.append(active_holenumbers[
(active_holenumbers['qId'] == zmw_range[0][0]) &
(active_holenumbers['holeNumber'] >= zmw_range[0][1]) &
(active_holenumbers['holeNumber'] <= zmw_range[1][1])])
else:
# The start and end movienames don't match, grab the first
# record matching the start of the range:
start = np.flatnonzero(
(active_holenumbers['qId'] == zmw_range[0][0]) &
(active_holenumbers['holeNumber'] == zmw_range[0][1]))[0]
# and grab the last record matching the end of the range:
end = np.flatnonzero(
(active_holenumbers['qId'] == zmw_range[1][0]) &
(active_holenumbers['holeNumber'] == zmw_range[1][1]))[-1]
# and add all indices between these two (with an exclusive
# right bound):
hn_chunks.append(active_holenumbers[start:(end + 1)])
results = []
log.debug("Making copies")
tmp_results = [self.copy() for _ in range(n_chunks)]
# add filters
for chunk, res in zip(hn_chunks, tmp_results):
# check if multiple movies:
if chunk[0]['qId'] == chunk[-1]['qId']:
movieName = rgIdMovieNameMap[chunk[0]['qId']]
zmwStart = chunk[0]['holeNumber']
zmwEnd = chunk[-1]['holeNumber']
res._filters.clearCallbacks()
newfilt = [[('movie', '=', movieName),
('zm', '<', zmwEnd + 1),
('zm', '>', zmwStart - 1)]]
res._filters.broadcastFilters(newfilt)
else:
movieNames = []
zmwStarts = []
zmwEnds = []
for mov in np.unique(chunk['qId']):
movieNames.append(rgIdMovieNameMap[mov])
inds = np.flatnonzero(chunk['qId'] == mov)
zmwStarts.append(chunk[inds[0]]['holeNumber'])
zmwEnds.append(chunk[inds[-1]]['holeNumber'])
res._filters.clearCallbacks()
newfilts = []
for mn, ze, zs in zip(movieNames, zmwEnds, zmwStarts):
newfilts.append([('movie', '=', mn),
('zm', '<', ze + 1),
('zm', '>', zs - 1)])
res._filters.broadcastFilters(newfilts)
res.numRecords = len(chunk)
res.totalLength = sum(chunk['qEnd'] - chunk['qStart'])
res.newUuid()
results.append(res)
# we changed the sort order above, so this is dirty:
self._index = None
self._indexMap = None
# Update the basic metadata for the new DataSets from external
# resources, or at least mark as dirty
# TODO
return results
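# Sketch of the filters produced above (movie name and hole numbers are
# hypothetical): a chunk covering holes 0..54 of a single movie is expressed
# with inclusive bounds encoded as exclusive comparisons:
#   ( movie = m54006_160504 AND zm < 55 AND zm > -1 )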
def _split_read_groups(self):
"""
Crude implementation of split-by-read-group. This isn't currently
useful for chunking, but it allows datasets for individual biosamples
to be easily extracted.
NOTE: while the records in each dataset correspond to a single read
group, the readGroupTable property of the dataset and BAM file(s) will
still list all read groups present in the original unsplit dataset.
To determine what read group a given dataset corresponds to, use
np.unique(ds.index.qId) to extract the numeric ID.
"""
results = []
for rg in self.readGroupTable:
res = self.copy()
res._filters.clearCallbacks()
res._filters.broadcastFilters([[('qId', '=', rg.ID)]])
sel = self.index.qId == rg.ID
qlengths = self.index.qEnd[sel] - self.index.qStart[sel]
res.numRecords = qlengths.size
res.totalLength = np.sum(qlengths)
res.newUuid()
results.append(res)
return results
@property
def readGroupTable(self):
"""Combine the readGroupTables of each external resource"""
responses = self._pollResources(lambda x: x.readGroupTable)
if len(responses) > 1:
# append the read groups, but eliminate duplicates.
tbr = _uniqueRecords(reduce(np.append, responses))
# reassign qIds if dupes:
if len(set(tbr['ID'])) < len(tbr):
self._readGroupTableIsRemapped = True
tbr['ID'] = range(len(tbr))
return tbr.view(np.recarray)
else:
return responses[0]
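# Descriptive note (illustrative): if two resources reuse the same read-group
# ID, the merged table is renumbered 0..N-1 and _readGroupTableIsRemapped is
# set; index qId values are then corrected against the remapped table in
# _fixQIds, so downstream lookups stay consistent.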
@property
def movieIds(self):
"""A dict of movieName: movieId for the joined readGroupTable
TODO: deprecate this for the more descriptive mov2qid"""
return self.mov2qid
@property
def mov2qid(self):
"""A dict of movieId: movieName for the joined readGroupTable"""
return {rg.MovieName: rg.ID for rg in self.readGroupTable}
@property
def qid2mov(self):
"""A dict of movieId: movieName for the joined readGroupTable"""
return {rg.ID: rg.MovieName for rg in self.readGroupTable}
def assertIndexed(self):
self._assertIndexed((IndexedBamReader, CmpH5Reader))
@property
def isCmpH5(self):
"""Test whether all resources are cmp.h5 files"""
res = self._pollResources(lambda x: isinstance(x, CmpH5Reader))
return self._unifyResponses(res)
def _fixQIds(self, indices, resourceReader):
qId_acc = lambda x: x.MovieID
if not self.isCmpH5:
qId_acc = lambda x: x.qId
rr = resourceReader
try:
# this would populate the _readGroupTableIsRemapped member, but
# for whatever reason a lot of cmp.h5's are broken
_ = self.readGroupTable
except ChemistryLookupError:
# this should be an error, but that would mess up Quiver cram
# tests. If anyone tries to access the readGroupTable in a
# dataset it will still fail, at least
log.info("Chemistry information could not be found in "
"cmp.h5, cannot fix the readGroupTable or "
"MovieID field.")
if self._readGroupTableIsRemapped:
log.debug("Must correct index qId's")
qIdMap = dict(zip(rr.readGroupTable.ID,
rr.readGroupTable.MovieName))
nameMap = self.movieIds
for qId in qIdMap.keys():
qId_acc(indices)[qId_acc(indices) == qId] = nameMap[
qIdMap[qId]]
def _indexRecords(self):
"""Returns index recarray summarizing all of the records in all of
the resources that conform to those filters addressing parameters
cached in the pbi.
"""
recArrays = []
_indexMap = []
for rrNum, rr in enumerate(self.resourceReaders()):
indices = rr.index
self._fixQIds(indices, rr)
if not self._filters or self.noFiltering:
recArrays.append(indices._tbl)
_indexMap.extend([(rrNum, i) for i in
range(len(indices._tbl))])
else:
# Filtration will be necessary:
nameMap = {}
if not rr.referenceInfoTable is None:
nameMap = {name: n
for n, name in enumerate(
rr.referenceInfoTable['Name'])}
passes = self._filters.filterIndexRecords(indices._tbl,
nameMap,
self.movieIds)
newInds = indices._tbl[passes]
recArrays.append(newInds)
_indexMap.extend([(rrNum, i) for i in
np.flatnonzero(passes)])
self._indexMap = np.array(_indexMap, dtype=[('reader', 'uint64'),
('index', 'uint64')])
if recArrays == []:
return recArrays
return _stackRecArrays(recArrays)
def resourceReaders(self):
"""Open the files in this ReadSet"""
if not self._openReaders:
self._openFiles()
return self._openReaders
@property
def _length(self):
"""Used to populate metadata in updateCounts. We're using the pbi here,
which is necessary and sufficient for both subreadsets and
alignmentsets, but doesn't work for hdfsubreadsets. Rather than
duplicate code, we'll implement the hdf specific _length as an
overriding function where needed.
.. note:: Both mapped and unmapped bams can be either indexed or
unindexed. This makes life more difficult, but we should
expect a pbi for both subreadsets and alignmentsets
"""
count = len(self.index)
length = 0
if count:
length = sum(self.index.qEnd - self.index.qStart)
return count, length
def _resourceSizes(self):
sizes = []
for rr in self.resourceReaders():
sizes.append((len(rr.index), sum(rr.index.qEnd - rr.index.qStart)))
return sizes
def addMetadata(self, newMetadata, **kwargs):
"""Add metadata specific to this subtype, while leaning on the
superclass method for generic metadata. Also enforce metadata type
correctness."""
# Validate, clean and prep input data
if newMetadata:
if isinstance(newMetadata, dict):
newMetadata = SubreadSetMetadata(newMetadata)
elif isinstance(newMetadata, SubreadSetMetadata) or (
type(newMetadata).__name__ == 'DataSetMetadata'):
newMetadata = SubreadSetMetadata(newMetadata.record)
else:
raise TypeError("Cannot extend SubreadSetMetadata with "
"{t}".format(t=type(newMetadata).__name__))
# Pull generic values, kwargs, general treatment in super
super(ReadSet, self).addMetadata(newMetadata, **kwargs)
def consolidate(self, dataFile, numFiles=1, useTmp=True):
"""Consolidate a larger number of bam files to a smaller number of bam
files (min 1)
Args:
:dataFile: The name of the output file. If numFiles >1 numbers will
be added.
:numFiles: The number of data files to be produced.
"""
references = [er.reference for er in self.externalResources if
er.reference]
log.debug("Using pbmerge to consolidate")
dsets = self.split(zmws=True, chunks=numFiles)
if numFiles > 1:
fnames = [_infixFname(dataFile, str(i))
for i in range(numFiles)]
else:
fnames = [dataFile]
for chunk, fname in zip(dsets, fnames):
consolidateXml(chunk, fname, useTmp=useTmp)
log.debug("Replacing resources")
self.externalResources = ExternalResources()
self.addExternalResources(fnames)
self.induceIndices()
# make sure reference gets passed through:
if references:
refCounts = dict(Counter(references))
if len(refCounts) > 1:
log.warn("Consolidating AlignmentSets with "
"different references, but BamReaders "
"can only have one. References will be "
"lost")
else:
for extres in self.externalResources:
extres.reference = refCounts.keys()[0]
# reset the indexmap especially, as it is out of date:
self._index = None
self._indexMap = None
self._openReaders = []
self._populateMetaTypes()
def updateCounts(self):
if self._skipCounts:
log.debug("SkipCounts is true, skipping updateCounts()")
self.metadata.totalLength = -1
self.metadata.numRecords = -1
return
try:
self.assertIndexed()
log.debug('Updating counts')
numRecords, totalLength = self._length
self.metadata.totalLength = totalLength
self.metadata.numRecords = numRecords
self._countsUpdated = True
except (IOError, UnavailableFeature):
if not self._strict:
log.debug("File problem, metadata not populated")
self.metadata.totalLength = 0
self.metadata.numRecords = 0
else:
raise
class HdfSubreadSet(ReadSet):
datasetType = DataSetMetaTypes.HDF_SUBREAD
def __init__(self, *files, **kwargs):
super(HdfSubreadSet, self).__init__(*files, **kwargs)
# The metatype for this dataset type is inconsistent, plaster over it
# here:
self.objMetadata["MetaType"] = "PacBio.DataSet.HdfSubreadSet"
self.objMetadata["TimeStampedName"] = getTimeStampedName(
self.objMetadata["MetaType"])
def induceIndices(self, force=False):
log.debug("Bax files don't have external indices")
@property
def isIndexed(self):
return False
def _openFiles(self):
"""Open the files (assert they exist, assert they are of the proper
type before accessing any file)
"""
if self._openReaders:
log.debug("Closing old readers...")
self.close()
log.debug("Opening resources")
for extRes in self.externalResources:
location = urlparse(extRes.resourceId).path
resource = BaxH5Reader(location)
self._openReaders.append(resource)
if len(self._openReaders) == 0 and len(self.toExternalFiles()) != 0:
raise IOError("No files were openable or reads found")
log.debug("Done opening resources")
@property
def _length(self):
"""Used to populate metadata in updateCounts"""
length = 0
count = 0
for rec in self.records:
count += 1
hqReg = rec.hqRegion
length += hqReg[1] - hqReg[0]
return count, length
def updateCounts(self):
"""Overriding here so we don't have to assertIndexed"""
if self._skipCounts:
log.debug("SkipCounts is true, skipping updateCounts()")
self.metadata.totalLength = -1
self.metadata.numRecords = -1
return
try:
log.debug('Updating counts')
numRecords, totalLength = self._length
self.metadata.totalLength = totalLength
self.metadata.numRecords = numRecords
self._countsUpdated = True
except (IOError, UnavailableFeature):
if not self._strict:
log.debug("File problem, metadata not populated")
self.metadata.totalLength = 0
self.metadata.numRecords = 0
else:
raise
def consolidate(self, dataFile, numFiles=1):
raise NotImplementedError()
@staticmethod
def _metaTypeMapping():
return {'bax.h5':'PacBio.SubreadFile.BaxFile',
'bas.h5':'PacBio.SubreadFile.BasFile', }
class SubreadSet(ReadSet):
"""DataSet type specific to Subreads
DocTest:
>>> from pbcore.io import SubreadSet
>>> from pbcore.io.dataset.DataSetMembers import ExternalResources
>>> import pbcore.data.datasets as data
>>> ds1 = SubreadSet(data.getXml(no=5), skipMissing=True)
>>> ds2 = SubreadSet(data.getXml(no=5), skipMissing=True)
>>> # So they don't conflict:
>>> ds2.externalResources = ExternalResources()
>>> ds1 # doctest:+ELLIPSIS
<SubreadSet...
>>> ds1._metadata # doctest:+ELLIPSIS
<SubreadSetMetadata...
>>> ds1._metadata # doctest:+ELLIPSIS
<SubreadSetMetadata...
>>> ds1.metadata # doctest:+ELLIPSIS
<SubreadSetMetadata...
>>> len(ds1.metadata.collections)
1
>>> len(ds2.metadata.collections)
1
>>> ds3 = ds1 + ds2
>>> len(ds3.metadata.collections)
2
>>> ds4 = SubreadSet(data.getSubreadSet(), skipMissing=True)
>>> ds4 # doctest:+ELLIPSIS
<SubreadSet...
>>> ds4._metadata # doctest:+ELLIPSIS
<SubreadSetMetadata...
>>> len(ds4.metadata.collections)
1
"""
datasetType = DataSetMetaTypes.SUBREAD
def __init__(self, *files, **kwargs):
super(SubreadSet, self).__init__(*files, **kwargs)
@staticmethod
def _metaTypeMapping():
# This doesn't work for scraps.bam, whenever that is implemented
return {'bam':'PacBio.SubreadFile.SubreadBamFile',
'bai':'PacBio.Index.BamIndex',
'pbi':'PacBio.Index.PacBioIndex',
}
def getMovieSampleNames(self):
"""
Map the BioSample names in Collection metadata to "context" ID, i.e.
movie names. Used for deconvoluting multi-sample
inputs. This function will raise a KeyError if a movie name is not
unique, or a ValueError if there is not a 1-to-1 mapping of sample
to movie.
"""
import warnings
warnings.warn("SubreadSet.getMovieSampleNames is deprecated, " +
"use DataSet.metadata.getMovieSampleNames instead.")
return self.metadata.getMovieSampleNames()
class AlignmentSet(ReadSet):
"""DataSet type specific to Alignments. No type specific Metadata exists,
so the base class version is OK (this just ensures type representation on
output and expandability)"""
datasetType = DataSetMetaTypes.ALIGNMENT
def __init__(self, *files, **kwargs):
""" An AlignmentSet
Args:
:files: handled by super
:referenceFastaFname=None: the reference fasta filename for this \
alignment.
:strict=False: see base class
:skipCounts=False: see base class
"""
super(AlignmentSet, self).__init__(*files, **kwargs)
fname = kwargs.get('referenceFastaFname', None)
if fname:
self.addReference(fname)
self.__referenceIdMap = None
@property
@filtered
def records(self):
"""A generator of (usually) BamAlignment objects for the
records in one or more Bam files pointed to by the
ExternalResources in this DataSet.
Yields:
A BamAlignment object
Doctest:
>>> from __future__ import print_function
>>> import pbcore.data.datasets as data
>>> from pbcore.io import AlignmentSet
>>> ds = AlignmentSet(data.getBam())
>>> for record in ds.records:
... print('hn: %i' % record.holeNumber) # doctest:+ELLIPSIS
hn: ...
"""
if self.isIndexed:
for i in range(len(self.index)):
yield self[i]
else:
for resource in self.resourceReaders():
for record in resource:
yield record
def consolidate(self, *args, **kwargs):
if self.isCmpH5:
raise NotImplementedError()
else:
return super(AlignmentSet, self).consolidate(*args, **kwargs)
def induceIndices(self, force=False):
if self.isCmpH5:
log.debug("Cmp.h5 files already indexed")
else:
return super(AlignmentSet, self).induceIndices(force=force)
@property
def isIndexed(self):
if self.isCmpH5:
return True
else:
return super(AlignmentSet, self).isIndexed
def addReference(self, fname):
if isinstance(fname, ReferenceSet):
reference = fname.externalResources.resourceIds
else:
reference = ReferenceSet(fname).externalResources.resourceIds
if len(reference) > 1:
if len(reference) != self.numExternalResources:
raise ResourceMismatchError(
"More than one reference found, but not enough for one "
"per resource")
for res, ref in zip(self.externalResources, reference):
res.reference = ref
else:
for res in self.externalResources:
res.reference = reference[0]
self._openFiles()
def _fixTIds(self, indices, rr, correctIds=True):
tId_acc = lambda x: x.RefGroupID
rName = lambda x: x['FullName']
if not self.isCmpH5:
tId_acc = lambda x: x.tId
rName = lambda x: x['Name']
if correctIds and self._stackedReferenceInfoTable:
log.debug("Must correct index tId's")
tIdMap = dict(zip(rr.referenceInfoTable['ID'],
rName(rr.referenceInfoTable)))
unfilteredRefTable = self._buildRefInfoTable(filterMissing=False)
rname2tid = dict(zip(unfilteredRefTable['Name'],
unfilteredRefTable['ID']))
#nameMap = self.refIds
for tId in tIdMap.keys():
tId_acc(indices)[tId_acc(indices) == tId] = rname2tid[
tIdMap[tId]]
def _indexRecords(self, correctIds=True):
"""Returns index records summarizing all of the records in all of
the resources that conform to those filters addressing parameters
cached in the pbi.
"""
recArrays = []
log.debug("Processing resource indices")
_indexMap = []
for rrNum, rr in enumerate(self.resourceReaders()):
indices = rr.index
# pbi files lack e.g. mapping cols when the bam is empty; ignore
# TODO(mdsmith)(2016-01-19) rename the fields instead of branching:
#if self.isCmpH5:
# _renameField(indices, 'MovieID', 'qId')
# _renameField(indices, 'RefGroupID', 'tId')
if not self.isCmpH5:
indices = indices._tbl
# Correct tId field
self._fixTIds(indices, rr, correctIds)
# Correct qId field
self._fixQIds(indices, rr)
# filter
if not self._filters or self.noFiltering:
recArrays.append(indices)
_indexMap.extend([(rrNum, i) for i in
range(len(indices))])
else:
passes = self._filters.filterIndexRecords(indices, self.refIds,
self.movieIds)
newInds = indices[passes]
recArrays.append(newInds)
_indexMap.extend([(rrNum, i) for i in
np.flatnonzero(passes)])
self._indexMap = np.array(_indexMap, dtype=[('reader', 'uint64'),
('index', 'uint64')])
if recArrays == []:
return recArrays
tbr = _stackRecArrays(recArrays)
# sort if cmp.h5 so we can rectify RowStart/End, maybe someday bam
if self.isCmpH5:
sort_order = np.argsort(tbr, order=('RefGroupID', 'tStart',
'tEnd',))
tbr = tbr[sort_order]
self._indexMap = self._indexMap[sort_order]
ranges = _ranges_in_list(tbr.RefGroupID)
for ref in self.referenceInfoTable:
bounds = ranges.get(ref.ID)
if bounds:
ref.StartRow = bounds[0]
# we want the ranges to be inclusive:
ref.EndRow = bounds[1] - 1
# and fix the naming scheme while we're at it
ref.Name = self._cleanCmpName(ref.FullName)
return tbr
def resourceReaders(self, refName=False):
"""A generator of Indexed*Reader objects for the ExternalResources
in this DataSet.
Args:
:refName: Only yield open resources if they have refName in their
referenceInfoTable
Yields:
An open indexed alignment file
Doctest:
>>> from __future__ import print_function
>>> import pbcore.data.datasets as data
>>> from pbcore.io import AlignmentSet
>>> ds = AlignmentSet(data.getBam())
>>> for seqFile in ds.resourceReaders():
... for record in seqFile:
... print('hn: %i' % record.holeNumber) # doctest:+ELLIPSIS
hn: ...
"""
if refName:
if (not refName in self.refNames and
not refName in self.fullRefNames):
_ = int(refName)
refName = self._idToRname(refName)
if not self._openReaders:
self._openFiles()
if refName:
return [resource for resource in self._openReaders
if refName in resource.referenceInfoTable['FullName'] or
refName in resource.referenceInfoTable['Name']]
else:
return self._openReaders
@property
def refNames(self):
"""A list of reference names (id)."""
return np.sort(self.referenceInfoTable["Name"])
def split_references(self, chunks):
"""Chunks requested:
0 or >= num_refs: One chunk per reference
1 to (num_refs - 1): Grouped somewhat evenly by num_records
"""
# File must have pbi index to be splittable:
if len(self) == 0:
return [self.copy()]
atoms = self.index.tId
refs = zip(*np.unique(atoms, return_counts=True))
# Zero chunks requested == 1 chunk per reference.
if chunks == 0 or chunks > len(refs):
chunks = len(refs)
if chunks == 1:
return [self.copy()]
balanceKey = lambda x: x[1]
log.debug("Starting chunking")
if chunks < len(refs):
groups = self._chunkList(refs, chunks, balanceKey)
else:
groups = [[ref] for ref in refs]
log.debug("Duplicating")
results = [self.copy() for _ in groups]
i2n = self.tid2rname
log.debug("Modifying filters or resources")
for result, group in zip(results, groups):
result.filters.addRequirement(rname=[('=', i2n[ref[0]])
for ref in group])
log.debug("Generating new UUID, updating counts")
for result in results:
result.updateCounts()
result.newUuid()
return results
def _indexReadsInReference(self, refName):
# This can probably be deprecated for all but the official reads in
# range (and maybe reads in reference)
refName = self.guaranteeName(refName)
desiredTid = self.refIds[refName]
tIds = self.tId
passes = tIds == desiredTid
return self.index[passes]
def _resourceSizes(self):
sizes = []
for rr in self.resourceReaders():
sizes.append((len(rr.index), sum(rr.index.aEnd - rr.index.aStart)))
return sizes
@property
def refWindows(self):
"""Going to be tricky unless the filters are really focused on
windowing the reference. Much nesting or duplication and the correct
results are really not guaranteed"""
log.debug("Fetching reference windows...")
windowTuples = []
nameIDs = self.refInfo('Name')
refLens = None
for name, refID in nameIDs:
for filt in self._filters:
thisOne = False
for param in filt:
if param.name == 'rname':
if param.value == name:
thisOne = True
if thisOne:
winstart = 0
winend = -1
for param in filt:
if param.name == 'tstart':
winend = param.value
if param.name == 'tend':
winstart = param.value
# If the filter is just for rname, fill the window
# boundaries (pricey but worth the guaranteed behavior)
if winend == -1:
if not refLens:
refLens = self.refLengths
winend = refLens[name]
windowTuples.append((refID, int(winstart), int(winend)))
# no tuples found: return full length of each reference
if not windowTuples:
for name, refId in nameIDs:
if not refLens:
refLens = self.refLengths
refLen = refLens[name]
windowTuples.append((refId, 0, refLen))
log.debug("Done fetching reference windows")
return sorted(windowTuples)
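# Illustrative sketch (reference name and coordinates are hypothetical): a
# filter such as
#   ( rname = chr1 AND tstart < 1000 AND tend > 500 )
# yields a single window (id_of_chr1, 500, 1000); with no windowing filters
# every reference is returned at its full length, (refId, 0, refLen).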
def countRecords(self, rname=None, winStart=None, winEnd=None):
"""Count the number of records mapped to 'rname' that overlap with
'window'"""
if rname and winStart != None and winEnd != None:
return len(self._indexReadsInRange(rname, winStart, winEnd))
elif rname:
return len(self._indexReadsInReference(rname))
else:
return len(self.index)
def readsInReference(self, refName):
"""A generator of (usually) BamAlignment objects for the
reads in one or more Bam files pointed to by the ExternalResources in
this DataSet that are mapped to the specified reference genome.
Args:
:refName: the name of the reference that we are sampling.
Yields:
BamAlignment objects
Doctest:
>>> from __future__ import print_function
>>> import pbcore.data.datasets as data
>>> from pbcore.io import AlignmentSet
>>> ds = AlignmentSet(data.getBam())
>>> for read in ds.readsInReference(ds.refNames[15]):
... print('hn: %i' % read.holeNumber) # doctest:+ELLIPSIS
hn: ...
"""
try:
refName = self.guaranteeName(refName)
refLen = self.refLengths[refName]
except (KeyError, AttributeError):
raise StopIteration
for read in self.readsInRange(refName, 0, refLen):
yield read
def intervalContour(self, rname, tStart=0, tEnd=None):
"""Take a set of index records and build a pileup of intervals, or
"contour" describing coverage over the contig
.. note:: Naively incrementing values in an array is too slow and takes
too much memory. Sorting tuples by starts and ends and iterating
through them and the reference (O(nlogn + nlogn + n + n + m)) takes too
much memory and time. Iterating over the reference, using numpy
conditional indexing at each base on tStart and tEnd columns uses no
memory, but is too slow (O(nm), but in numpy (C, hopefully)). Building
a delta list via sorted tStarts and tEnds one at a time saves memory
and is ~5x faster than the second method above (O(nlogn + nlogn + m)).
"""
log.debug("Generating coverage summary")
index = self._indexReadsInReference(rname)
if tEnd is None:
tEnd = self.refLengths[rname]
coverage = [0] * (tEnd - tStart)
starts = sorted(index.tStart)
for i in starts:
# ends are exclusive
if i >= tEnd:
continue
if i >= tStart:
coverage[i - tStart] += 1
else:
coverage[0] += 1
del starts
ends = sorted(index.tEnd)
for i in ends:
# ends are exclusive
if i <= tStart:
continue
# ends are exclusive
if i < tEnd:
coverage[i - tStart] -= 1
del ends
curCov = 0
for i, delta in enumerate(coverage):
curCov += delta
coverage[i] = curCov
return coverage
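# Worked example of the delta-list approach above (reads are hypothetical):
# two reads covering [0, 4) and [2, 6) over an 8-base window contribute
#   +1 at 0, +1 at 2, -1 at 4, -1 at 6
# and the running sum gives the coverage contour
#   [1, 1, 2, 2, 1, 1, 0, 0]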
def splitContour(self, contour, splits):
"""Take a contour and a number of splits, return the location of each
coverage mediated split with the first at 0"""
log.debug("Splitting coverage summary")
totalCoverage = sum(contour)
splitSize = totalCoverage//splits
tbr = [0]
for _ in range(splits - 1):
size = 0
# Start where the last one ended, so we append the current endpoint
tbr.append(tbr[-1])
while (size < splitSize and
tbr[-1] < (len(contour) - 1)):
# iterate the endpoint
tbr[-1] += 1
# track the size
size += contour[tbr[-1]]
assert len(tbr) == splits
return tbr
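# Worked example (contour values are hypothetical): splitting the contour
#   [1, 1, 2, 2, 1, 1, 0, 0]     # total coverage 8
# into 2 pieces targets ~4 coverage apiece and returns the split start
# positions [0, 3], i.e. the second piece begins at position 3.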
def _shiftAtoms(self, atoms):
shiftedAtoms = []
rnames = defaultdict(list)
for atom in atoms:
rnames[atom[0]].append(atom)
for rname, rAtoms in rnames.iteritems():
if len(rAtoms) > 1:
contour = self.intervalContour(rname)
splits = self.splitContour(contour, len(rAtoms))
ends = splits[1:] + [self.refLengths[rname]]
for start, end in zip(splits, ends):
newAtom = (rname, start, end)
shiftedAtoms.append(newAtom)
else:
shiftedAtoms.append(rAtoms[0])
return shiftedAtoms
def _split_contigs(self, chunks, maxChunks=0, breakContigs=False,
targetSize=5000, byRecords=False, updateCounts=True):
"""Split a dataset into reference windows based on contigs.
Args:
:chunks: The number of chunks to emit. If chunks < # contigs,
contigs are grouped by size. If chunks == contigs, one
contig is assigned to each dataset regardless of size. If
chunks >= contigs, contigs are split into roughly equal
chunks (<= 1.0 contig per file).
"""
# removed the non-trivial case so that it is still filtered to just
# contigs with associated records
# The format is rID, start, end, for a reference window
log.debug("Fetching reference names and lengths")
# pull both at once so you only have to mess with the
# referenceInfoTable once.
refLens = self.refLengths
refNames = refLens.keys()
log.debug("{i} references found".format(i=len(refNames)))
log.debug("Finding contigs")
if byRecords:
log.debug("Counting records...")
atoms = [(rn, 0, 0, count)
for rn, count in zip(refNames, map(self.countRecords,
refNames))
if count != 0]
balanceKey = lambda x: self.countRecords(*x)
else:
# if there are that many references, on average they will probably
# be distributed pretty evenly. Checking the counts will also be
# super expensive
if len(refNames) < 100:
atoms = [(rn, 0, refLens[rn]) for rn in refNames if
self.countRecords(rn) != 0]
else:
atoms = [(rn, 0, refLens[rn]) for rn in refNames]
balanceKey = lambda x: x[2] - x[1]
log.debug("{i} contigs found".format(i=len(atoms)))
# By providing maxChunks and not chunks, this combination will set
# chunks down to <= len(atoms) <= maxChunks
if not chunks:
log.debug("Chunks not set, splitting to len(atoms): {i}"
.format(i=len(atoms)))
chunks = len(atoms)
if maxChunks and chunks > maxChunks:
log.debug("maxChunks trumps chunks")
chunks = maxChunks
# Decide whether to intelligently limit chunk count:
if maxChunks and breakContigs:
# The bounds:
minNum = 2
maxNum = maxChunks
# Adjust
log.debug("Target numRecords per chunk: {i}".format(i=targetSize))
dataSize = self.numRecords
log.debug("numRecords in dataset: {i}".format(i=dataSize))
chunks = int(dataSize//targetSize)
# Respect bounds:
chunks = minNum if chunks < minNum else chunks
chunks = maxNum if chunks > maxNum else chunks
log.debug("Resulting number of chunks: {i}".format(i=chunks))
# refwindow format: rId, start, end
if chunks > len(atoms):
# splitting atom format is slightly different (but more compatible
# going forward with countRecords): (rId, size, segments)
# Let's do a rough split, counting reads once and assuming uniform
# coverage (reads span positions, so we can't split on exact reads)
if byRecords:
atoms = [[rn, size, 1] for rn, _, _, size in atoms]
else:
atoms = [[rn, refLens[rn], 1] for rn, _, _ in atoms]
log.debug("Splitting atoms")
atoms = self._split_atoms(atoms, chunks)
# convert back to window format:
segments = []
for atom in atoms:
segment_size = atom[1]//atom[2]
if byRecords:
segment_size = refLens[atom[0]]//atom[2]
sub_segments = [(atom[0], segment_size * i, segment_size *
(i + 1)) for i in range(atom[2])]
# if you can't divide it evenly you may have some messiness
# with the last window. Fix it:
tmp = sub_segments.pop()
tmp = (tmp[0], tmp[1], refLens[tmp[0]])
sub_segments.append(tmp)
segments.extend(sub_segments)
atoms = segments
elif breakContigs and not byRecords:
log.debug("Checking for oversized chunks")
# we may have chunks <= len(atoms). We wouldn't usually split up
# contigs, but some might be huge, resulting in some tasks running
# very long
# We are only doing this for refLength splits for now, as those are
# cheap (and quiver is linear in length not coverage)
dataSize = sum(refLens.values())
# target size per chunk:
target = dataSize//chunks
log.debug("Target chunk length: {t}".format(t=target))
newAtoms = []
for i, atom in enumerate(atoms):
testAtom = atom
while testAtom[2] - testAtom[1] > target:
newAtom1 = (testAtom[0], testAtom[1], testAtom[1] + target)
newAtom2 = (testAtom[0], testAtom[1] + target, testAtom[2])
newAtoms.append(newAtom1)
testAtom = newAtom2
newAtoms.append(testAtom)
atoms = newAtoms
log.debug("Done defining {n} chunks".format(n=chunks))
# duplicate
log.debug("Making copies")
results = [self.copy() for _ in range(chunks)]
if byRecords:
log.debug("Respacing chunks by records")
atoms = self._shiftAtoms(atoms)
# a four-element atom indicates byRecords with no sub-atom splits (the
# fourth field is the record count in that window)
if len(atoms[0]) == 4:
balanceKey = lambda x: x[3]
log.debug("Distributing chunks")
# if we didn't have to split atoms and are chunking byRecords, the
# original counts are still valid; otherwise we would have to count
# records again to recombine atoms
chunks = self._chunkList(atoms, chunks, balanceKey)
log.debug("Done chunking")
log.debug("Modifying filters or resources")
for result, chunk in zip(results, chunks):
# we don't want to updateCounts or anything right now, so we'll
# block that functionality:
result._filters.clearCallbacks()
if atoms[0][2]:
result._filters.addRequirement(
rname=[('=', c[0]) for c in chunk],
tStart=[('<', c[2]) for c in chunk],
tEnd=[('>', c[1]) for c in chunk])
else:
result._filters.addRequirement(
rname=[('=', c[0]) for c in chunk])
# UniqueId was regenerated when the ExternalResource list was
# whole, therefore we need to regenerate it again here
log.debug("Generating new UUID")
# At this point the IDs should be corrected, so the name map should be
# available:
for result in results:
result.newUuid()
# If there are so many filters that it will be really expensive, we
# will use an approximation for the number of records and bases.
# This is probably not too far off, if there are that many chunks
# to distribute. We'll still round to indicate that it is an
# abstraction.
if len(result._filters) > 100:
meanNum = self.numRecords//len(chunks)
result.numRecords = long(round(meanNum,
(-1 * len(str(meanNum))) + 3))
meanLen = self.totalLength//len(chunks)
result.totalLength = long(round(meanLen,
(-1 * len(str(meanLen))) + 3))
elif updateCounts:
result._openReaders = self._openReaders
passes = result._filters.filterIndexRecords(self.index,
self.refIds,
self.movieIds)
result._index = self.index[passes]
result.updateCounts()
del result._index
del passes
result._index = None
# Update the basic metadata for the new DataSets from external
# resources, or at least mark as dirty
# TODO
return results
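# A sketch of the breakContigs behavior above (commented out, hypothetical
# helper name): any (rname, start, end) window longer than the target length
# is cut into target-sized pieces, with the remainder in the last piece.
#
# def break_large_windows(atoms, target):
#     new_atoms = []
#     for rname, start, end in atoms:
#         while end - start > target:
#             new_atoms.append((rname, start, start + target))
#             start += target
#         new_atoms.append((rname, start, end))
#     return new_atoms
#
# e.g. break_large_windows([("chr1", 0, 2500)], 1000) ==
#      [("chr1", 0, 1000), ("chr1", 1000, 2000), ("chr1", 2000, 2500)]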
def _indexReadsInRange(self, refName, start, end, justIndices=False):
"""Return the index (pbi) records within a range.
..note:: Not sorted by genomic location!
"""
desiredTid = self.refIds[refName]
if self.isCmpH5:
passes = ((self.index.RefGroupID == desiredTid) &
(self.index.tStart < end) &
(self.index.tEnd > start))
else:
passes = ((self.index.tId == desiredTid) &
(self.index.tStart < end) &
(self.index.tEnd > start))
if justIndices:
return np.nonzero(passes)[0]
#return passes
return self.index[passes]
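# A toy version of the overlap test above (commented out; the record array
# and its values are hypothetical stand-ins for a few pbi columns):
#
# import numpy as np
# toy = np.rec.fromrecords(
#     [(0, 10, 50), (0, 60, 90), (1, 5, 25)],
#     dtype=[('tId', '<i4'), ('tStart', '<i4'), ('tEnd', '<i4')])
# tid, start, end = 0, 20, 70
# passes = (toy.tId == tid) & (toy.tStart < end) & (toy.tEnd > start)
# # np.nonzero(passes)[0] -> array([0, 1]); toy[passes] are the matching rows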
def _pbiReadsInRange(self, refName, start, end, longest=False,
sampleSize=0):
"""Return the reads in range for a file, but use the index in this
object to get the order of the (reader, read) index tuples, instead of
using the pbi rangeQuery for each file and merging the actual reads.
This also opens up the ability to sort the reads by length in the
window, and yield in that order (much much faster for quiver)
Args:
:refName: The reference name to sample
:start: The start of the target window
:end: The end of the target window
:longest: (False) yield the longest reads first
Yields:
reads in the range, potentially longest first
"""
if refName not in self.refNames:
return
# get pass indices
passes = self._indexReadsInRange(refName, start, end, justIndices=True)
mapPasses = self._indexMap[passes]
if longest:
def lengthInWindow(hits):
ends = hits.tEnd
post = ends > end
ends[post] = end
starts = hits.tStart
pre = starts < start
starts[pre] = start
return ends - starts
lens = lengthInWindow(self.index[passes])
# reverse the keys here, so the descending sort is stable
lens = (end - start) - lens
if sampleSize != 0:
if len(lens) != 0:
min_len = min(lens)
count_min = Counter(lens)[min_len]
else:
count_min = 0
if count_min > sampleSize:
sort_order = lens.argsort()
else:
sort_order = lens.argsort(kind='mergesort')
else:
sort_order = lens.argsort(kind='mergesort')
mapPasses = mapPasses[sort_order]
elif len(self.toExternalFiles()) > 1:
# sort the pooled passes and indices using a stable algorithm
sort_order = self.index[passes].tStart.argsort(kind='mergesort')
# pull out indexMap using those passes
mapPasses = mapPasses[sort_order]
return self._getRecords(mapPasses)
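# A sketch (commented out, hypothetical values) of the "longest first"
# ordering above: a descending sort is obtained from an ascending, stable
# mergesort argsort on reversed keys (window span minus length in window),
# so equally long reads keep their original relative order.
#
# import numpy as np
# start, end = 100, 200
# tStarts = np.array([90, 120, 150])
# tEnds = np.array([210, 180, 160])
# lens = np.minimum(tEnds, end) - np.maximum(tStarts, start)  # in-window length
# keys = (end - start) - lens
# order = keys.argsort(kind='mergesort')  # longest-in-window first, stable
# # lens == [100, 60, 10], so order == [0, 1, 2] here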
def _getRecords(self, indexList, buffsize=1):
"""Get the records corresponding to indexList
Args:
:indexList: A list of (reader, read) index tuples
:buffsize: The number of reads to buffer (coalesced file reads)
Yields:
reads from all files
"""
# yield in order of sorted indexMap
if buffsize == 1:
for indexTuple in indexList:
yield self.resourceReaders()[indexTuple[0]].atRowNumber(
indexTuple[1])
else:
def debuf():
# This will store the progress through the buffer for each
# reader
reqCacheI = [0] * len(self.resourceReaders())
# fill the record cache
for rrNum, rr in enumerate(self.resourceReaders()):
for req in reqCache[rrNum]:
recCache[rrNum].append(rr.atRowNumber(req))
# empty cache
for i in range(cacheFill):
rrNum = fromCache[i]
curI = reqCacheI[rrNum]
reqCacheI[rrNum] += 1
yield recCache[rrNum][curI]
def cleanBuffs():
# This will buffer the records being pulled from each reader
recCache = [[] for _ in self.resourceReaders()]
# This will buffer the indices being pulled from each reader
reqCache = [[] for _ in self.resourceReaders()]
# This will store the order in which reads are consumed, which
# here can be specified by the reader number (read index order
# is cached in the reqCache buffer)
fromCache = [None] * buffsize
return recCache, reqCache, fromCache, 0
# The algorithm:
recCache, reqCache, fromCache, cacheFill = cleanBuffs()
for indexTuple in indexList:
# segregate the requests by reader into ordered lists of read
# indices
reqCache[indexTuple[0]].append(indexTuple[1])
# keep track of the order in which readers should be sampled,
# which will maintain the overall read order
fromCache[cacheFill] = indexTuple[0]
cacheFill += 1
if cacheFill >= buffsize:
for rec in debuf():
yield rec
recCache, reqCache, fromCache, cacheFill = cleanBuffs()
if cacheFill > 0:
for rec in debuf():
yield rec
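# A plain-list sketch of the buffering strategy above (commented out;
# buffered_fetch and fetch are hypothetical names, with fetch standing in for
# reader.atRowNumber): requests are grouped per reader so each file is read
# in row order, then the records are replayed in the original request order.
#
# def buffered_fetch(index_tuples, readers, fetch):
#     requests = [[] for _ in readers]   # row numbers requested per reader
#     order = []                         # reader consumption order
#     for reader_i, row in index_tuples:
#         requests[reader_i].append(row)
#         order.append(reader_i)
#     records = [[fetch(readers[i], row) for row in rows]
#                for i, rows in enumerate(requests)]
#     cursors = [0] * len(readers)
#     for reader_i in order:
#         yield records[reader_i][cursors[reader_i]]
#         cursors[reader_i] += 1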
def guaranteeName(self, nameOrId):
refName = nameOrId
if isinstance(refName, np.int64):
refName = str(refName)
if refName.isdigit():
if (refName not in self.refNames and
refName not in self.fullRefNames):
# we need the real refName, which may be hidden behind a
# mapping to resolve duplicate refIds between resources...
refName = self._idToRname(int(refName))
return refName
def readsInRange(self, refName, start, end, buffsize=50, usePbi=True,
longest=False, sampleSize=0, justIndices=False):
"""A generator of (usually) BamAlignment objects for the reads in one
or more Bam files pointed to by the ExternalResources in this DataSet
that have at least one coordinate within the specified range in the
reference genome.
Rather than developing some convoluted approach for dealing with
auto-inferring the desired references, this method and self.refNames
should allow users to compose the desired query.
Args:
:refName: the name of the reference that we are sampling
:start: the start of the range (inclusive, index relative to \
reference)
:end: the end of the range (inclusive, index relative to reference)
Yields:
BamAlignment objects
Doctest:
>>> from __future__ import print_function
>>> import pbcore.data.datasets as data
>>> from pbcore.io import AlignmentSet
>>> ds = AlignmentSet(data.getBam())
>>> for read in ds.readsInRange(ds.refNames[15], 100, 150):
... print('hn: %i' % read.holeNumber) # doctest:+ELLIPSIS
hn: ...
"""
refName = self.guaranteeName(refName)
# correct the cmp.h5 reference names before reads go out the door
if self.isCmpH5:
for res in self.resourceReaders():
for row in res.referenceInfoTable:
row.FullName = self._cleanCmpName(row.FullName)
if justIndices:
return self._indexReadsInRange(refName, start, end,
justIndices=True)
else:
return (read for read in self._readsInRange(refName, start, end,
buffsize, usePbi,
longest, sampleSize))
@filtered
def _readsInRange(self, refName, start, end, buffsize=50, usePbi=True,
longest=False, sampleSize=0):
if self.hasPbi and usePbi:
for rec in self._pbiReadsInRange(refName, start, end,
longest=longest,
sampleSize=sampleSize):
yield rec
return
# merge sort before yield
if self.numExternalResources > 1:
if buffsize > 1:
# create read/reader caches
read_its = [iter(rr.readsInRange(refName, start, end))
for rr in self.resourceReaders()]
deep_buf = [[next(it, None) for _ in range(buffsize)]
for it in read_its]
# remove empty iterators
read_its = [it for it, cur in zip(read_its, deep_buf)
if cur[0]]
deep_buf = [buf for buf in deep_buf if buf[0]]
# populate starting values/scratch caches
buf_indices = [0 for _ in read_its]
tStarts = [cur[0].tStart for cur in deep_buf]
while len(read_its) != 0:
# pick the first one to yield
# this should be a bit faster than taking the min of an
# enumeration of currents with a key function accessing a
# field...
first = min(tStarts)
first_i = tStarts.index(first)
buf_index = buf_indices[first_i]
first = deep_buf[first_i][buf_index]
# update the buffers
buf_index += 1
buf_indices[first_i] += 1
if buf_index == buffsize:
buf_index = 0
buf_indices[first_i] = 0
deep_buf[first_i] = [next(read_its[first_i], None)
for _ in range(buffsize)]
if not deep_buf[first_i][buf_index]:
del read_its[first_i]
del tStarts[first_i]
del deep_buf[first_i]
del buf_indices[first_i]
else:
tStarts[first_i] = deep_buf[first_i][buf_index].tStart
yield first
else:
read_its = [iter(rr.readsInRange(refName, start, end))
for rr in self.resourceReaders()]
# buffer one element from each generator
currents = [next(its, None) for its in read_its]
# remove empty iterators
read_its = [it for it, cur in zip(read_its, currents) if cur]
currents = [cur for cur in currents if cur]
tStarts = [cur.tStart for cur in currents]
while len(read_its) != 0:
# pick the first one to yield
# this should be a bit faster than taking the min of an
# enumeration of currents with a key function accessing a
# field...
first = min(tStarts)
first_i = tStarts.index(first)
first = currents[first_i]
# update the buffers
try:
currents[first_i] = next(read_its[first_i])
tStarts[first_i] = currents[first_i].tStart
except StopIteration:
del read_its[first_i]
del currents[first_i]
del tStarts[first_i]
yield first
else:
# the above will work in either case, but this might be ever so
# slightly faster
for resource in self.resourceReaders():
for read in resource.readsInRange(refName, start, end):
yield read
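# For comparison only (commented out): the unbuffered branch above is a k-way
# merge of per-file iterators that are each already sorted by tStart. On
# Python 3.5+ the same merge could be expressed with heapq, though the method
# above deliberately keeps explicit bookkeeping to support its buffered
# variant. merged_reads_sketch is a hypothetical name.
#
# import heapq
# def merged_reads_sketch(per_file_iterators):
#     """per_file_iterators: iterables of reads, each sorted by .tStart."""
#     return heapq.merge(*per_file_iterators, key=lambda read: read.tStart)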
@property
def tId(self):
if self.isCmpH5:
return self.index.RefGroupID
return self.index.tId
@property
def mapQV(self):
if self.isCmpH5:
return self.index.MapQV
return self.index.mapQV
@property
def isSorted(self):
return self._checkIdentical('isSorted')
@property
def tStart(self):
return self._checkIdentical('tStart')
@property
def tEnd(self):
return self._checkIdentical('tEnd')
@property
def _length(self):
"""Used to populate metadata in updateCounts. We're using the pbi here,
which is necessary and sufficient for both subreadsets and
alignmentsets, but doesn't work for hdfsubreadsets. Rather than
duplicate code, we'll implement the hdf specific _length as an
overriding function where needed.
..note:: Both mapped and unmapped bams can be either indexed or
unindexed. This makes life more difficult, but we should
expect a pbi for both subreadsets and alignmentsets
"""
count = len(self.index)
length = 0
if count:
if self.isCmpH5:
length = sum(self.index.rEnd - self.index.rStart)
else:
try:
length = sum(self.index.aEnd - self.index.aStart)
except AttributeError:
# If the bam is empty or the file is not actually aligned,
# this field won't be populated
if self.isMapped:
log.debug(".pbi mapping columns missing from mapped "
"bam, bam may be empty")
else:
log.warn("File not actually mapped!")
length = 0
return count, length
@property
def _referenceFile(self):
responses = [res.reference for res in self.externalResources]
return self._unifyResponses(responses)
@property
def recordsByReference(self):
"""The records in this AlignmentSet, sorted by tStart."""
# we only care about aligned sequences here, so we can make this a
# chain of readsInReferences to add pre-filtering by rname, instead of
# going through every record and performing downstream filtering.
# This will make certain operations, like len(), potentially faster
for rname in self.refNames:
for read in self.readsInReference(rname):
yield read
def referenceInfo(self, refName):
"""Select a row from the DataSet.referenceInfoTable using the reference
name as a unique key (or ID, if you really have to)"""
# Convert it to a name if you have to:
if not isinstance(refName, basestring):
refName = str(refName)
if refName.isdigit():
if refName not in self.refNames:
refName = self._idToRname(int(refName))
tbr = self.referenceInfoTable[
self.referenceInfoTable['Name'] == refName]
if len(tbr) > 1:
log.info("Duplicate reference names detected")
elif len(tbr) == 1:
return tbr[0]
else:
log.debug("No reference found by that Name or ID")
def _buildRefInfoTable(self, filterMissing=True):
self._referenceInfoTableIsStacked = False
responses = []
# allow for merge here:
for res in self._pollResources(lambda x: x.referenceInfoTable):
if res is not None:
if self.isCmpH5:
for rec in res:
rec.StartRow = 0
rec.EndRow = 0
responses.append(res)
table = []
if len(responses) > 1:
# perhaps this can be removed someday so that cmpH5 references
# are 0-indexed even if only one exists
try:
# this works even for cmp's, because we overwrite the
# highly variable fields above
table = self._unifyResponses(
responses,
eqFunc=np.array_equal)
except ResourceMismatchError:
table = np.concatenate(responses)
table = np.unique(table)
for i, rec in enumerate(table):
rec.ID = i
rec.RefInfoID = i
self._referenceInfoTableIsStacked = True
elif len(responses) == 1:
table = responses[0]
else:
raise InvalidDataSetIOError("No reference tables found, "
"are these input files aligned?")
if self.isCmpH5:
for rec in table:
rec.Name = self._cleanCmpName(rec.FullName)
# Not sure if this is necessary or a good idea, looking back on it
# a year later. It complicates fixTids
if filterMissing:
log.debug("Filtering reference entries")
if not self.noFiltering and self._filters:
passes = self._filters.testField('rname', table['Name'])
table = table[passes]
#TODO: Turn on when needed (expensive)
#self._referenceDict.update(zip(self.refIds.values(),
# self._referenceInfoTable))
return table
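# A toy sketch of the ResourceMismatchError fallback above (commented out;
# the field set is a hypothetical subset of the real referenceInfoTable):
# concatenate the per-resource tables, drop duplicate rows, then renumber the
# ID column so it is unique across the merged table.
#
# import numpy as np
# a = np.rec.fromrecords([(0, 'chr1', 1000)],
#                        dtype=[('ID', '<i4'), ('Name', 'S8'), ('Length', '<i8')])
# b = np.rec.fromrecords([(0, 'chr2', 2000)],
#                        dtype=[('ID', '<i4'), ('Name', 'S8'), ('Length', '<i8')])
# merged = np.unique(np.concatenate([a, b]))
# merged['ID'] = np.arange(len(merged))
# # merged['Name'] -> [b'chr1', b'chr2'], merged['ID'] -> [0, 1]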
@property
def referenceInfoTable(self):
"""The merged reference info tables from the external resources.
Record.ID is remapped to a unique integer key (though using record.Name
is preferred). Record.Names are remapped for cmp.h5 files to be
consistent with bam files.
..note:: Reference names are assumed to be unique
"""
if self._referenceInfoTable is None:
self._referenceInfoTable = self._buildRefInfoTable(
filterMissing=True)
return self._referenceInfoTable
@property
def _stackedReferenceInfoTable(self):
if self._referenceInfoTableIsStacked is None:
_ = self.referenceInfoTable
return self._referenceInfoTableIsStacked
@property
def _referenceIdMap(self):
"""Map the dataset shifted refIds to the [resource, refId] they came
from.
"""
if self.__referenceIdMap is None:
self.__referenceIdMap = {ref.ID: ref.Name
for ref in self.referenceInfoTable}
return self.__referenceIdMap
def _cleanCmpName(self, name):
return splitFastaHeader(name)[0]
@property
def refLengths(self):
"""A dict of refName: refLength"""
return {name: length for name, length in self.refInfo('Length')}
@property
def refIds(self):
"""A dict of refName: refId for the joined referenceInfoTable
TODO: deprecate in favor of the more descriptive rname2tid"""
return self.rname2tid
@property
def rname2tid(self):
"""A dict of refName: refId for the joined referenceInfoTable"""
return {name: rId for name, rId in self.refInfo('ID')}
@property
def tid2rname(self):
"""A dict of refName: refId for the joined referenceInfoTable"""
return {rId: name for name, rId in self.refInfo('ID')}
def refLength(self, rname):
"""The length of reference 'rname'. This is expensive, so if you're
going to do many lookups cache self.refLengths locally and use that."""
lut = self.refLengths
return lut[rname]
@property
def fullRefNames(self):
"""A list of reference full names (full header)."""
return [name for _, name in self.refInfo('FullName')]
def refInfo(self, key):
"""Return a column in the referenceInfoTable, tupled with the reference
name. TODO(mdsmith)(2016-01-27): pick a better name for this method...
"""
return zip(self.referenceInfoTable['Name'],
self.referenceInfoTable[key])
def _idToRname(self, rId):
"""Map the DataSet.referenceInfoTable.ID to the superior unique
reference identifier: referenceInfoTable.Name
Args:
:rId: The DataSet.referenceInfoTable.ID of interest
Returns:
The referenceInfoTable.Name corresponding to rId
"""
return self._referenceIdMap[rId]
@staticmethod
def _metaTypeMapping():
# This doesn't work for scraps.bam, whenever that is implemented
return {'bam':'PacBio.AlignmentFile.AlignmentBamFile',
'bai':'PacBio.Index.BamIndex',
'pbi':'PacBio.Index.PacBioIndex',
'cmp.h5':'PacBio.AlignmentFile.AlignmentCmpH5File',
}
class ConsensusReadSet(ReadSet):
"""DataSet type specific to CCSreads. No type specific Metadata exists, so
the base class version is OK (this just ensures type representation on
output and expandability
Doctest:
>>> import pbcore.data.datasets as data
>>> from pbcore.io import ConsensusReadSet
>>> ds2 = ConsensusReadSet(data.getXml(2), strict=False,
... skipMissing=True)
>>> ds2 # doctest:+ELLIPSIS
<ConsensusReadSet...
>>> ds2._metadata # doctest:+ELLIPSIS
<SubreadSetMetadata...
"""
datasetType = DataSetMetaTypes.CCS
@staticmethod
def _metaTypeMapping():
# This doesn't work for scraps.bam, whenever that is implemented
return {'bam':'PacBio.ConsensusReadFile.ConsensusReadBamFile',
'bai':'PacBio.Index.BamIndex',
'pbi':'PacBio.Index.PacBioIndex',
}
class ConsensusAlignmentSet(AlignmentSet):
"""
Dataset type for aligned CCS reads. Essentially identical to AlignmentSet
aside from the contents of the underlying BAM files.
"""
datasetType = DataSetMetaTypes.CCS_ALIGNMENT
@staticmethod
def _metaTypeMapping():
# This doesn't work for scraps.bam, whenever that is implemented
return {'bam':'PacBio.ConsensusReadFile.ConsensusReadBamFile',
'bai':'PacBio.Index.BamIndex',
'pbi':'PacBio.Index.PacBioIndex',
}
class TranscriptSet(ReadSet):
"""
DataSet type for processed RNA transcripts in BAM format. These are not
technically "reads", but they share many of the same properties and are
therefore handled the same way.
"""
datasetType = DataSetMetaTypes.TRANSCRIPT
@staticmethod
def _metaTypeMapping():
# This doesn't work for scraps.bam, whenever that is implemented
return {'bam':'PacBio.TranscriptFile.TranscriptBamFile',
'bai':'PacBio.Index.BamIndex',
'pbi':'PacBio.Index.PacBioIndex',
}
class TranscriptAlignmentSet(AlignmentSet):
"""
Dataset type for aligned RNA transcripts. Essentially identical to
AlignmentSet aside from the contents of the underlying BAM files.
"""
datasetType = DataSetMetaTypes.TRANSCRIPT_ALIGNMENT
@staticmethod
def _metaTypeMapping():
# This doesn't work for scraps.bam, whenever that is implemented
return {'bam':'PacBio.AlignmentFile.TranscriptAlignmentBamFile',
'bai':'PacBio.Index.BamIndex',
'pbi':'PacBio.Index.PacBioIndex',
}
class ContigSet(DataSet):
"""DataSet type specific to Contigs"""
datasetType = DataSetMetaTypes.CONTIG
def __init__(self, *files, **kwargs):
self._fastq = False
super(ContigSet, self).__init__(*files, **kwargs)
# weaken by permitting failure to allow BarcodeSet to have own
# Metadata type
try:
self._metadata = ContigSetMetadata(self._metadata)
self._updateMetadata()
except TypeError:
pass
def split(self, nchunks):
log.debug("Getting and dividing contig id's")
keys = self.index.id
chunks = divideKeys(keys, nchunks)
log.debug("Creating copies of the dataset")
results = [self.copy() for _ in range(nchunks)]
log.debug("Applying filters and updating counts")
for chunk, res in zip(chunks, results):
res._filters.addRequirement(id=[('=', n) for n in chunk])
res.newUuid()
log.debug("Updating new res counts:")
res.updateCounts()
return results
def consolidate(self, outfn=None, numFiles=1, useTmp=False):
"""Consolidation should be implemented for window text in names and
for filters in ContigSets"""
if numFiles != 1:
raise NotImplementedError(
"Only one output file implemented so far.")
# In general "name" here refers to the contig.id only, which is why we
# also have to keep track of comments.
log.debug("Beginning consolidation")
# Get the possible keys
names = self.contigNames
winless_names = [self._removeWindow(name) for name in names]
# Put them into buckets
matches = {}
for con in self.contigs:
conId = con.id
window = self._parseWindow(conId)
if window is not None:
conId = self._removeWindow(conId)
if conId not in matches:
matches[conId] = [con]
else:
matches[conId].append(con)
for name, match_list in matches.items():
matches[name] = np.array(match_list)
writeTemp = False
# consolidate multiple files into one
if len(self.toExternalFiles()) > 1:
writeTemp = True
if self._filters and not self.noFiltering:
writeTemp = True
if not writeTemp:
writeTemp = any(len(m) > 1 for m in matches.values())
def _get_windows(match_list):
# look for the window indication scheme that quiver appends to contig ids:
windows = np.array([self._parseWindow(match.id)
for match in match_list])
for win in windows:
if win is None:
raise ValueError("Windows not found for all items with a "
"matching id, consolidation aborted")
return windows
for name, match_list in matches.items():
if len(match_list) > 1:
try:
windows = _get_windows(match_list)
except ValueError as e:
log.error(e)
return
def _get_merged_sequence(name):
match_list = matches.pop(name)
if len(match_list) > 1:
log.debug("Multiple matches found for {i}".format(i=name))
windows = _get_windows(match_list)
# order windows
order = np.argsort([window[0] for window in windows])
match_list = match_list[order]
windows = windows[order]
# TODO: check to make sure windows/lengths are compatible,
# complete
# collapse matches
new_name = self._removeWindow(name)
seq = ''.join([match.sequence[:] for match in match_list])
quality = None
if self._fastq:
quality = ''.join([match.qualityString for match in match_list])
return (seq, match_list[0].comment, quality)
else:
log.debug("One match found for {i}".format(i=name))
seq = match_list[0].sequence[:]
quality = None
if self._fastq:
quality = match_list[0].qualityString
return (seq, match_list[0].comment, quality)
if writeTemp:
log.debug("Writing a new file is necessary")
if not outfn:
log.debug("Writing to a temp directory as no path given")
outdir = tempfile.mkdtemp(suffix="consolidated-contigset")
if self._fastq:
outfn = os.path.join(outdir,
'consolidated_contigs.fastq')
else:
outfn = os.path.join(outdir,
'consolidated_contigs.fasta')
with self._writer(outfn) as outfile:
log.debug("Writing new resource {o}".format(o=outfn))
for name in list(matches.keys()):
seq, comments, quality = _get_merged_sequence(name)
if comments:
name = ' '.join([name, comments])
if self._fastq:
outfile.writeRecord(name, seq, qvsFromAscii(quality))
else:
outfile.writeRecord(name, seq)
if not self._fastq:
_indexFasta(outfn)
# replace resources
log.debug("Replacing resources")
self.externalResources = ExternalResources()
self.addExternalResources([outfn])
self._index = None
self._indexMap = None
self._openReaders = []
self._populateMetaTypes()
self.updateCounts()
else:
log.warn("No need to write a new resource file, using current "
"resource instead.")
self._populateMetaTypes()
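# The heart of consolidate() above, reduced to a commented-out sketch over
# plain (contig_id, window_start, sequence) tuples (names hypothetical):
# bucket records by their de-windowed id, order each bucket by window start,
# and join the sequences.
#
# def collapse_windows(chunks):
#     buckets = {}
#     for contig_id, win_start, seq in chunks:
#         buckets.setdefault(contig_id, []).append((win_start, seq))
#     merged = {}
#     for contig_id, pieces in buckets.items():
#         pieces.sort()                      # order by window start
#         merged[contig_id] = ''.join(seq for _, seq in pieces)
#     return merged
#
# e.g. collapse_windows([("c1", 500, "TTT"), ("c1", 0, "AAA")]) == {"c1": "AAATTT"}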
@property
def _writer(self):
if self._fastq:
return FastqWriter
return FastaWriter
def _popSuffix(self, name):
"""Chunking and quivering adds suffixes to contig names, after the
normal ID and window. This complicates our dewindowing and
consolidation, so we'll remove them for now"""
observedSuffixes = ['|quiver', '|plurality', '|arrow', '|poa']
for suff in observedSuffixes:
if name.endswith(suff):
log.debug("Suffix found: {s}".format(s=suff))
return name.replace(suff, ''), suff
return name, ''
def _removeWindow(self, name):
"""Chunking and quivering appends a window to the contig ID, which
allows us to consolidate the contig chunks but also gets in the way of
contig identification by ID. Remove it temporarily"""
if isinstance(self._parseWindow(name), np.ndarray):
name, suff = self._popSuffix(name)
return '_'.join(name.split('_')[:-2]) + suff
return name
def induceIndices(self, force=False):
if not self.isIndexed:
for extRes in self.externalResources:
iname = extRes.resourceId + '.fai'
if not os.path.isfile(iname) or force:
iname = _indexFasta(extRes.resourceId)
extRes.addIndices([iname])
self.close()
self._populateMetaTypes()
self.updateCounts()
def _parseWindow(self, name):
"""Chunking and quivering appends a window to the contig ID, which
allows us to consolidate the contig chunks."""
name, _ = self._popSuffix(name)
if re.search("_\d+_\d+$", name) is None:
return None
possibilities = name.split('_')[-2:]
for pos in possibilities:
if not pos.isdigit():
return None
return np.array([int(pos) for pos in possibilities])
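# A worked example (comments only; the contig id is hypothetical) of how the
# suffix and window helpers above decompose a chunked contig id:
#
#   name = "contig7_500_1000|quiver"
#   _popSuffix(name)    -> ("contig7_500_1000", "|quiver")
#   _parseWindow(name)  -> array([500, 1000])   # trailing _start_end window
#   _removeWindow(name) -> "contig7|quiver"     # window stripped, suffix kept
#
# A name with no trailing _start_end window is returned unchanged by
# _removeWindow, and _parseWindow returns None for it.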
def _updateMetadata(self):
# update contig specific metadata:
if not self._metadata.organism:
self._metadata.organism = ''
if not self._metadata.ploidy:
self._metadata.ploidy = ''
def updateCounts(self):
if self._skipCounts:
if not self.metadata.totalLength:
self.metadata.totalLength = -1
if not self.metadata.numRecords:
self.metadata.numRecords = -1
return
if not self.isIndexed:
if (not self.totalLength and not self.numRecords and not
self._countsUpdated):
log.info("Cannot updateCounts without an index file")
self.metadata.totalLength = 0
self.metadata.numRecords = 0
return
try:
log.debug('Updating counts')
self.metadata.totalLength = sum(self.index.length)
self.metadata.numRecords = len(self.index)
self._countsUpdated = True
except (IOError, UnavailableFeature, TypeError):
# IOError for missing files
# UnavailableFeature for files without companion files
# TypeError for FastaReader, which doesn't have a len()
if not self._strict:
self.metadata.totalLength = -1
self.metadata.numRecords = -1
else:
raise
def addMetadata(self, newMetadata, **kwargs):
"""Add metadata specific to this subtype, while leaning on the
superclass method for generic metadata. Also enforce metadata type
correctness."""
# Validate, clean and prep input data
if newMetadata:
if isinstance(newMetadata, dict):
newMetadata = ContigSetMetadata(newMetadata)
elif isinstance(newMetadata, ContigSetMetadata) or (
type(newMetadata).__name__ == 'DataSetMetadata'):
newMetadata = ContigSetMetadata(newMetadata.record)
else:
raise TypeError("Cannot extend ContigSetMetadata with "
"{t}".format(t=type(newMetadata).__name__))
# Pull generic values, kwargs, general treatment in super
super(ContigSet, self).addMetadata(newMetadata, **kwargs)
@property
def metadata(self):
if not isinstance(self._metadata, ContigSetMetadata):
self._metadata = ContigSetMetadata(self._metadata)
return self._metadata
@metadata.setter
def metadata(self, value):
if not isinstance(value, ContigSetMetadata):
value = ContigSetMetadata(value)
self._metadata = value
def _openFiles(self):
"""Open the files (assert they exist, assert they are of the proper
type before accessing any file)
"""
if self._openReaders:
log.debug("Closing old {t} readers".format(
t=self.__class__.__name__))
self.close()
log.debug("Opening {t} resources".format(
t=self.__class__.__name__))
for extRes in self.externalResources:
resource = self._openFile(urlparse(extRes.resourceId).path)
if resource is not None:
self._openReaders.append(resource)
if len(self._openReaders) == 0 and len(self.toExternalFiles()) != 0:
raise IOError("No files were openable")
log.debug("Done opening {t} resources".format(
t=self.__class__.__name__))
def _openFile(self, location):
resource = None
if location.endswith("fastq"):
self._fastq = True
resource = FastqReader(location)
else:
try:
resource = IndexedFastaReader(location)
except IOError:
if not self._strict:
log.debug('Companion reference index (.fai) missing. '
'Use "samtools faidx <refname>" to generate '
'one.')
resource = FastaReader(location)
else:
raise
except ValueError:
log.debug("Fasta file is empty")
# this seems to work for an empty fasta, interestingly:
resource = FastaReader(location)
# we know this file is empty
self._skipCounts = True
self.metadata.totalLength = 0
self.metadata.numRecords = 0
return resource
def resourceReaders(self, refName=None):
"""A generator of fastaReader objects for the ExternalResources in this
ReferenceSet.
Yields:
An open fasta file
"""
if refName:
log.error("Specifying a contig name not yet implemented")
if not self._openReaders:
self._openFiles()
else:
for reader in self._openReaders:
if isinstance(reader, (FastaReader, FastqReader)):
reader.file = open(reader.filename, "r")
return self._openReaders
@property
@filtered
def contigs(self):
"""A generator of contigs from the fastaReader objects for the
ExternalResources in this ContigSet.
Yields:
A fasta file entry
"""
for resource in self.resourceReaders():
for contig in resource:
yield contig
def get_contig(self, contig_id):
"""Get a contig by ID"""
# TODO: have this use getitem for indexed fasta readers:
for contig in self.contigs:
if contig.id == contig_id or contig.name == contig_id:
return contig
def assertIndexed(self):
try:
self._assertIndexed(IndexedFastaReader)
except IOError:
raise IOError("Companion FASTA index (.fai) file not found or "
"malformatted! Use 'samtools faidx' to generate "
"FASTA index.")
return True
@property
def isIndexed(self):
try:
res = self._pollResources(
lambda x: isinstance(x, IndexedFastaReader))
return self._unifyResponses(res)
except ResourceMismatchError:
if not self._strict:
log.info("Not all resource are equally indexed.")
return False
else:
raise
except IndexError:
if not self._strict:
log.info("No resource readers!")
return False
else:
raise InvalidDataSetIOError("No openable resources!")
@property
def contigNames(self):
"""The names assigned to the External Resources, or contigs if no name
assigned."""
# TODO{mdsmith}{3/10/2016} Make this faster by using (optionally) the
# index file
names = []
for contig in self.contigs:
if self.noFiltering:
names.append(contig.id)
elif self._filters.testParam('id', contig.id, str):
names.append(contig.id)
return sorted(list(set(names)))
@staticmethod
def _metaTypeMapping():
return {'fasta':'PacBio.ContigFile.ContigFastaFile',
'fastq':'PacBio.ContigFile.ContigFastqFile',
'fa':'PacBio.ContigFile.ContigFastaFile',
'fas':'PacBio.ContigFile.ContigFastaFile',
'fai':'PacBio.Index.SamIndex',
'contig.index':'PacBio.Index.FastaContigIndex',
'index':'PacBio.Index.Indexer',
'sa':'PacBio.Index.SaWriterIndex',
}
def _indexRecords(self):
"""Returns index records summarizing all of the records in all of
the resources that conform to those filters addressing parameters
cached in the pbi.
"""
recArrays = []
_indexMap = []
for rrNum, rr in enumerate(self.resourceReaders()):
indices = rr.fai
if len(indices) == 0:
continue
# have to manually specify the dtype to make it work like a pbi
indices = np.rec.fromrecords(
indices,
dtype=[('id', 'O'), ('comment', 'O'), ('header', 'O'),
('length', '<i8'), ('offset', '<i8'),
('lineWidth', '<i8'), ('stride', '<i8')])
if not self._filters or self.noFiltering:
recArrays.append(indices)
_indexMap.extend([(rrNum, i) for i in
range(len(indices))])
else:
# Filtration will be necessary:
# dummy map, the id is the name in fasta space
nameMap = {name: name for name in indices.id}
passes = self._filters.filterIndexRecords(indices, nameMap, {},
readType='fasta')
newInds = indices[passes]
recArrays.append(newInds)
_indexMap.extend([(rrNum, i) for i in
np.flatnonzero(passes)])
self._indexMap = np.array(_indexMap, dtype=[('reader', 'uint64'),
('index', 'uint64')])
if len(recArrays) == 0:
recArrays = [np.array(
[],
dtype=[('id', 'O'), ('comment', 'O'), ('header', 'O'),
('length', '<i8'), ('offset', '<i8'),
('lineWidth', '<i8'), ('stride', '<i8')])]
return _stackRecArrays(recArrays)
def _filterType(self):
return 'fasta'
class ReferenceSet(ContigSet):
"""DataSet type specific to References"""
datasetType = DataSetMetaTypes.REFERENCE
def __init__(self, *files, **kwargs):
super(ReferenceSet, self).__init__(*files, **kwargs)
@property
def refNames(self):
"""The reference names assigned to the External Resources, or contigs
if no name assigned."""
return self.contigNames
@staticmethod
def _metaTypeMapping():
return {'fasta':'PacBio.ContigFile.ContigFastaFile',
'fa':'PacBio.ContigFile.ContigFastaFile',
'fas':'PacBio.ContigFile.ContigFastaFile',
'fai':'PacBio.Index.SamIndex',
'contig.index':'PacBio.Index.FastaContigIndex',
'index':'PacBio.Index.Indexer',
'sa':'PacBio.Index.SaWriterIndex',
}
class GmapReferenceSet(ReferenceSet):
"""DataSet type specific to GMAP References"""
datasetType = DataSetMetaTypes.GMAPREFERENCE
def __init__(self, *files, **kwargs):
super(GmapReferenceSet, self).__init__(*files, **kwargs)
@property
def gmap(self):
return self.externalResources[0].gmap
@gmap.setter
def gmap(self, value):
self.externalResources[0].gmap = value
@staticmethod
def _metaTypeMapping():
return {'fasta':'PacBio.ContigFile.ContigFastaFile',
'fa':'PacBio.ContigFile.ContigFastaFile',
'fas':'PacBio.ContigFile.ContigFastaFile',
'fai':'PacBio.Index.SamIndex',
'contig.index':'PacBio.Index.FastaContigIndex',
'index':'PacBio.Index.Indexer',
'sa':'PacBio.Index.SaWriterIndex',
'gmap': 'PacBio.GmapDB.GmapDBPath',
}
class BarcodeSet(ContigSet):
"""DataSet type specific to Barcodes"""
datasetType = DataSetMetaTypes.BARCODE
def __init__(self, *files, **kwargs):
super(BarcodeSet, self).__init__(*files, **kwargs)
self._metadata = BarcodeSetMetadata(self._metadata.record)
self._updateMetadata()
def addMetadata(self, newMetadata, **kwargs):
"""Add metadata specific to this subtype, while leaning on the
superclass method for generic metadata. Also enforce metadata type
correctness."""
# Validate, clean and prep input data
if newMetadata:
if isinstance(newMetadata, dict):
newMetadata = BarcodeSetMetadata(newMetadata)
elif isinstance(newMetadata, BarcodeSetMetadata) or (
type(newMetadata).__name__ == 'DataSetMetadata'):
newMetadata = BarcodeSetMetadata(newMetadata.record)
else:
raise TypeError("Cannot extend BarcodeSetMetadata with "
"{t}".format(t=type(newMetadata).__name__))
# Pull generic values, kwargs, general treatment in super
super(BarcodeSet, self).addMetadata(newMetadata, **kwargs)
# Pull subtype specific values where important
# -> No type specific merging necessary, for now
@property
def metadata(self):
if not isinstance(self._metadata, BarcodeSetMetadata):
self._metadata = BarcodeSetMetadata(self._metadata)
return self._metadata
@metadata.setter
def metadata(self, value):
if not isinstance(value, BarcodeSetMetadata):
value = BarcodeSetMetadata(value)
self._metadata = value
def _updateMetadata(self):
# update barcode specific metadata:
if not self._metadata.barcodeConstruction:
self._metadata.barcodeConstruction = ''
@staticmethod
def _metaTypeMapping():
return {'fasta':'PacBio.BarcodeFile.BarcodeFastaFile',
'fai':'PacBio.Index.SamIndex',
'sa':'PacBio.Index.SaWriterIndex',
}
|