import os
import sys
from math import modf
from errno import EOPNOTSUPP
try:
    from errno import ENOTSUP
except ImportError:
    # some Pythons don't have errno.ENOTSUP
    ENOTSUP = 0

from rpython.rlib import rposix, rposix_stat, rfile
from rpython.rlib import objectmodel, rurandom
from rpython.rlib.objectmodel import specialize, not_rpython
from rpython.rlib.rarithmetic import r_longlong, intmask, r_uint, r_int
from rpython.rlib.unroll import unrolling_iterable
from rpython.rtyper.lltypesystem import lltype
from rpython.tool.sourcetools import func_with_new_name

from pypy.interpreter.gateway import unwrap_spec, WrappedDefault, Unwrapper
from pypy.interpreter.error import (
    OperationError, oefmt, wrap_oserror, wrap_oserror2, strerror as _strerror,
    exception_from_saved_errno)
from pypy.interpreter.executioncontext import ExecutionContext

_WIN32 = sys.platform == 'win32'
if _WIN32:
    from rpython.rlib import rwin32

c_int = "c_int"

# CPython 2.7 semantics used to be too messy, differing on 32-bit vs
# 64-bit, but this was cleaned up in recent 2.7.x.  Now, any function
# taking a uid_t or gid_t accepts numbers in range(-1, 2**32) as an
# r_uint, with -1 being equivalent to 2**32-1.  Any function that
# returns a uid_t or gid_t returns either an int or a long, depending
# on whether it fits or not, but always positive.
c_uid_t = 'c_uid_t'
c_gid_t = 'c_uid_t'

def wrap_uid(space, uid):
    if uid <= r_uint(sys.maxint):
        return space.newint(intmask(uid))
    else:
        return space.newint(uid)     # an unsigned number
wrap_gid = wrap_uid

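# Illustration only, not part of this module: a minimal plain-Python sketch
# of the uid/gid convention described in the comment above, assuming a
# 32-bit uid_t/gid_t.  The names `ID_MAX` and `normalize_id` are
# hypothetical and do not exist in PyPy; the real conversion is done by the
# 'c_uid_t' unwrapper, which is not shown here.

```python
# Hypothetical helper, assuming uid_t/gid_t is an unsigned 32-bit integer.
ID_MAX = 2**32 - 1


def normalize_id(value):
    """Map an integer in range(-1, 2**32) to its unsigned uid/gid value."""
    if value == -1:
        # -1 is accepted as an alias for the largest unsigned value
        return ID_MAX
    if 0 <= value <= ID_MAX:
        return value
    raise OverflowError("uid/gid out of range: %r" % (value,))
```

# Under these assumptions, normalize_id(-1) and normalize_id(2**32 - 1)
# denote the same id, and anything outside range(-1, 2**32) is rejected.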
class FileEncoder(object):
    is_unicode = True

    def __init__(self, space, w_obj):
        self.space = space
        self.w_obj = w_obj

    def as_bytes(self):
        return self.space.fsencode_w(self.w_obj)

    def as_unicode(self):
        return self.space.unicode0_w(self.w_obj)

class FileDecoder(object):
    is_unicode = False

    def __init__(self, space, w_obj):
        self.space = space
        self.w_obj = w_obj

    def as_bytes(self):
        return self.space.bytesbuf0_w(self.w_obj)

    def as_unicode(self):
        return self.space.fsdecode_w(self.w_obj)

@specialize.memo()
def make_dispatch_function(func, tag, allow_fd_fn=None):
    def dispatch(space, w_fname, *args):
        if allow_fd_fn is not None:
            try:
                fd = space.c_int_w(w_fname)
            except OperationError:
                pass
            else:
                return allow_fd_fn(fd, *args)
        if space.isinstance_w(w_fname, space.w_unicode):
            fname = FileEncoder(space, w_fname)
            return func(fname, *args)
        else:
            fname = space.bytesbuf0_w(w_fname)
            return func(fname, *args)
    return dispatch

@specialize.arg(0, 1)
def dispatch_filename(func, tag=0, allow_fd_fn=None):
    return make_dispatch_function(func, tag, allow_fd_fn)

@specialize.memo()
def dispatch_filename_2(func):
    def dispatch(space, w_fname1, w_fname2, *args):
        if space.isinstance_w(w_fname1, space.w_unicode):
            fname1 = FileEncoder(space, w_fname1)
            if space.isinstance_w(w_fname2, space.w_unicode):
                fname2 = FileEncoder(space, w_fname2)
                return func(fname1, fname2, *args)
            else:
                fname2 = FileDecoder(space, w_fname2)
                return func(fname1, fname2, *args)
        else:
            fname1 = FileDecoder(space, w_fname1)
            if space.isinstance_w(w_fname2, space.w_unicode):
                fname2 = FileEncoder(space, w_fname2)
                return func(fname1, fname2, *args)
            else:
                fname2 = FileDecoder(space, w_fname2)
                return func(fname1, fname2, *args)
    return dispatch

@specialize.arg(0)
def call_rposix(func, path, *args):
    """Call a function that takes a filesystem path as its first argument"""
    if path.as_unicode is not None:
        return func(path.as_unicode, *args)
    else:
        path_b = path.as_bytes
        assert path_b is not None
        return func(path_b, *args)


class Path(object):
    _immutable_fields_ = ['as_fd', 'as_bytes', 'as_unicode', 'w_path']

    def __init__(self, fd, bytes, unicode, w_path):
        self.as_fd = fd
        self.as_bytes = bytes
        self.as_unicode = unicode
        self.w_path = w_path

@specialize.arg(2)
def _unwrap_path(space, w_value, allow_fd=True):
    if space.is_none(w_value):
        raise oefmt(space.w_TypeError,
                    "can't specify None for path argument")
    if _WIN32:
        try:
            path_u = space.unicode0_w(w_value)
            return Path(-1, None, path_u, w_value)
        except OperationError:
            pass
    try:
        path_b = space.fsencode_w(w_value)
        return Path(-1, path_b, None, w_value)
    except OperationError as e:
        if not e.match(space, space.w_TypeError):
            raise
    if allow_fd:
        fd = unwrap_fd(space, w_value, "string, bytes or integer")
        return Path(fd, None, None, w_value)
    raise oefmt(space.w_TypeError,
                "illegal type for path parameter (expected "
                "string or bytes, got %T)", w_value)

class _PathOrFd(Unwrapper):
    def unwrap(self, space, w_value):
        return _unwrap_path(space, w_value, allow_fd=True)

class _JustPath(Unwrapper):
    def unwrap(self, space, w_value):
        return _unwrap_path(space, w_value, allow_fd=False)

def path_or_fd(allow_fd=True):
    return _PathOrFd if allow_fd else _JustPath

_HAVE_AT_FDCWD = getattr(rposix, 'AT_FDCWD', None) is not None
DEFAULT_DIR_FD = rposix.AT_FDCWD if _HAVE_AT_FDCWD else -100
DIR_FD_AVAILABLE = False

@specialize.arg(2)
def unwrap_fd(space, w_value, allowed_types='integer'):
    try:
        result = space.c_int_w(w_value)
    except OperationError as e:
        if not e.match(space, space.w_OverflowError):
            raise oefmt(space.w_TypeError,
                "argument should be %s, not %T", allowed_types, w_value)
        else:
            raise
    if result == -1:
        # -1 is used as sentinel value for not a fd
        raise oefmt(space.w_ValueError, "invalid file descriptor: -1")
    return result

def _unwrap_dirfd(space, w_value):
    if space.is_none(w_value):
        return DEFAULT_DIR_FD
    else:
        return unwrap_fd(space, w_value)

class _DirFD(Unwrapper):
    def unwrap(self, space, w_value):
        return _unwrap_dirfd(space, w_value)

class _DirFD_Unavailable(Unwrapper):
    def unwrap(self, space, w_value):
        dir_fd = _unwrap_dirfd(space, w_value)
        if dir_fd == DEFAULT_DIR_FD:
            return dir_fd
        raise oefmt(space.w_NotImplementedError,
                    "dir_fd unavailable on this platform")

def DirFD(available=False):
    return _DirFD if available else _DirFD_Unavailable

@specialize.arg(1, 2)
def argument_unavailable(space, funcname, arg):
    return oefmt(
        space.w_NotImplementedError,
        "%s: %s unavailable on this platform", funcname, arg)

_open_inhcache = rposix.SetNonInheritableCache()

@unwrap_spec(flags=c_int, mode=c_int, dir_fd=DirFD(rposix.HAVE_OPENAT))
def open(space, w_path, flags, mode=0777,
         __kwonly__=None, dir_fd=DEFAULT_DIR_FD):
    """open(path, flags, mode=0o777, *, dir_fd=None)
Open a file for low level IO. Returns a file handle (integer).
If dir_fd is not None, it should be a file descriptor open to a directory,
and path should be relative; path will then be relative to that directory.
dir_fd may not be implemented on your platform.
If it is unavailable, using it will raise a NotImplementedError."""
    if rposix.O_CLOEXEC is not None:
        flags |= rposix.O_CLOEXEC
    while True:
        try:
            if rposix.HAVE_OPENAT and dir_fd != DEFAULT_DIR_FD:
                path = space.fsencode_w(w_path)
                fd = rposix.openat(path, flags, mode, dir_fd)
            else:
                fd = dispatch_filename(rposix.open)(space, w_path, flags, mode)
            break
        except OSError as e:
            wrap_oserror2(space, e, w_path, eintr_retry=True)
    try:
        _open_inhcache.set_non_inheritable(fd)
    except OSError as e:
        rposix.c_close(fd)
        raise wrap_oserror2(space, e, w_path, eintr_retry=False)
    return space.newint(fd)

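# Illustration only, not part of this module: a minimal app-level sketch of
# what the dir_fd argument documented above looks like to a caller, assuming
# Python 3 on a platform where os.open supports dir_fd.  The helper name
# `read_relative` and the file name "hello.txt" are hypothetical.

```python
import os
import tempfile


def read_relative(dir_path, name):
    """Read up to 1024 bytes of `name`, resolved relative to `dir_path`."""
    if os.open not in getattr(os, "supports_dir_fd", ()):
        raise NotImplementedError("dir_fd unavailable on this platform")
    dir_fd = os.open(dir_path, os.O_RDONLY)    # descriptor open to a directory
    try:
        # `name` is relative, so it is looked up inside the directory dir_fd
        fd = os.open(name, os.O_RDONLY, dir_fd=dir_fd)
        try:
            return os.read(fd, 1024)
        finally:
            os.close(fd)
    finally:
        os.close(dir_fd)


# Usage: create a small file in a temporary directory to read back.
tmp = tempfile.mkdtemp()
fd = os.open(os.path.join(tmp, "hello.txt"), os.O_WRONLY | os.O_CREAT, 0o600)
os.write(fd, b"hello")
os.close(fd)
```

# Calling read_relative(tmp, "hello.txt") then reads the file through the
# directory descriptor rather than through an absolute path.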
@unwrap_spec(fd=c_int, position=r_longlong, how=c_int)
def lseek(space, fd, position, how):
    """Set the current position of a file descriptor. Return the new position.
If how == 0, 'position' is relative to the start of the file; if how == 1, to
the current position; if how == 2, to the end."""
    try:
        pos = os.lseek(fd, position, how)
    except OSError as e:
        raise wrap_oserror(space, e, eintr_retry=False)
    else:
        return space.newint(pos)

@unwrap_spec(fd=c_int)
def isatty(space, fd):
    """Return True if 'fd' is an open file descriptor connected to the
slave end of a terminal."""
    try:
        res = os.isatty(fd)
    except OSError as e:
        raise wrap_oserror(space, e, eintr_retry=False)
    else:
        return space.newbool(res)

@unwrap_spec(fd=c_int, length=int)
def read(space, fd, length):
    """Read data from a file descriptor."""
    while True:
        try:
            s = os.read(fd, length)
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)
        else:
            return space.newbytes(s)

@unwrap_spec(fd=c_int)
def write(space, fd, w_data):
    """Write a string to a file descriptor. Return the number of bytes
actually written, which may be smaller than len(data)."""
    data = space.charbuf_w(w_data)
    while True:
        try:
            res = os.write(fd, data)
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)
        else:
            return space.newint(res)

@unwrap_spec(fd=c_int)
def close(space, fd):
    """Close a file descriptor (for low level IO)."""
    # PEP 475 note: os.close() must not retry upon EINTR.  Like in
    # previous versions of Python it raises OSError in this case.
    # The text of PEP 475 seems to suggest that EINTR is eaten and
    # hidden from app-level, but it is not the case in CPython 3.5.2.
    try:
        os.close(fd)
    except OSError as e:
        raise wrap_oserror(space, e, eintr_retry=False)

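# Illustration only, not part of this module: a simplified plain-Python
# sketch of the retry-on-EINTR pattern that the `while True:` loops in this
# file follow (PEP 475), in contrast to close() above, which does not retry.
# The names `retry_on_eintr` and `flaky_read` are hypothetical; the module's
# own wrap_oserror helper is not shown here and may do more than this.

```python
import errno


def retry_on_eintr(func, *args):
    """Call func(*args), retrying for as long as it fails with EINTR."""
    while True:
        try:
            return func(*args)
        except OSError as e:
            if e.errno != errno.EINTR:
                raise
            # interrupted by a signal: loop and call func again


# Simulated system call that is interrupted twice before succeeding.
attempts = []


def flaky_read():
    attempts.append(1)
    if len(attempts) < 3:
        raise OSError(errno.EINTR, "Interrupted system call")
    return b"data"


result = retry_on_eintr(flaky_read)
```

# Any error other than EINTR propagates on the first failure, which is the
# behaviour a non-retrying call such as close() shows for every error.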
@unwrap_spec(fd_low=c_int, fd_high=c_int)
def closerange(fd_low, fd_high):
    """Closes all file descriptors in [fd_low, fd_high), ignoring errors."""
    rposix.closerange(fd_low, fd_high)

@unwrap_spec(fd=c_int, length=r_longlong)
def ftruncate(space, fd, length):
    """Truncate a file (by file descriptor) to a specified length."""
    while True:
        try:
            os.ftruncate(fd, length)
            break
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)

def truncate(space, w_path, w_length):
    """Truncate a file to a specified length."""
    allocated_fd = False
    fd = -1
    try:
        if space.isinstance_w(w_path, space.w_int):
            w_fd = w_path
        else:
            w_fd = open(space, w_path, os.O_WRONLY)
            allocated_fd = True

        fd = space.c_filedescriptor_w(w_fd)
        length = space.int_w(w_length)
        return ftruncate(space, fd, length)

    finally:
        if allocated_fd and fd != -1:
            close(space, fd)

def fsync(space, w_fd):
    """Force write of the file with the given file descriptor to disk."""
    fd = space.c_filedescriptor_w(w_fd)
    while True:
        try:
            os.fsync(fd)
            break
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)

def fdatasync(space, w_fd):
    """Force write of the file with the given file descriptor to disk.
Does not force update of metadata."""
    fd = space.c_filedescriptor_w(w_fd)
    while True:
        try:
            os.fdatasync(fd)
            break
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)

def sync(space):
    """Force write of everything to disk."""
    rposix.sync()

def fchdir(space, w_fd):
    """Change to the directory of the given file descriptor. The descriptor
must be opened on a directory, not a file."""
    fd = space.c_filedescriptor_w(w_fd)
    while True:
        try:
            os.fchdir(fd)
            break
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)

@unwrap_spec(fd=c_int, length=int, offset=r_longlong)
def pread(space, fd, length, offset):
    """Read data from a file descriptor at a given offset.
    """
    while True:
        try:
            s = rposix.pread(fd, length, offset)
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)
        else:
            return space.newbytes(s)

@unwrap_spec(fd=c_int, offset=r_longlong)
def pwrite(space, fd, w_data, offset):
    """Write a string to a file descriptor at a given offset.
    """
    data = space.charbuf_w(w_data)
    while True:
        try:
            res = rposix.pwrite(fd, data, offset)
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)
        else:
            return space.newint(res)

@unwrap_spec(fd=c_int, length=r_longlong, offset=r_longlong)
def posix_fallocate(space, fd, offset, length):
    """Allocate file space.
    """
    while True:
        try:
            s = rposix.posix_fallocate(fd, offset, length)
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)
        else:
            return space.newint(s)

@unwrap_spec(fd=c_int, offset=r_longlong, length=r_longlong, advice=int)
def posix_fadvise(space, fd, offset, length, advice):
    """Predeclare an access pattern for file data.
    """
    while True:
        try:
            rposix.posix_fadvise(fd, offset, length, advice)
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)
        else:
            return

# ____________________________________________________________

STAT_FIELDS = unrolling_iterable(enumerate(rposix_stat.STAT_FIELDS))

STATVFS_FIELDS = unrolling_iterable(enumerate(rposix_stat.STATVFS_FIELDS))

def build_stat_result(space, st):
    FIELDS = STAT_FIELDS    # also when not translating at all
    lst = [None] * rposix_stat.N_INDEXABLE_FIELDS
    w_keywords = space.newdict()
    stat_float_times = space.fromcache(StatState).stat_float_times
    for i, (name, TYPE) in FIELDS:
        if i < rposix_stat.N_INDEXABLE_FIELDS:
            # get the first 10 items by indexing; this gives us
            # 'st_Xtime' as an integer, too
            w_value = space.newint(st[i])
            lst[i] = w_value
        else:
            try:
                value = getattr(st, name)
            except AttributeError:
                # untranslated, there is no nsec_Xtime attribute
                assert name.startswith('nsec_')
                value = rposix_stat.get_stat_ns_as_bigint(st, name[5:])
                value = value.tolong() % 1000000000
            w_value = space.newint(value)
            space.setitem(w_keywords, space.newtext(name), w_value)

    # Note: 'w_keywords' contains the three attributes 'nsec_Xtime'.
    # We have an app-level property in app_posix.stat_result to
    # compute the full 'st_Xtime_ns' value.

    # non-rounded values for name-based access
    if stat_float_times:
        space.setitem(w_keywords,
                      space.newtext('st_atime'), space.newfloat(st.st_atime))
        space.setitem(w_keywords,
                      space.newtext('st_mtime'), space.newfloat(st.st_mtime))
        space.setitem(w_keywords,
                      space.newtext('st_ctime'), space.newfloat(st.st_ctime))
    #else:
    #   filled by the __init__ method

    w_tuple = space.newtuple(lst)
    w_stat_result = space.getattr(space.getbuiltinmodule(os.name),
                                  space.newtext('stat_result'))
    return space.call_function(w_stat_result, w_tuple, w_keywords)


def build_statvfs_result(space, st):
    vals_w = [None] * len(rposix_stat.STATVFS_FIELDS)
    for i, (name, _) in STATVFS_FIELDS:
        vals_w[i] = space.newint(getattr(st, name))
    w_tuple = space.newtuple(vals_w)
    w_statvfs_result = space.getattr(
        space.getbuiltinmodule(os.name), space.newtext('statvfs_result'))
    return space.call_function(w_statvfs_result, w_tuple)

@unwrap_spec(fd=c_int)
def fstat(space, fd):
    """Perform a stat system call on the file referenced to by an open
file descriptor."""
    while True:
        try:
            st = rposix_stat.fstat(fd)
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)
        else:
            return build_stat_result(space, st)

@unwrap_spec(
    path=path_or_fd(allow_fd=True),
    dir_fd=DirFD(rposix.HAVE_FSTATAT),
    follow_symlinks=bool)
def stat(space, path, __kwonly__, dir_fd=DEFAULT_DIR_FD, follow_symlinks=True):
    """stat(path, *, dir_fd=None, follow_symlinks=True) -> stat result
Perform a stat system call on the given path.
path may be specified as either a string or as an open file descriptor.
If dir_fd is not None, it should be a file descriptor open to a directory,
and path should be relative; path will then be relative to that directory.
dir_fd may not be supported on your platform; if it is unavailable, using
it will raise a NotImplementedError.
If follow_symlinks is False, and the last element of the path is a symbolic
link, stat will examine the symbolic link itself instead of the file the
link points to.
It is an error to use dir_fd or follow_symlinks when specifying path as
an open file descriptor."""
    return do_stat(space, "stat", path, dir_fd, follow_symlinks)

@specialize.arg(1)
def do_stat(space, funcname, path, dir_fd, follow_symlinks):
    """Common implementation for stat() and lstat()"""
    try:
        if path.as_fd != -1:
            if dir_fd != DEFAULT_DIR_FD:
                raise oefmt(space.w_ValueError,
                    "%s: can't specify both dir_fd and fd", funcname)
            if not follow_symlinks:
                raise oefmt(space.w_ValueError,
                    "%s: cannot use fd and follow_symlinks together", funcname)
            st = rposix_stat.fstat(path.as_fd)
        elif follow_symlinks and dir_fd == DEFAULT_DIR_FD:
            st = call_rposix(rposix_stat.stat3, path)
        elif not follow_symlinks and dir_fd == DEFAULT_DIR_FD:
            st = call_rposix(rposix_stat.lstat3, path)
        elif rposix.HAVE_FSTATAT:
            st = call_rposix(rposix_stat.fstatat, path, dir_fd, follow_symlinks)
        else:
            raise oefmt(space.w_NotImplementedError,
                "%s: unsupported argument combination", funcname)
    except OSError as e:
        raise wrap_oserror2(space, e, path.w_path, eintr_retry=False)
    else:
        return build_stat_result(space, st)

@unwrap_spec(
    path=path_or_fd(allow_fd=False),
    dir_fd=DirFD(rposix.HAVE_FSTATAT))
def lstat(space, path, __kwonly__, dir_fd=DEFAULT_DIR_FD):
    """lstat(path, *, dir_fd=None) -> stat result
Like stat(), but do not follow symbolic links.
Equivalent to stat(path, follow_symlinks=False)."""
    return do_stat(space, "lstat", path, dir_fd, False)

class StatState(object):
    def __init__(self, space):
        self.stat_float_times = True

@unwrap_spec(newval=int)
def stat_float_times(space, newval=-1):
    """stat_float_times([newval]) -> oldval
Determine whether os.[lf]stat represents time stamps as float objects.
If newval is True, future calls to stat() return floats, if it is False,
future calls return ints.
If newval is omitted, return the current setting.
    """
    state = space.fromcache(StatState)
    if newval == -1:
        return space.newbool(state.stat_float_times)
    else:
        state.stat_float_times = (newval != 0)


@unwrap_spec(fd=c_int)
def fstatvfs(space, fd):
    while True:
        try:
            st = rposix_stat.fstatvfs(fd)
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)
        else:
            return build_statvfs_result(space, st)


def statvfs(space, w_path):
    """statvfs(path)
Perform a statvfs system call on the given path.
path may always be specified as a string.
On some platforms, path may also be specified as an open file descriptor.
If this functionality is unavailable, using it raises an exception."""
    try:
        st = dispatch_filename(
            rposix_stat.statvfs,
            allow_fd_fn=rposix_stat.fstatvfs)(space, w_path)
    except OSError as e:
        raise wrap_oserror2(space, e, w_path, eintr_retry=False)
    else:
        return build_statvfs_result(space, st)

@unwrap_spec(fd=c_int)
def dup(space, fd):
    """Create a copy of the file descriptor. Return the new file
descriptor."""
    try:
        newfd = rposix.dup(fd, inheritable=False)
    except OSError as e:
        raise wrap_oserror(space, e, eintr_retry=False)
    else:
        return space.newint(newfd)

@unwrap_spec(fd=c_int, fd2=c_int, inheritable=bool)
def dup2(space, fd, fd2, inheritable=1):
    """Duplicate a file descriptor."""
    # like os.close(), this can still raise EINTR to app-level in
    # CPython 3.5.2
    try:
        rposix.dup2(fd, fd2, inheritable)
    except OSError as e:
        raise wrap_oserror(space, e, eintr_retry=False)

@unwrap_spec(mode=c_int,
    dir_fd=DirFD(rposix.HAVE_FACCESSAT), effective_ids=bool,
    follow_symlinks=bool)
def access(space, w_path, mode, __kwonly__,
        dir_fd=DEFAULT_DIR_FD, effective_ids=False, follow_symlinks=True):
    """\
access(path, mode, *, dir_fd=None, effective_ids=False, follow_symlinks=True)
Use the real uid/gid to test for access to a path. Returns True if granted,
False otherwise.
If dir_fd is not None, it should be a file descriptor open to a directory,
and path should be relative; path will then be relative to that directory.
If effective_ids is True, access will use the effective uid/gid instead of
the real uid/gid.
If follow_symlinks is False, and the last element of the path is a symbolic
link, access will examine the symbolic link itself instead of the file the
link points to.
dir_fd, effective_ids, and follow_symlinks may not be implemented
on your platform. If they are unavailable, using them will raise a
NotImplementedError.
Note that most operations will use the effective uid/gid, therefore this
routine can be used in a suid/sgid environment to test if the invoking user
has the specified access to the path.
The mode argument can be F_OK to test existence, or the inclusive-OR
of R_OK, W_OK, and X_OK."""
    if not rposix.HAVE_FACCESSAT:
        if not follow_symlinks:
            raise argument_unavailable(space, "access", "follow_symlinks")
        if effective_ids:
            raise argument_unavailable(space, "access", "effective_ids")

    try:
        if (rposix.HAVE_FACCESSAT and
            (dir_fd != DEFAULT_DIR_FD or not follow_symlinks or
             effective_ids)):
            path = space.fsencode_w(w_path)
            ok = rposix.faccessat(path, mode,
                dir_fd, effective_ids, follow_symlinks)
        else:
            ok = dispatch_filename(rposix.access)(space, w_path, mode)
    except OSError as e:
        raise wrap_oserror2(space, e, w_path, eintr_retry=False)
    else:
        return space.newbool(ok)

def times(space):
    """
    times() -> (utime, stime, cutime, cstime, elapsed_time)
    Return a tuple of floating point numbers indicating process times.
    """
    try:
        times = os.times()
    except OSError as e:
        raise wrap_oserror(space, e, eintr_retry=False)
    else:
        w_keywords = space.newdict()
        w_tuple = space.newtuple([space.newfloat(times[0]),
                                  space.newfloat(times[1]),
                                  space.newfloat(times[2]),
                                  space.newfloat(times[3]),
                                  space.newfloat(times[4])])

        w_times_result = space.getattr(space.getbuiltinmodule(os.name),
                                       space.newtext('times_result'))
        return space.call_function(w_times_result, w_tuple, w_keywords)


@unwrap_spec(command='fsencode')
def system(space, command):
    """Execute the command (a string) in a subshell."""
    try:
        rc = os.system(command)
    except OSError as e:
        raise wrap_oserror(space, e, eintr_retry=False)
    else:
        return space.newint(rc)

@unwrap_spec(dir_fd=DirFD(rposix.HAVE_UNLINKAT))
def unlink(space, w_path, __kwonly__, dir_fd=DEFAULT_DIR_FD):
"""unlink(path, *, dir_fd=None)
Remove a file (same as remove()).
If dir_fd is not None, it should be a file descriptor open to a directory,
and path should be relative; path will then be relative to that directory.
dir_fd may not be implemented on your platform.
If it is unavailable, using it will raise a NotImplementedError."""
try:
if rposix.HAVE_UNLINKAT and dir_fd != DEFAULT_DIR_FD:
path = space.fsencode_w(w_path)
rposix.unlinkat(path, dir_fd, removedir=False)
else:
dispatch_filename(rposix.unlink)(space, w_path)
except OSError as e:
raise wrap_oserror2(space, e, w_path, eintr_retry=False)
@unwrap_spec(dir_fd=DirFD(rposix.HAVE_UNLINKAT))
def remove(space, w_path, __kwonly__, dir_fd=DEFAULT_DIR_FD):
"""remove(path, *, dir_fd=None)
Remove a file (same as unlink()).
If dir_fd is not None, it should be a file descriptor open to a directory,
and path should be relative; path will then be relative to that directory.
dir_fd may not be implemented on your platform.
If it is unavailable, using it will raise a NotImplementedError."""
try:
if rposix.HAVE_UNLINKAT and dir_fd != DEFAULT_DIR_FD:
path = space.fsencode_w(w_path)
rposix.unlinkat(path, dir_fd, removedir=False)
else:
dispatch_filename(rposix.unlink)(space, w_path)
except OSError as e:
raise wrap_oserror2(space, e, w_path, eintr_retry=False)
def _getfullpathname(space, w_path):
"""helper for ntpath.abspath """
try:
if space.isinstance_w(w_path, space.w_unicode):
path = FileEncoder(space, w_path)
fullpath = rposix.getfullpathname(path)
w_fullpath = space.newunicode(fullpath)
else:
path = space.bytesbuf0_w(w_path)
fullpath = rposix.getfullpathname(path)
w_fullpath = space.newbytes(fullpath)
except OSError as e:
raise wrap_oserror2(space, e, w_path, eintr_retry=False)
else:
return w_fullpath
def getcwdb(space):
"""Return the current working directory."""
try:
cur = os.getcwd()
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
else:
return space.newbytes(cur)
if _WIN32:
def getcwd(space):
"""Return the current working directory as a string."""
try:
cur = os.getcwdu()
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
else:
return space.newunicode(cur)
else:
def getcwd(space):
"""Return the current working directory as a string."""
return space.fsdecode(getcwdb(space))
def chdir(space, w_path):
"""Change the current working directory to the specified path."""
try:
if rposix.HAVE_FCHDIR:
dispatch_filename(rposix.chdir,
allow_fd_fn=os.fchdir)(space, w_path)
else:
dispatch_filename(rposix.chdir)(space, w_path)
except OSError as e:
raise wrap_oserror2(space, e, w_path, eintr_retry=False)
@unwrap_spec(mode=c_int, dir_fd=DirFD(rposix.HAVE_MKDIRAT))
def mkdir(space, w_path, mode=0o777, __kwonly__=None, dir_fd=DEFAULT_DIR_FD):
"""mkdir(path, mode=0o777, *, dir_fd=None)
Create a directory.
If dir_fd is not None, it should be a file descriptor open to a directory,
and path should be relative; path will then be relative to that directory.
dir_fd may not be implemented on your platform.
If it is unavailable, using it will raise a NotImplementedError.
The mode argument is ignored on Windows."""
try:
if rposix.HAVE_MKDIRAT and dir_fd != DEFAULT_DIR_FD:
path = space.fsencode_w(w_path)
rposix.mkdirat(path, mode, dir_fd)
else:
dispatch_filename(rposix.mkdir)(space, w_path, mode)
except OSError as e:
raise wrap_oserror2(space, e, w_path, eintr_retry=False)
@unwrap_spec(dir_fd=DirFD(rposix.HAVE_UNLINKAT))
def rmdir(space, w_path, __kwonly__, dir_fd=DEFAULT_DIR_FD):
"""rmdir(path, *, dir_fd=None)
Remove a directory.
If dir_fd is not None, it should be a file descriptor open to a directory,
and path should be relative; path will then be relative to that directory.
dir_fd may not be implemented on your platform.
If it is unavailable, using it will raise a NotImplementedError."""
try:
if rposix.HAVE_UNLINKAT and dir_fd != DEFAULT_DIR_FD:
path = space.fsencode_w(w_path)
rposix.unlinkat(path, dir_fd, removedir=True)
else:
dispatch_filename(rposix.rmdir)(space, w_path)
except OSError as e:
raise wrap_oserror2(space, e, w_path, eintr_retry=False)
@unwrap_spec(code=c_int)
def strerror(space, code):
"""Translate an error code to a message string."""
try:
return space.newunicode(_strerror(code))
except ValueError:
raise oefmt(space.w_ValueError, "strerror() argument out of range")
def getlogin(space):
"""Return the currently logged in user."""
try:
cur = os.getlogin()
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newfilename(cur)
# ____________________________________________________________
def getstatfields(space):
# for app_posix.py: export the list of 'st_xxx' names that we know
# about at RPython level
return space.newlist([space.newtext(name) for _, (name, _) in STAT_FIELDS])
class State:
def __init__(self, space):
self.space = space
self.w_environ = space.newdict()
self.random_context = rurandom.init_urandom()
def startup(self, space):
space.call_method(self.w_environ, 'clear')
_convertenviron(space, self.w_environ)
def _freeze_(self):
# don't capture the environment in the translated pypy
self.space.call_method(self.w_environ, 'clear')
# also reset random_context to a fresh new context (empty so far,
# to be filled at run-time by rurandom.urandom())
self.random_context = rurandom.init_urandom()
return True
def get(space):
return space.fromcache(State)
if _WIN32:
def _convertenviron(space, w_env):
# _wenviron must be initialized in this way if the program is
# started through main() instead of wmain()
rwin32._wgetenv(u"")
for key, value in rwin32._wenviron_items():
space.setitem(w_env, space.newunicode(key), space.newunicode(value))
@unwrap_spec(name=unicode, value=unicode)
def putenv(space, name, value):
"""Change or add an environment variable."""
# len includes space for '=' and a trailing NUL
if len(name) + len(value) + 2 > rwin32._MAX_ENV:
raise oefmt(space.w_ValueError,
"the environment variable is longer than %d "
"characters", rwin32._MAX_ENV)
try:
rwin32._wputenv(name, value)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
else:
def _convertenviron(space, w_env):
for key, value in os.environ.items():
space.setitem(w_env, space.newbytes(key), space.newbytes(value))
def putenv(space, w_name, w_value):
"""Change or add an environment variable."""
try:
dispatch_filename_2(rposix.putenv)(space, w_name, w_value)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
def unsetenv(space, w_name):
"""Delete an environment variable."""
try:
dispatch_filename(rposix.unsetenv)(space, w_name)
except KeyError:
pass
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
def listdir(space, w_path=None):
"""listdir(path='.') -> list_of_filenames
Return a list containing the names of the files in the directory.
The list is in arbitrary order. It does not include the special
entries '.' and '..' even if they are present in the directory.
path can be specified as either str or bytes. If path is bytes,
the filenames returned will also be bytes; in all other circumstances
the filenames returned will be str.
On some platforms, path may also be specified as an open file descriptor;
the file descriptor must refer to a directory.
If this functionality is unavailable, using it raises NotImplementedError."""
if space.is_none(w_path):
w_path = space.newunicode(u".")
if space.isinstance_w(w_path, space.w_bytes):
# XXX CPython doesn't follow this path either if w_path is,
# for example, a memoryview or another buffer type
dirname = space.bytes0_w(w_path)
try:
result = rposix.listdir(dirname)
except OSError as e:
raise wrap_oserror2(space, e, w_path, eintr_retry=False)
return space.newlist_bytes(result)
try:
path = space.fsencode_w(w_path)
except OperationError as operr:
if operr.async(space):
raise
if not rposix.HAVE_FDOPENDIR:
raise oefmt(space.w_TypeError,
"listdir: illegal type for path argument")
fd = unwrap_fd(space, w_path, "string, bytes or integer")
try:
result = rposix.fdlistdir(os.dup(fd))
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
else:
dirname = FileEncoder(space, w_path)
try:
result = rposix.listdir(dirname)
except OSError as e:
raise wrap_oserror2(space, e, w_path, eintr_retry=False)
len_result = len(result)
result_w = [None] * len_result
for i in range(len_result):
if _WIN32:
result_w[i] = space.newunicode(result[i])
else:
result_w[i] = space.newfilename(result[i])
return space.newlist(result_w)
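# Illustrative application-level sketch (not part of this module) of the rule
# stated in the listdir() docstring: a bytes path yields bytes names, any other
# path yields str names. It assumes a Python 3 interpreter; the file name
# "a.txt" is arbitrary.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as directory:
    # Create a single empty file so the listing has exactly one entry.
    with open(os.path.join(directory, "a.txt"), "w"):
        pass

    names_as_str = os.listdir(directory)                 # str path in, str names out
    names_as_bytes = os.listdir(os.fsencode(directory))  # bytes path in, bytes names out
```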
@unwrap_spec(fd=c_int)
def get_inheritable(space, fd):
try:
return space.newbool(rposix.get_inheritable(fd))
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
@unwrap_spec(fd=c_int, inheritable=int)
def set_inheritable(space, fd, inheritable):
try:
rposix.set_inheritable(fd, inheritable)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
_pipe_inhcache = rposix.SetNonInheritableCache()
def pipe(space):
"Create a pipe. Returns (read_end, write_end)."
try:
fd1, fd2 = rposix.pipe(rposix.O_CLOEXEC or 0)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
try:
_pipe_inhcache.set_non_inheritable(fd1)
_pipe_inhcache.set_non_inheritable(fd2)
except OSError as e:
rposix.c_close(fd2)
rposix.c_close(fd1)
raise wrap_oserror(space, e, eintr_retry=False)
return space.newtuple([space.newint(fd1), space.newint(fd2)])
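# Illustrative application-level sketch (not part of this module): pipe() marks
# both ends non-inheritable, and get_inheritable()/set_inheritable() let the
# caller inspect or change that flag. It assumes Python 3.4 or later, where
# these functions exist in the os module.

```python
import os

read_end, write_end = os.pipe()
try:
    # Both descriptors start out non-inheritable.
    initial_flags = (os.get_inheritable(read_end), os.get_inheritable(write_end))

    # Opt in explicitly when a child process should receive the descriptor.
    os.set_inheritable(write_end, True)
    write_flag_after = os.get_inheritable(write_end)
finally:
    os.close(read_end)
    os.close(write_end)
```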
@unwrap_spec(flags=c_int)
def pipe2(space, flags):
try:
fd1, fd2 = rposix.pipe2(flags)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newtuple([space.newint(fd1), space.newint(fd2)])
@unwrap_spec(mode=c_int, dir_fd=DirFD(rposix.HAVE_FCHMODAT),
follow_symlinks=bool)
def chmod(space, w_path, mode, __kwonly__,
dir_fd=DEFAULT_DIR_FD, follow_symlinks=True):
"""chmod(path, mode, *, dir_fd=None, follow_symlinks=True)
Change the access permissions of a file.
path may always be specified as a string.
On some platforms, path may also be specified as an open file descriptor.
If this functionality is unavailable, using it raises an exception.
If dir_fd is not None, it should be a file descriptor open to a directory,
and path should be relative; path will then be relative to that directory.
If follow_symlinks is False, and the last element of the path is a symbolic
link, chmod will modify the symbolic link itself instead of the file the
link points to.
It is an error to use dir_fd or follow_symlinks when specifying path as
an open file descriptor.
dir_fd and follow_symlinks may not be implemented on your platform.
If they are unavailable, using them will raise a NotImplementedError."""
if not rposix.HAVE_FCHMODAT:
if not follow_symlinks:
raise argument_unavailable(space, "chmod", "follow_symlinks")
while True:
try:
dispatch_filename(rposix.chmod)(space, w_path, mode)
return
except OSError as e:
wrap_oserror2(space, e, w_path, eintr_retry=True)
try:
path = space.fsencode_w(w_path)
except OperationError as operr:
if not space.isinstance_w(w_path, space.w_int):
raise oefmt(space.w_TypeError,
"argument should be string, bytes or integer, not %T", w_path)
fd = unwrap_fd(space, w_path)
# NB. in CPython 3.5.2, os.chmod(fd) propagates EINTR to app-level,
# but os.fchmod(fd) retries automatically. This might be fixed in
# more recent CPythons.
while True:
try:
os.fchmod(fd, mode)
return
except OSError as e:
wrap_oserror(space, e, eintr_retry=True)
while True:
try:
_chmod_path(path, mode, dir_fd, follow_symlinks)
break
except OSError as e:
if not follow_symlinks and e.errno in (ENOTSUP, EOPNOTSUPP):
# fchmodat() doesn't actually implement follow_symlinks=False
# so raise NotImplementedError in this case
raise argument_unavailable(space, "chmod", "follow_symlinks")
wrap_oserror2(space, e, w_path, eintr_retry=True)
def _chmod_path(path, mode, dir_fd, follow_symlinks):
if dir_fd != DEFAULT_DIR_FD or not follow_symlinks:
rposix.fchmodat(path, mode, dir_fd, follow_symlinks)
else:
rposix.chmod(path, mode)
@unwrap_spec(fd=c_int, mode=c_int)
def fchmod(space, fd, mode):
"""\
Change the access permissions of the file given by file descriptor fd.
"""
while True:
try:
os.fchmod(fd, mode)
break
except OSError as e:
wrap_oserror(space, e, eintr_retry=True)
@unwrap_spec(src_dir_fd=DirFD(rposix.HAVE_RENAMEAT),
dst_dir_fd=DirFD(rposix.HAVE_RENAMEAT))
def rename(space, w_src, w_dst, __kwonly__,
src_dir_fd=DEFAULT_DIR_FD, dst_dir_fd=DEFAULT_DIR_FD):
"""rename(src, dst, *, src_dir_fd=None, dst_dir_fd=None)
Rename a file or directory.
If either src_dir_fd or dst_dir_fd is not None, it should be a file
descriptor open to a directory, and the respective path string (src or dst)
should be relative; the path will then be relative to that directory.
src_dir_fd and dst_dir_fd may not be implemented on your platform.
If they are unavailable, using them will raise a NotImplementedError."""
try:
if (rposix.HAVE_RENAMEAT and
(src_dir_fd != DEFAULT_DIR_FD or dst_dir_fd != DEFAULT_DIR_FD)):
src = space.fsencode_w(w_src)
dst = space.fsencode_w(w_dst)
rposix.renameat(src, dst, src_dir_fd, dst_dir_fd)
else:
dispatch_filename_2(rposix.rename)(space, w_src, w_dst)
except OSError as e:
raise wrap_oserror2(space, e, w_filename=w_src, w_filename2=w_dst,
eintr_retry=False)
@unwrap_spec(src_dir_fd=DirFD(rposix.HAVE_RENAMEAT),
dst_dir_fd=DirFD(rposix.HAVE_RENAMEAT))
def replace(space, w_src, w_dst, __kwonly__,
src_dir_fd=DEFAULT_DIR_FD, dst_dir_fd=DEFAULT_DIR_FD):
"""replace(src, dst, *, src_dir_fd=None, dst_dir_fd=None)
Rename a file or directory, overwriting the destination.
If either src_dir_fd or dst_dir_fd is not None, it should be a file
descriptor open to a directory, and the respective path string (src or dst)
should be relative; the path will then be relative to that directory.
src_dir_fd and dst_dir_fd may not be implemented on your platform.
If they are unavailable, using them will raise a NotImplementedError."""
try:
if (rposix.HAVE_RENAMEAT and
(src_dir_fd != DEFAULT_DIR_FD or dst_dir_fd != DEFAULT_DIR_FD)):
src = space.fsencode_w(w_src)
dst = space.fsencode_w(w_dst)
rposix.renameat(src, dst, src_dir_fd, dst_dir_fd)
else:
dispatch_filename_2(rposix.replace)(space, w_src, w_dst)
except OSError as e:
raise wrap_oserror2(space, e, w_filename=w_src, w_filename2=w_dst,
eintr_retry=False)
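# Illustrative application-level sketch (not part of this module) of what
# "overwriting the destination" means for replace(). It assumes a Python 3
# interpreter; the file names "src" and "dst" are arbitrary.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as directory:
    src = os.path.join(directory, "src")
    dst = os.path.join(directory, "dst")
    with open(src, "w") as f:
        f.write("new")
    with open(dst, "w") as f:
        f.write("old")

    # The destination already exists; replace() overwrites it.
    os.replace(src, dst)

    with open(dst) as f:
        dst_content = f.read()
    src_still_exists = os.path.exists(src)
```

# rename() behaves the same way on POSIX when dst is a file, but it fails on
# Windows if dst exists, which is why replace() is the portable spelling.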
@unwrap_spec(mode=c_int, dir_fd=DirFD(rposix.HAVE_MKFIFOAT))
def mkfifo(space, w_path, mode=0o666, __kwonly__=None, dir_fd=DEFAULT_DIR_FD):
"""mkfifo(path, mode=0o666, *, dir_fd=None)
Create a FIFO (a POSIX named pipe).
If dir_fd is not None, it should be a file descriptor open to a directory,
and path should be relative; path will then be relative to that directory.
dir_fd may not be implemented on your platform.
If it is unavailable, using it will raise a NotImplementedError."""
# CPython 3.5.2: why does os.mkfifo() retry automatically if it
# gets EINTR, but not os.mkdir()?
while True:
try:
if rposix.HAVE_MKFIFOAT and dir_fd != DEFAULT_DIR_FD:
path = space.fsencode_w(w_path)
rposix.mkfifoat(path, mode, dir_fd)
else:
dispatch_filename(rposix.mkfifo)(space, w_path, mode)
break
except OSError as e:
wrap_oserror2(space, e, w_path, eintr_retry=True)
@unwrap_spec(mode=c_int, device=c_int, dir_fd=DirFD(rposix.HAVE_MKNODAT))
def mknod(space, w_path, mode=0o600, device=0,
__kwonly__=None, dir_fd=DEFAULT_DIR_FD):
"""mknod(path, mode=0o600, device=0, *, dir_fd=None)
Create a filesystem node (file, device special file or named pipe)
named 'path'. mode specifies both the permissions to use and the
type of node to be created, being combined (bitwise OR) with one of
S_IFREG, S_IFCHR, S_IFBLK, and S_IFIFO. For S_IFCHR and S_IFBLK,
device defines the newly created device special file (probably using
os.makedev()), otherwise it is ignored.
If dir_fd is not None, it should be a file descriptor open to a directory,
and path should be relative; path will then be relative to that directory.
dir_fd may not be implemented on your platform.
If it is unavailable, using it will raise a NotImplementedError."""
while True:
try:
if rposix.HAVE_MKNODAT and dir_fd != DEFAULT_DIR_FD:
fname = space.fsencode_w(w_path)
rposix.mknodat(fname, mode, device, dir_fd)
else:
dispatch_filename(rposix.mknod)(space, w_path, mode, device)
break
except OSError as e:
wrap_oserror2(space, e, w_path, eintr_retry=True)
@unwrap_spec(mask=c_int)
def umask(space, mask):
"Set the current numeric umask and return the previous umask."
prevmask = os.umask(mask)
return space.newint(prevmask)
def getpid(space):
"Return the current process id."
try:
pid = os.getpid()
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newint(pid)
@unwrap_spec(pid=c_int, signal=c_int)
def kill(space, pid, signal):
"Kill a process with a signal."
try:
rposix.kill(pid, signal)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
@unwrap_spec(pgid=c_int, signal=c_int)
def killpg(space, pgid, signal):
"Kill a process group with a signal."
try:
os.killpg(pgid, signal)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
def abort(space):
"""Abort the interpreter immediately. This 'dumps core' or otherwise fails
in the hardest way possible on the hosting operating system."""
import signal
rposix.kill(os.getpid(), signal.SIGABRT)
@unwrap_spec(
src_dir_fd=DirFD(rposix.HAVE_LINKAT), dst_dir_fd=DirFD(rposix.HAVE_LINKAT),
follow_symlinks=bool)
def link(space, w_src, w_dst, __kwonly__,
src_dir_fd=DEFAULT_DIR_FD, dst_dir_fd=DEFAULT_DIR_FD,
follow_symlinks=True):
"""\
link(src, dst, *, src_dir_fd=None, dst_dir_fd=None, follow_symlinks=True)
Create a hard link to a file.
If either src_dir_fd or dst_dir_fd is not None, it should be a file
descriptor open to a directory, and the respective path string (src or dst)
should be relative; the path will then be relative to that directory.
If follow_symlinks is False, and the last element of src is a symbolic
link, link will create a link to the symbolic link itself instead of the
file the link points to.
src_dir_fd, dst_dir_fd, and follow_symlinks may not be implemented on your
platform. If they are unavailable, using them will raise a
NotImplementedError."""
src = space.fsencode_w(w_src)
dst = space.fsencode_w(w_dst)
try:
if (rposix.HAVE_LINKAT and
(src_dir_fd != DEFAULT_DIR_FD or dst_dir_fd != DEFAULT_DIR_FD
or not follow_symlinks)):
rposix.linkat(src, dst, src_dir_fd, dst_dir_fd, follow_symlinks)
else:
rposix.link(src, dst)
except OSError as e:
raise wrap_oserror2(space, e, w_filename=w_src, w_filename2=w_dst,
eintr_retry=False)
@unwrap_spec(dir_fd=DirFD(rposix.HAVE_SYMLINKAT))
def symlink(space, w_src, w_dst, w_target_is_directory=None,
__kwonly__=None, dir_fd=DEFAULT_DIR_FD):
"""symlink(src, dst, target_is_directory=False, *, dir_fd=None)
Create a symbolic link pointing to src named dst.
target_is_directory is required on Windows if the target is to be
interpreted as a directory. (On Windows, symlink requires
Windows 6.0 or greater, and raises a NotImplementedError otherwise.)
target_is_directory is ignored on non-Windows platforms.
If dir_fd is not None, it should be a file descriptor open to a directory,
and dst should be relative; dst will then be relative to that directory.
dir_fd may not be implemented on your platform.
If it is unavailable, using it will raise a NotImplementedError."""
if _WIN32:
raise oefmt(space.w_NotImplementedError,
"symlink() is not implemented for PyPy on Windows")
try:
if rposix.HAVE_SYMLINKAT and dir_fd != DEFAULT_DIR_FD:
src = space.fsencode_w(w_src)
dst = space.fsencode_w(w_dst)
rposix.symlinkat(src, dst, dir_fd)
else:
dispatch_filename_2(rposix.symlink)(space, w_src, w_dst)
except OSError as e:
raise wrap_oserror2(space, e, w_filename=w_src, w_filename2=w_dst,
eintr_retry=False)
@unwrap_spec(
path=path_or_fd(allow_fd=False),
dir_fd=DirFD(rposix.HAVE_READLINKAT))
def readlink(space, path, __kwonly__, dir_fd=DEFAULT_DIR_FD):
"""readlink(path, *, dir_fd=None) -> path
Return a string representing the path to which the symbolic link points.
If dir_fd is not None, it should be a file descriptor open to a directory,
and path should be relative; path will then be relative to that directory.
dir_fd may not be implemented on your platform.
If it is unavailable, using it will raise a NotImplementedError."""
try:
if rposix.HAVE_READLINKAT and dir_fd != DEFAULT_DIR_FD:
result = call_rposix(rposix.readlinkat, path, dir_fd)
else:
result = call_rposix(rposix.readlink, path)
except OSError as e:
raise wrap_oserror2(space, e, path.w_path, eintr_retry=False)
w_result = space.newbytes(result)
if space.isinstance_w(path.w_path, space.w_unicode):
return space.fsdecode(w_result)
return w_result
before_fork_hooks = []
after_fork_child_hooks = []
after_fork_parent_hooks = []
@specialize.memo()
def get_fork_hooks(where):
if where == 'before':
return before_fork_hooks
elif where == 'child':
return after_fork_child_hooks
elif where == 'parent':
return after_fork_parent_hooks
else:
assert False, "Unknown fork hook"
@not_rpython
def add_fork_hook(where, hook):
get_fork_hooks(where).append(hook)
add_fork_hook('child', ExecutionContext._mark_thread_disappeared)
@specialize.arg(0)
def run_fork_hooks(where, space):
for hook in get_fork_hooks(where):
hook(space)
def _run_forking_function(space, kind):
run_fork_hooks('before', space)
try:
if kind == "F":
pid = os.fork()
master_fd = -1
elif kind == "P":
pid, master_fd = os.forkpty()
else:
raise AssertionError
except OSError as e:
try:
run_fork_hooks('parent', space)
except:
# Don't clobber the OSError if the fork failed
pass
raise wrap_oserror(space, e, eintr_retry=False)
if pid == 0:
run_fork_hooks('child', space)
else:
run_fork_hooks('parent', space)
return pid, master_fd
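# Illustrative sketch (not part of this module) of the hook ordering that
# _run_forking_function() implements: 'before' hooks always run first, then
# either the 'child' or the 'parent' hooks depending on the returned pid.
# No real fork happens here; simulated_fork(), add_hook() and run_hooks() are
# hypothetical names, and the pid is passed in by hand.

```python
fork_hooks = {"before": [], "child": [], "parent": []}


def add_hook(where, hook):
    """Register hook under one of 'before', 'child' or 'parent'."""
    fork_hooks[where].append(hook)


def run_hooks(where, log):
    for hook in fork_hooks[where]:
        hook(log)


def simulated_fork(log, pid):
    """Mimic the hook ordering around fork(); pid == 0 stands for the child."""
    run_hooks("before", log)
    run_hooks("child" if pid == 0 else "parent", log)
    return pid


add_hook("before", lambda log: log.append("before"))
add_hook("child", lambda log: log.append("child"))
add_hook("parent", lambda log: log.append("parent"))

child_log, parent_log = [], []
simulated_fork(child_log, 0)       # what the child process would observe
simulated_fork(parent_log, 1234)   # what the parent process would observe
```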
def fork(space):
pid, irrelevant = _run_forking_function(space, "F")
return space.newint(pid)
def openpty(space):
"Open a pseudo-terminal, returning open fd's for both master and slave end."
master_fd = slave_fd = -1
try:
master_fd, slave_fd = os.openpty()
rposix.set_inheritable(master_fd, False)
rposix.set_inheritable(slave_fd, False)
except OSError as e:
if master_fd >= 0:
rposix.c_close(master_fd)
if slave_fd >= 0:
rposix.c_close(slave_fd)
raise wrap_oserror(space, e, eintr_retry=False)
return space.newtuple([space.newint(master_fd), space.newint(slave_fd)])
def forkpty(space):
pid, master_fd = _run_forking_function(space, "P")
return space.newtuple([space.newint(pid),
space.newint(master_fd)])
@unwrap_spec(pid=c_int, options=c_int)
def waitpid(space, pid, options):
""" waitpid(pid, options) -> (pid, status)
Wait for completion of a given child process.
"""
while True:
try:
pid, status = os.waitpid(pid, options)
break
except OSError as e:
wrap_oserror(space, e, eintr_retry=True)
return space.newtuple([space.newint(pid), space.newint(status)])
# missing: waitid()
@unwrap_spec(status=c_int)
def _exit(space, status):
os._exit(status)
def execv(space, w_path, w_argv):
""" execv(path, args)
Execute an executable path with arguments, replacing current process.
path: path of executable file
args: iterable of strings
"""
command = space.fsencode_w(w_path)
try:
args_w = space.unpackiterable(w_argv)
if len(args_w) < 1:
raise oefmt(space.w_ValueError,
"execv() arg 2 must not be empty")
args = [space.fsencode_w(w_arg) for w_arg in args_w]
except OperationError as e:
if not e.match(space, space.w_TypeError):
raise
raise oefmt(space.w_TypeError,
"execv() arg 2 must be an iterable of strings")
try:
os.execv(command, args)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
def _env2interp(space, w_env):
env = {}
w_keys = space.call_method(w_env, 'keys')
for w_key in space.unpackiterable(w_keys):
w_value = space.getitem(w_env, w_key)
env[space.fsencode_w(w_key)] = space.fsencode_w(w_value)
return env
def execve(space, w_path, w_argv, w_env):
"""execve(path, argv, env)
Execute a path with arguments and environment, replacing current process.
path: path of executable file
argv: tuple or list of arguments
env: dictionary of strings mapping to strings
On some platforms, you may specify an open file descriptor for path;
execve will execute the program the file descriptor is open to.
If this functionality is unavailable, using it raises NotImplementedError.
"""
if not (space.isinstance_w(w_argv, space.w_list)
or space.isinstance_w(w_argv, space.w_tuple)):
raise oefmt(space.w_TypeError,
"execve: argv must be a tuple or a list")
args = [space.fsencode_w(w_arg) for w_arg in space.unpackiterable(w_argv)]
env = _env2interp(space, w_env)
try:
path = space.fsencode_w(w_path)
except OperationError:
if not rposix.HAVE_FEXECVE:
raise oefmt(space.w_TypeError,
"execve: illegal type for path argument")
if not space.isinstance_w(w_path, space.w_int):
raise oefmt(space.w_TypeError,
"argument should be string, bytes or integer, not %T", w_path)
# File descriptor case
fd = unwrap_fd(space, w_path)
try:
rposix.fexecve(fd, args, env)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
else:
try:
os.execve(path, args, env)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
@unwrap_spec(mode=int, path='fsencode')
def spawnv(space, mode, path, w_argv):
args = [space.fsencode_w(w_arg) for w_arg in space.unpackiterable(w_argv)]
try:
ret = os.spawnv(mode, path, args)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newint(ret)
@unwrap_spec(mode=int, path='fsencode')
def spawnve(space, mode, path, w_argv, w_env):
args = [space.fsencode_w(w_arg) for w_arg in space.unpackiterable(w_argv)]
env = _env2interp(space, w_env)
try:
ret = os.spawnve(mode, path, args, env)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newint(ret)
@unwrap_spec(
path=path_or_fd(allow_fd=rposix.HAVE_FUTIMENS or rposix.HAVE_FUTIMES),
w_times=WrappedDefault(None), w_ns=WrappedDefault(None),
dir_fd=DirFD(rposix.HAVE_UTIMENSAT), follow_symlinks=bool)
def utime(space, path, w_times, __kwonly__, w_ns, dir_fd=DEFAULT_DIR_FD,
follow_symlinks=True):
"""utime(path, times=None, *, ns=None, dir_fd=None, follow_symlinks=True)
Set the access and modified time of path.
path may always be specified as a string.
On some platforms, path may also be specified as an open file descriptor.
If this functionality is unavailable, using it raises an exception.
If times is not None, it must be a tuple (atime, mtime);
atime and mtime should be expressed as float seconds since the epoch.
If ns is not None, it must be a tuple (atime_ns, mtime_ns);
atime_ns and mtime_ns should be expressed as integer nanoseconds
since the epoch.
If both times and ns are None, utime uses the current time.
Specifying tuples for both times and ns is an error.
If dir_fd is not None, it should be a file descriptor open to a directory,
and path should be relative; path will then be relative to that directory.
If follow_symlinks is False, and the last element of the path is a symbolic
link, utime will modify the symbolic link itself instead of the file the
link points to.
It is an error to use dir_fd or follow_symlinks when specifying path
as an open file descriptor.
dir_fd and follow_symlinks may not be available on your platform.
If they are unavailable, using them will raise a NotImplementedError."""
utime = parse_utime_args(space, w_times, w_ns)
if path.as_fd != -1:
if dir_fd != DEFAULT_DIR_FD:
raise oefmt(space.w_ValueError,
"utime: can't specify both dir_fd and fd")
if not follow_symlinks:
raise oefmt(space.w_ValueError,
"utime: cannot use fd and follow_symlinks together")
if rposix.HAVE_FUTIMENS:
do_utimens(space, rposix.futimens, path.as_fd, utime)
elif rposix.HAVE_FUTIMES:
do_utimes(space, rposix.futimes, path.as_fd, utime)
elif rposix.HAVE_UTIMENSAT:
if path.as_bytes is None:
raise oefmt(space.w_NotImplementedError,
"utime: unsupported value for 'path'")
do_utimens(space, rposix.utimensat, path.as_bytes, utime,
dir_fd, follow_symlinks)
elif rposix.HAVE_LUTIMES and not follow_symlinks:
if path.as_bytes is None:
raise oefmt(space.w_NotImplementedError,
"utime: unsupported value for 'path'")
do_utimes(space, rposix.lutimes, path.as_bytes, utime)
elif follow_symlinks:
do_utimes(space, _dispatch_utime, path, utime)
else:
raise argument_unavailable(space, "utime", "follow_symlinks")
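# Illustrative application-level sketch (not part of this module) of the two
# ways to pass timestamps to utime(), and of the rule that 'times' and 'ns'
# are mutually exclusive. It assumes a Python 3 interpreter and a filesystem
# that stores whole-second timestamps faithfully; the timestamps are arbitrary.

```python
import os
import tempfile

fd, path = tempfile.mkstemp()
os.close(fd)
try:
    # ns= takes integer nanoseconds: (atime_ns, mtime_ns).
    os.utime(path, ns=(1500000000 * 10**9, 1500000002 * 10**9))
    mtime_from_ns = int(os.stat(path).st_mtime)

    # times= takes seconds (int or float): (atime, mtime).
    os.utime(path, times=(1500000000, 1500000004))
    mtime_from_times = int(os.stat(path).st_mtime)

    # Passing both tuples is rejected with ValueError.
    try:
        os.utime(path, times=(1, 2), ns=(1, 2))
        both_rejected = False
    except ValueError:
        both_rejected = True
finally:
    os.remove(path)
```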
def parse_utime_args(space, w_times, w_ns):
"""Parse utime's times/ns arguments into a 5-item tuple of a "now"
flag and two timespec-like (seconds, nanoseconds) pairs: atime, then mtime.
"""
if (not space.is_w(w_times, space.w_None) and
not space.is_w(w_ns, space.w_None)):
raise oefmt(space.w_ValueError,
"utime: you may specify either 'times' or 'ns' but not both")
now = False
if space.is_w(w_times, space.w_None) and space.is_w(w_ns, space.w_None):
now = True
atime_s = mtime_s = 0
atime_ns = mtime_ns = 0
elif not space.is_w(w_times, space.w_None):
times_w = space.fixedview(w_times)
if len(times_w) != 2:
raise oefmt(space.w_TypeError,
"utime: 'times' must be either a tuple of two ints or None")
atime_s, atime_ns = convert_seconds(space, times_w[0])
mtime_s, mtime_ns = convert_seconds(space, times_w[1])
else:
args_w = space.fixedview(w_ns)
if len(args_w) != 2:
raise oefmt(space.w_TypeError,
"utime: 'ns' must be a tuple of two ints")
atime_s, atime_ns = convert_ns(space, args_w[0])
mtime_s, mtime_ns = convert_ns(space, args_w[1])
return now, atime_s, atime_ns, mtime_s, mtime_ns
def do_utimens(space, func, arg, utime, *args):
"""Common implementation for futimens/utimensat etc."""
now, atime_s, atime_ns, mtime_s, mtime_ns = utime
if now:
atime_ns = mtime_ns = rposix.UTIME_NOW
try:
func(arg, atime_s, atime_ns, mtime_s, mtime_ns, *args)
except OSError as e:
# CPython's Modules/posixmodule.c::posix_utime() has this
# comment:
# /* Avoid putting the file name into the error here,
# as that may confuse the user into believing that
# something is wrong with the file, when it also
# could be the time stamp that gives a problem. */
# so we use wrap_oserror() instead of wrap_oserror2() here
raise wrap_oserror(space, e, eintr_retry=False)
@specialize.arg(1)
def do_utimes(space, func, arg, utime):
"""Common implementation for f/l/utimes"""
now, atime_s, atime_ns, mtime_s, mtime_ns = utime
try:
if now:
func(arg, None)
else:
# Convert back to utimes-style floats. This loses nanosecond
# precision, but utimes only supports microseconds anyway.
atime = atime_s + (atime_ns / 1e9)
mtime = mtime_s + (mtime_ns / 1e9)
func(arg, (atime, mtime))
except OSError as e:
# see comment above: don't use wrap_oserror2()
raise wrap_oserror(space, e, eintr_retry=False)
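# Illustrative sketch (not part of this module) of the precision loss noted in
# do_utimes(): folding nanoseconds into a float timestamp and splitting it
# again does not give back the same nanosecond count, although the error stays
# below one microsecond for present-day timestamps. The sample values are
# arbitrary and the code assumes IEEE 754 double-precision floats.

```python
from math import modf

atime_s, atime_ns = 1500000000, 123456789

# Same conversion as in do_utimes(): fold the nanoseconds into a float.
atime = atime_s + (atime_ns / 1e9)

# Split the float again to see what survived the round trip.
frac, whole = modf(atime)
recovered_s, recovered_ns = int(whole), int(frac * 1e9)
error_ns = abs(recovered_ns - atime_ns)
```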
@specialize.argtype(1)
def _dispatch_utime(path, times):
# XXX: a dup. of call_rposix to specialize rposix.utime taking a
# Path for win32 support w/ do_utimes
if path.as_unicode is not None:
return rposix.utime(path.as_unicode, times)
else:
path_b = path.as_bytes
assert path_b is not None
return rposix.utime(path_b, times)
def convert_seconds(space, w_time):
if space.isinstance_w(w_time, space.w_float):
time = space.float_w(w_time)
fracpart, intpart = modf(time)
if fracpart < 0:
fracpart += 1.
intpart -= 1.
return int(intpart), int(fracpart*1e9)
else:
time = space.int_w(w_time)
return time, 0
def convert_ns(space, w_ns_time):
w_billion = space.newint(1000000000)
w_res = space.divmod(w_ns_time, w_billion)
res_w = space.fixedview(w_res)
time_int = space.int_w(res_w[0])
time_frac = space.int_w(res_w[1])
return time_int, time_frac
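# Illustrative plain-Python sketch (not part of this module) of the arithmetic
# in convert_seconds() and convert_ns(), without the object-space wrappers.
# Both produce a (seconds, nanoseconds) pair whose nanosecond part is never
# negative, which matters for timestamps before the epoch. split_seconds() and
# split_ns() are hypothetical names used only here.

```python
from math import modf


def split_seconds(t):
    """Split float seconds into (whole_seconds, nanoseconds), 0 <= ns < 1e9."""
    frac, whole = modf(t)
    if frac < 0:
        # modf() keeps the sign on both parts, so borrow one second.
        frac += 1.0
        whole -= 1.0
    return int(whole), int(frac * 1e9)


def split_ns(ns):
    """Split integer nanoseconds into (seconds, nanoseconds) by floor division."""
    return divmod(ns, 1000000000)


positive = split_seconds(2.25)   # (2, 250000000)
negative = split_seconds(-1.5)   # (-2, 500000000), i.e. -2 s + 0.5 s
just_before_epoch = split_ns(-1) # (-1, 999999999)
```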
def uname(space):
""" uname() -> (sysname, nodename, release, version, machine)
Return a tuple identifying the current operating system.
"""
try:
r = os.uname()
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
l_w = [space.newfilename(i)
for i in [r[0], r[1], r[2], r[3], r[4]]]
w_tuple = space.newtuple(l_w)
w_uname_result = space.getattr(space.getbuiltinmodule(os.name),
space.newtext('uname_result'))
return space.call_function(w_uname_result, w_tuple)
def getuid(space):
""" getuid() -> uid
Return the current process's user id.
"""
return wrap_uid(space, os.getuid())
@unwrap_spec(uid=c_uid_t)
def setuid(space, uid):
""" setuid(uid)
Set the current process's user id.
"""
try:
os.setuid(uid)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
@unwrap_spec(euid=c_uid_t)
def seteuid(space, euid):
""" seteuid(euid)
Set the current process's effective user id.
"""
try:
os.seteuid(euid)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
@unwrap_spec(gid=c_gid_t)
def setgid(space, gid):
""" setgid(gid)
Set the current process's group id.
"""
try:
os.setgid(gid)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
@unwrap_spec(egid=c_gid_t)
def setegid(space, egid):
""" setegid(egid)
Set the current process's effective group id.
"""
try:
os.setegid(egid)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
def chroot(space, w_path):
""" chroot(path)
Change root directory to path.
"""
path = space.fsencode_w(w_path)
try:
os.chroot(path)
except OSError as e:
raise wrap_oserror2(space, e, w_path, eintr_retry=False)
return space.w_None
def getgid(space):
""" getgid() -> gid
Return the current process's group id.
"""
return wrap_gid(space, os.getgid())
def getegid(space):
""" getegid() -> gid
Return the current process's effective group id.
"""
return wrap_gid(space, os.getegid())
def geteuid(space):
""" geteuid() -> euid
Return the current process's effective user id.
"""
return wrap_uid(space, os.geteuid())
def getgroups(space):
""" getgroups() -> list of group IDs
Return list of supplemental group IDs for the process.
"""
try:
list = os.getgroups()
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newlist([wrap_gid(space, e) for e in list])
def setgroups(space, w_groups):
""" setgroups(groups)
Set the groups of the current process to list.
"""
list = []
for w_gid in space.unpackiterable(w_groups):
list.append(space.c_uid_t_w(w_gid))
try:
os.setgroups(list[:])
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
@unwrap_spec(username='text', gid=c_gid_t)
def initgroups(space, username, gid):
""" initgroups(username, gid) -> None
Call the system initgroups() to initialize the group access list with all of
the groups of which the specified username is a member, plus the specified
group id.
"""
try:
os.initgroups(username, gid)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
def getpgrp(space):
""" getpgrp() -> pgrp
Return the current process group id.
"""
return space.newint(os.getpgrp())
def setpgrp(space):
    """ setpgrp()
    Make the current process the leader of its process group.
    """
    try:
        os.setpgrp()
    except OSError as e:
        raise wrap_oserror(space, e, eintr_retry=False)
    return space.w_None
def getppid(space):
""" getppid() -> ppid
Return the parent's process id.
"""
return space.newint(os.getppid())
@unwrap_spec(pid=c_int)
def getpgid(space, pid):
""" getpgid(pid) -> pgid
Call the system call getpgid().
"""
try:
pgid = os.getpgid(pid)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newint(pgid)
@unwrap_spec(pid=c_int, pgrp=c_int)
def setpgid(space, pid, pgrp):
""" setpgid(pid, pgrp)
Call the system call setpgid().
"""
try:
os.setpgid(pid, pgrp)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.w_None
@unwrap_spec(ruid=c_uid_t, euid=c_uid_t)
def setreuid(space, ruid, euid):
""" setreuid(ruid, euid)
Set the current process's real and effective user ids.
"""
try:
os.setreuid(ruid, euid)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
@unwrap_spec(rgid=c_gid_t, egid=c_gid_t)
def setregid(space, rgid, egid):
""" setregid(rgid, egid)
Set the current process's real and effective group ids.
"""
try:
os.setregid(rgid, egid)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
@unwrap_spec(pid=c_int)
def getsid(space, pid):
""" getsid(pid) -> sid
Call the system call getsid().
"""
try:
sid = os.getsid(pid)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newint(sid)
def setsid(space):
""" setsid()
Call the system call setsid().
"""
try:
os.setsid()
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.w_None
@unwrap_spec(fd=c_int)
def tcgetpgrp(space, fd):
""" tcgetpgrp(fd) -> pgid
Return the process group associated with the terminal given by a fd.
"""
try:
pgid = os.tcgetpgrp(fd)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newint(pgid)
@unwrap_spec(fd=c_int, pgid=c_gid_t)
def tcsetpgrp(space, fd, pgid):
""" tcsetpgrp(fd, pgid)
Set the process group associated with the terminal given by a fd.
"""
try:
os.tcsetpgrp(fd, pgid)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
def getresuid(space):
""" getresuid() -> (ruid, euid, suid)
Get tuple of the current process's real, effective, and saved user ids.
"""
try:
(ruid, euid, suid) = os.getresuid()
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newtuple([wrap_uid(space, ruid),
wrap_uid(space, euid),
wrap_uid(space, suid)])
def getresgid(space):
""" getresgid() -> (rgid, egid, sgid)
Get tuple of the current process's real, effective, and saved group ids.
"""
try:
(rgid, egid, sgid) = os.getresgid()
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newtuple([wrap_gid(space, rgid),
wrap_gid(space, egid),
wrap_gid(space, sgid)])
@unwrap_spec(ruid=c_uid_t, euid=c_uid_t, suid=c_uid_t)
def setresuid(space, ruid, euid, suid):
""" setresuid(ruid, euid, suid)
Set the current process's real, effective, and saved user ids.
"""
try:
os.setresuid(ruid, euid, suid)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
@unwrap_spec(rgid=c_gid_t, egid=c_gid_t, sgid=c_gid_t)
def setresgid(space, rgid, egid, sgid):
""" setresgid(rgid, egid, sgid)
Set the current process's real, effective, and saved group ids.
"""
try:
os.setresgid(rgid, egid, sgid)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
@unwrap_spec(which=int, who=int)
def getpriority(space, which, who):
""" getpriority(which, who) -> int
Get program scheduling priority.
"""
try:
returned_priority = rposix.getpriority(which, who)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newint(returned_priority)
@unwrap_spec(which=int, who=int, priority=int)
def setpriority(space, which, who, priority):
""" setpriority(which, who, priority)
Set program scheduling priority.
"""
try:
rposix.setpriority(which, who, priority)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
def declare_new_w_star(name):
if name in ('WEXITSTATUS', 'WSTOPSIG', 'WTERMSIG'):
@unwrap_spec(status=c_int)
def WSTAR(space, status):
return space.newint(getattr(os, name)(status))
else:
@unwrap_spec(status=c_int)
def WSTAR(space, status):
return space.newbool(getattr(os, name)(status))
WSTAR.__doc__ = getattr(os, name).__doc__
WSTAR.func_name = name
return WSTAR
for name in rposix.WAIT_MACROS:
if hasattr(os, name):
func = declare_new_w_star(name)
globals()[name] = func
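# Illustration only, not part of the module: a plain-Python sketch of the
# factory pattern used above.  One likely reason for going through a factory
# is that each generated function then closes over its own 'name' instead of
# the loop variable.  The names make_checker and table are hypothetical.

```python
def make_checker(name, table):
    """Build a wrapper bound to one specific entry of 'table'."""
    def checker(status):
        return table[name](status)
    checker.__name__ = name
    return checker


table = {
    "IS_EVEN": lambda status: status % 2 == 0,
    "LOW_BYTE": lambda status: status & 0xFF,
}
generated = {}
for name in table:
    generated[name] = make_checker(name, table)
```

# Each entry of 'generated' keeps calling the function it was created for,
# regardless of what the loop variable holds afterwards.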
@unwrap_spec(fd=c_int)
def ttyname(space, fd):
try:
return space.newfilename(os.ttyname(fd))
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
def confname_w(space, w_name, namespace):
# XXX slightly non-nice, reuses the sysconf of the underlying os module
if space.isinstance_w(w_name, space.w_unicode):
try:
num = namespace[space.text_w(w_name)]
except KeyError:
raise oefmt(space.w_ValueError, "unrecognized configuration name")
else:
num = space.int_w(w_name)
return num
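# Illustration only, not part of the module: the same name-or-number lookup
# written against ordinary Python objects.  resolve_confname and the sample
# table are hypothetical; the numeric values are made up for the example.

```python
def resolve_confname(name, namespace):
    """Map a configuration name to its number; pass integers through."""
    if isinstance(name, str):
        try:
            return namespace[name]
        except KeyError:
            raise ValueError("unrecognized configuration name")
    return int(name)


sample_names = {"SC_PAGE_SIZE": 30, "SC_OPEN_MAX": 4}
```

# resolve_confname("SC_OPEN_MAX", sample_names) looks the name up in the
# table, while resolve_confname(30, sample_names) returns the integer as is.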
def sysconf(space, w_name):
num = confname_w(space, w_name, os.sysconf_names)
try:
res = os.sysconf(num)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newint(res)
@unwrap_spec(fd=c_int)
def fpathconf(space, fd, w_name):
num = confname_w(space, w_name, os.pathconf_names)
try:
res = os.fpathconf(fd, num)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newint(res)
@unwrap_spec(path=path_or_fd(allow_fd=hasattr(os, 'fpathconf')))
def pathconf(space, path, w_name):
num = confname_w(space, w_name, os.pathconf_names)
if path.as_fd != -1:
try:
res = os.fpathconf(path.as_fd, num)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
else:
try:
res = os.pathconf(path.as_bytes, num)
except OSError as e:
raise wrap_oserror2(space, e, path.w_path, eintr_retry=False)
return space.newint(res)
def confstr(space, w_name):
num = confname_w(space, w_name, os.confstr_names)
try:
res = os.confstr(num)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newtext(res)
@unwrap_spec(
    uid=c_uid_t, gid=c_gid_t,
    dir_fd=DirFD(rposix.HAVE_FCHOWNAT), follow_symlinks=bool)
def chown(space, w_path, uid, gid, __kwonly__,
          dir_fd=DEFAULT_DIR_FD, follow_symlinks=True):
    """chown(path, uid, gid, *, dir_fd=None, follow_symlinks=True)
    Change the owner and group id of path to the numeric uid and gid.
    path may always be specified as a string.
    On some platforms, path may also be specified as an open file descriptor.
    If this functionality is unavailable, using it raises an exception.
    If dir_fd is not None, it should be a file descriptor open to a directory,
    and path should be relative; path will then be relative to that directory.
    If follow_symlinks is False, and the last element of the path is a symbolic
    link, chown will modify the symbolic link itself instead of the file the
    link points to.
    It is an error to use dir_fd or follow_symlinks when specifying path as
    an open file descriptor.
    dir_fd and follow_symlinks may not be implemented on your platform.
    If they are unavailable, using them will raise a NotImplementedError."""
    # follow_symlinks=False needs either lchown() or fchownat()
    if not (rposix.HAVE_LCHOWN or rposix.HAVE_FCHOWNAT):
        if not follow_symlinks:
            raise argument_unavailable(space, 'chown', 'follow_symlinks')
    try:
        path = space.fsencode_w(w_path)
    except OperationError:
        if not space.isinstance_w(w_path, space.w_int):
            raise oefmt(space.w_TypeError,
                "argument should be string, bytes or integer, not %T", w_path)
        # File descriptor case
        fd = unwrap_fd(space, w_path)
        if dir_fd != DEFAULT_DIR_FD:
            raise oefmt(space.w_ValueError,
                        "chown: can't specify both dir_fd and fd")
        if not follow_symlinks:
            raise oefmt(space.w_ValueError,
                        "chown: cannot use fd and follow_symlinks together")
        # NB. in CPython 3.5.2, os.chown(fd) propagates EINTR to app-level,
        # but os.fchown(fd) retries automatically.  This might be fixed in
        # more recent CPythons.
        while True:
            try:
                os.fchown(fd, uid, gid)
                return
            except OSError as e:
                wrap_oserror(space, e, eintr_retry=True)
    while True:
        # String case
        try:
            if (rposix.HAVE_LCHOWN and
                    dir_fd == DEFAULT_DIR_FD and not follow_symlinks):
                os.lchown(path, uid, gid)
            elif rposix.HAVE_FCHOWNAT and (
                    not follow_symlinks or dir_fd != DEFAULT_DIR_FD):
                rposix.fchownat(path, uid, gid, dir_fd, follow_symlinks)
            else:
                assert follow_symlinks
                assert dir_fd == DEFAULT_DIR_FD
                os.chown(path, uid, gid)
            break
        except OSError as e:
            wrap_oserror2(space, e, w_path, eintr_retry=True)
@unwrap_spec(uid=c_uid_t, gid=c_gid_t)
def lchown(space, w_path, uid, gid):
"""lchown(path, uid, gid)
Change the owner and group id of path to the numeric uid and gid.
This function will not follow symbolic links.
Equivalent to os.chown(path, uid, gid, follow_symlinks=False)."""
path = space.fsencode_w(w_path)
try:
os.lchown(path, uid, gid)
except OSError as e:
raise wrap_oserror2(space, e, w_path, eintr_retry=False)
@unwrap_spec(uid=c_uid_t, gid=c_gid_t)
def fchown(space, w_fd, uid, gid):
"""fchown(fd, uid, gid)
Change the owner and group id of the file given by file descriptor
fd to the numeric uid and gid. Equivalent to os.chown(fd, uid, gid)."""
fd = space.c_filedescriptor_w(w_fd)
while True:
try:
os.fchown(fd, uid, gid)
break
except OSError as e:
wrap_oserror(space, e, eintr_retry=True)
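# Illustration only, not part of the module: a plain-Python sketch of the
# "while True / retry on EINTR" loops used above.  It assumes, as those loops
# suggest, that wrap_oserror(..., eintr_retry=True) returns normally when the
# error is EINTR and raises for every other errno.  call_retrying_eintr and
# flaky are hypothetical names.

```python
import errno


def call_retrying_eintr(func, *args):
    """Call func(*args), retrying for as long as it fails with EINTR."""
    while True:
        try:
            return func(*args)
        except OSError as e:
            if e.errno != errno.EINTR:
                raise
            # EINTR: loop around and issue the call again.


attempts = []


def flaky():
    """Fail with EINTR twice, then succeed."""
    attempts.append(1)
    if len(attempts) < 3:
        raise OSError(errno.EINTR, "interrupted")
    return "done"


result = call_retrying_eintr(flaky)
```

# Any error other than EINTR propagates to the caller on the first attempt.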
def getloadavg(space):
try:
load = os.getloadavg()
except OSError:
raise oefmt(space.w_OSError, "Load averages are unobtainable")
return space.newtuple([space.newfloat(load[0]),
space.newfloat(load[1]),
space.newfloat(load[2])])
@unwrap_spec(major=c_int, minor=c_int)
def makedev(space, major, minor):
result = os.makedev(major, minor)
return space.newint(result)
@unwrap_spec(device="c_uint")
def major(space, device):
result = os.major(intmask(device))
return space.newint(result)
@unwrap_spec(device="c_uint")
def minor(space, device):
result = os.minor(intmask(device))
return space.newint(result)
@unwrap_spec(increment=c_int)
def nice(space, increment):
    """Add 'increment' to the nice value of the process (a positive
    increment lowers its scheduling priority) and return the new nice value."""
    try:
        res = os.nice(increment)
    except OSError as e:
        raise wrap_oserror(space, e, eintr_retry=False)
    return space.newint(res)
class SigCheck:
pass
_sigcheck = SigCheck()
def _signal_checker():
_sigcheck.space.getexecutioncontext().checksignals()
@unwrap_spec(size=int)
def urandom(space, size):
    """urandom(size) -> bytes
    Return a bytes object of 'size' random bytes suitable for cryptographic use.
    """
    context = get(space).random_context
    try:
        # urandom() takes a final argument that should be a regular function,
        # not a bound method like 'getexecutioncontext().checksignals'.
        # Otherwise, we can't use it from several independent places.
        _sigcheck.space = space
        return space.newbytes(rurandom.urandom(context, size, _signal_checker))
    except OSError as e:
        # 'rurandom' should catch and retry internally if it gets EINTR
        # (at least in os.read(), which is probably enough in practice)
        raise wrap_oserror(space, e, eintr_retry=False)
def ctermid(space):
"""ctermid() -> string
Return the name of the controlling terminal for this process.
"""
return space.newfilename(os.ctermid())
@unwrap_spec(fd=c_int)
def device_encoding(space, fd):
"""device_encoding(fd) -> str
Return a string describing the encoding of the device if the output
is a terminal; else return None.
"""
with rposix.FdValidator(fd):
if not (os.isatty(fd)):
return space.w_None
if _WIN32:
if fd == 0:
ccp = rwin32.GetConsoleCP()
elif fd in (1, 2):
ccp = rwin32.GetConsoleOutputCP()
else:
ccp = 0
# GetConsoleCP() and GetConsoleOutputCP() return 0 if the
# application has no console.
if ccp != 0:
return space.newtext('cp%d' % ccp)
from rpython.rlib import rlocale
if rlocale.HAVE_LANGINFO:
codeset = rlocale.nl_langinfo(rlocale.CODESET)
if codeset:
return space.newtext(codeset)
return space.w_None
if _WIN32:
from pypy.module.posix import interp_nt as nt
@unwrap_spec(fd=c_int)
def _getfileinformation(space, fd):
try:
info = nt._getfileinformation(fd)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newtuple([space.newint(info[0]),
space.newint(info[1]),
space.newint(info[2])])
def _getfinalpathname(space, w_path):
path = space.unicode_w(w_path)
try:
result = nt._getfinalpathname(path)
except nt.LLNotImplemented as e:
raise OperationError(space.w_NotImplementedError,
space.newtext(e.msg))
except OSError as e:
raise wrap_oserror2(space, e, w_path, eintr_retry=False)
return space.newunicode(result)
def chflags():
"""chflags(path, flags, *, follow_symlinks=True)
Set file flags.
If follow_symlinks is False, and the last element of the path is a symbolic
link, chflags will change flags on the symbolic link itself instead of the
file the link points to.
follow_symlinks may not be implemented on your platform. If it is
unavailable, using it will raise a NotImplementedError."""
def lchflags():
"""lchflags(path, flags)
Set file flags.
This function will not follow symbolic links.
Equivalent to chflags(path, flags, follow_symlinks=False)."""
@unwrap_spec(path=path_or_fd(), attribute=path_or_fd(allow_fd=False),
follow_symlinks=bool)
def getxattr(space, path, attribute, __kwonly__, follow_symlinks=True):
"""getxattr(path, attribute, *, follow_symlinks=True) -> value
Return the value of extended attribute attribute on path.
path may be either a string or an open file descriptor.
If follow_symlinks is False, and the last element of the path is a symbolic
link, getxattr will examine the symbolic link itself instead of the file
the link points to."""
if path.as_fd != -1:
if not follow_symlinks:
raise oefmt(space.w_ValueError,
"getxattr: cannot use fd and follow_symlinks together")
try:
result = rposix.fgetxattr(path.as_fd, attribute.as_bytes)
except OSError as e:
raise wrap_oserror2(space, e, path.w_path)
else:
try:
result = rposix.getxattr(path.as_bytes, attribute.as_bytes,
follow_symlinks=follow_symlinks)
except OSError as e:
raise wrap_oserror2(space, e, path.w_path)
return space.newbytes(result)
@unwrap_spec(path=path_or_fd(), attribute=path_or_fd(allow_fd=False),
flags=c_int,
follow_symlinks=bool)
def setxattr(space, path, attribute, w_value, flags=0,
__kwonly__=None, follow_symlinks=True):
"""setxattr(path, attribute, value, flags=0, *, follow_symlinks=True)
Set extended attribute attribute on path to value.
path may be either a string or an open file descriptor.
If follow_symlinks is False, and the last element of the path is a symbolic
link, setxattr will modify the symbolic link itself instead of the file
the link points to."""
value = space.charbuf_w(w_value)
if path.as_fd != -1:
if not follow_symlinks:
raise oefmt(space.w_ValueError,
"setxattr: cannot use fd and follow_symlinks together")
try:
rposix.fsetxattr(path.as_fd, attribute.as_bytes, value, flags)
except OSError as e:
raise wrap_oserror2(space, e, path.w_path)
else:
try:
rposix.setxattr(path.as_bytes, attribute.as_bytes, value, flags,
follow_symlinks=follow_symlinks)
except OSError as e:
raise wrap_oserror2(space, e, path.w_path)
@unwrap_spec(path=path_or_fd(), attribute=path_or_fd(allow_fd=False),
follow_symlinks=bool)
def removexattr(space, path, attribute, __kwonly__, follow_symlinks=True):
"""removexattr(path, attribute, *, follow_symlinks=True)
Remove extended attribute attribute on path.
path may be either a string or an open file descriptor.
If follow_symlinks is False, and the last element of the path is a symbolic
link, removexattr will modify the symbolic link itself instead of the file
the link points to."""
if path.as_fd != -1:
if not follow_symlinks:
raise oefmt(space.w_ValueError,
"removexattr: cannot use fd and follow_symlinks together")
try:
rposix.fremovexattr(path.as_fd, attribute.as_bytes)
except OSError as e:
raise wrap_oserror2(space, e, path.w_path)
else:
try:
rposix.removexattr(path.as_bytes, attribute.as_bytes,
follow_symlinks=follow_symlinks)
except OSError as e:
raise wrap_oserror2(space, e, path.w_path)
@unwrap_spec(path=path_or_fd(), follow_symlinks=bool)
def listxattr(space, path, __kwonly__, follow_symlinks=True):
    """listxattr(path='.', *, follow_symlinks=True)
    Return a list of extended attributes on path.
    path may be either None, a string, or an open file descriptor.
    If path is None, listxattr will examine the current directory.
    If follow_symlinks is False, and the last element of the path is a symbolic
    link, listxattr will examine the symbolic link itself instead of the file
    the link points to."""
    if path.as_fd != -1:
        if not follow_symlinks:
            raise oefmt(space.w_ValueError,
                        "listxattr: cannot use fd and follow_symlinks together")
        try:
            result = rposix.flistxattr(path.as_fd)
        except OSError as e:
            raise wrap_oserror2(space, e, path.w_path)
    else:
        try:
            result = rposix.listxattr(path.as_bytes, follow_symlinks)
        except OSError as e:
            raise wrap_oserror2(space, e, path.w_path)
    return space.newlist([space.newfilename(attr) for attr in result])
have_functions = []
for name in """FCHDIR FCHMOD FCHMODAT FCHOWN FCHOWNAT FEXECVE FDOPENDIR
FPATHCONF FSTATAT FSTATVFS FTRUNCATE FUTIMENS FUTIMES
FUTIMESAT LINKAT LCHFLAGS LCHMOD LCHOWN LSTAT LUTIMES
MKDIRAT MKFIFOAT MKNODAT OPENAT READLINKAT RENAMEAT
SYMLINKAT UNLINKAT UTIMENSAT""".split():
if getattr(rposix, "HAVE_%s" % name):
have_functions.append("HAVE_%s" % name)
if _WIN32:
have_functions.append("HAVE_MS_WINDOWS")
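# Illustration only, not part of the module: a plain-Python sketch of the
# getattr-based feature probe above.  FakePlatform stands in for rposix and
# its flag values are made up; collect_have is a hypothetical name.

```python
class FakePlatform(object):
    """Stand-in namespace exposing HAVE_* booleans."""
    HAVE_FCHDIR = True
    HAVE_LCHMOD = False
    HAVE_OPENAT = True


def collect_have(namespace, names):
    """Return the 'HAVE_<name>' strings whose flag is true on namespace."""
    found = []
    for name in names:
        flag = "HAVE_%s" % name
        if getattr(namespace, flag):
            found.append(flag)
    return found


found = collect_have(FakePlatform, "FCHDIR LCHMOD OPENAT".split())
```

# Only the flags that evaluate to true are reported, in the order probed.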
def _get_terminal_size(space, w_fd=None):
if w_fd is None:
fd = rfile.RFile(rfile.c_stdout(), close2=(None, None)).fileno()
else:
if not space.isinstance_w(w_fd, space.w_int):
raise oefmt(space.w_TypeError,
"an integer is required, got %T", w_fd)
else:
fd = space.c_int_w(w_fd)
if _WIN32:
if fd == 0:
handle_id = rwin32.STD_INPUT_HANDLE
elif fd == 1:
handle_id = rwin32.STD_OUTPUT_HANDLE
elif fd == 2:
handle_id = rwin32.STD_ERROR_HANDLE
else:
raise oefmt(space.w_ValueError, "bad file descriptor")
handle = rwin32.GetStdHandle(handle_id)
if handle == rwin32.NULL_HANDLE:
raise oefmt(space.w_OSError, "handle cannot be retrieved")
elif handle == rwin32.INVALID_HANDLE_VALUE:
raise rwin32.lastSavedWindowsError()
with lltype.scoped_alloc(rwin32.CONSOLE_SCREEN_BUFFER_INFO) as buffer_info:
success = rwin32.GetConsoleScreenBufferInfo(handle, buffer_info)
if not success:
raise rwin32.lastSavedWindowsError()
w_columns = space.newint(r_int(buffer_info.c_srWindow.c_Right) - r_int(buffer_info.c_srWindow.c_Left) + 1)
w_lines = space.newint(r_int(buffer_info.c_srWindow.c_Bottom) - r_int(buffer_info.c_srWindow.c_Top) + 1)
else:
with lltype.scoped_alloc(rposix.WINSIZE) as winsize:
failed = rposix.c_ioctl_voidp(fd, rposix.TIOCGWINSZ, winsize)
if failed:
raise exception_from_saved_errno(space, space.w_OSError)
w_columns = space.newint(r_uint(winsize.c_ws_col))
w_lines = space.newint(r_uint(winsize.c_ws_row))
return w_columns, w_lines
def get_terminal_size(space, w_fd=None):
try:
w_columns, w_lines = _get_terminal_size(space, w_fd)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
w_tuple = space.newtuple([w_columns, w_lines])
w_terminal_size = space.getattr(space.getbuiltinmodule(os.name),
space.newtext('terminal_size'))
return space.call_function(w_terminal_size, w_tuple)
def cpu_count(space):
count = rposix.cpu_count()
if count <= 0:
return space.w_None
return space.newint(count)
@unwrap_spec(fd=c_int)
def get_blocking(space, fd):
"""get_blocking(fd) -> bool
Get the blocking mode of the file descriptor:
False if the O_NONBLOCK flag is set, True if the flag is cleared."""
try:
flags = rposix.get_status_flags(fd)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
return space.newbool(flags & rposix.O_NONBLOCK == 0)
@unwrap_spec(fd=c_int, blocking=int)
def set_blocking(space, fd, blocking):
"""\
set_blocking(fd, blocking)
Set the blocking mode of the specified file descriptor.
Set the O_NONBLOCK flag if blocking is False,
clear the O_NONBLOCK flag otherwise."""
try:
flags = rposix.get_status_flags(fd)
if blocking:
flags &= ~rposix.O_NONBLOCK
else:
flags |= rposix.O_NONBLOCK
rposix.set_status_flags(fd, flags)
except OSError as e:
raise wrap_oserror(space, e, eintr_retry=False)
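# Illustration only, not part of the module: the flag arithmetic behind
# get_blocking/set_blocking, written against plain integers.  is_blocking
# and apply_blocking are hypothetical names; the fallback value for
# O_NONBLOCK is an assumption used only where the os module lacks it.

```python
import os

# Fallback value is an assumption for platforms without os.O_NONBLOCK.
O_NONBLOCK = getattr(os, "O_NONBLOCK", 0o4000)


def is_blocking(flags):
    """True when the O_NONBLOCK bit is cleared."""
    # In Python, '&' binds tighter than '==', so this tests the masked bit.
    return flags & O_NONBLOCK == 0


def apply_blocking(flags, blocking):
    """Return flags with O_NONBLOCK cleared (blocking) or set (non-blocking)."""
    if blocking:
        return flags & ~O_NONBLOCK
    return flags | O_NONBLOCK
```

# Other status bits are left untouched by either direction of the update.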
@unwrap_spec(out=c_int, count=int)
def sendfile(space, out, w_in, w_offset, count):
    """\
    sendfile(out, in, offset, count[, headers][, trailers], flags=0)
        -> byteswritten
    Copy count bytes from file descriptor in to file descriptor out."""
    # NB. CPython calls this argument "in" even though that is a reserved
    # word; we name it w_in here and unwrap it by hand.
    in_ = space.c_int_w(w_in)
    # XXX only supports the common arguments for now (BSD takes more).
    # Until that is fixed, we only expose sendfile() on linux.
    if space.is_none(w_offset):     # linux only
        while True:
            try:
                res = rposix.sendfile_no_offset(out, in_, count)
                break
            except OSError as e:
                wrap_oserror(space, e, eintr_retry=True)
    else:
        offset = space.gateway_r_longlong_w(w_offset)
        while True:
            try:
                res = rposix.sendfile(out, in_, offset, count)
                break
            except OSError as e:
                wrap_oserror(space, e, eintr_retry=True)
    return space.newint(res)
@unwrap_spec(policy=int)
def sched_get_priority_max(space, policy):
    """Return the maximum priority value that can be used with the
    scheduling algorithm identified by policy.
    """
    while True:
        try:
            s = rposix.sched_get_priority_max(policy)
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)
        else:
            return space.newint(s)
@unwrap_spec(policy=int)
def sched_get_priority_min(space, policy):
    """Return the minimum priority value that can be used with the
    scheduling algorithm identified by policy.
    """
    while True:
        try:
            s = rposix.sched_get_priority_min(policy)
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)
        else:
            return space.newint(s)
@unwrap_spec(fd=c_int, cmd=c_int, length=r_longlong)
def lockf(space, fd, cmd, length):
    """Apply, test or remove a POSIX lock on an open file.
    """
    while True:
        try:
            s = rposix.lockf(fd, cmd, length)
        except OSError as e:
            wrap_oserror(space, e, eintr_retry=True)
        else:
            return space.newint(s)
def sched_yield(space):
""" Voluntarily relinquish the CPU"""
while True:
try:
res = rposix.sched_yield()
except OSError as e:
wrap_oserror(space, e, eintr_retry=True)
else:
return space.newint(res)