# SPDX-License-Identifier: Apache-2.0
# Copyright 2016-2017 The Meson development team
# A tool to run tests in many different ways.
from __future__ import annotations
from pathlib import Path
from collections import deque
from contextlib import suppress
from copy import deepcopy
from fnmatch import fnmatch
import argparse
import asyncio
import datetime
import enum
import json
import os
import pickle
import platform
import random
import re
import signal
import subprocess
import shlex
import sys
import time
import typing as T
import unicodedata
import xml.etree.ElementTree as et
from . import build
from . import tooldetect
from . import mlog
from .coredata import MesonVersionMismatchException, major_versions_differ
from .coredata import version as coredata_version
from .mesonlib import (MesonException, OrderedSet, RealPathAction,
get_wine_shortpath, join_args, split_args, setup_vsenv,
determine_worker_count)
from .options import OptionKey
from .programs import ExternalProgram
from .backend.backends import TestProtocol, TestSerialisation
if T.TYPE_CHECKING:
TYPE_TAPResult = T.Union['TAPParser.Test',
'TAPParser.Error',
'TAPParser.Version',
'TAPParser.Plan',
'TAPParser.UnknownLine',
'TAPParser.Bailout']
# GNU autotools interprets a return code of 77 from tests it executes to
# mean that the test should be skipped.
GNU_SKIP_RETURNCODE = 77
# GNU autotools interprets a return code of 99 from tests it executes to
# mean that the test failed even before testing what it is supposed to test.
GNU_ERROR_RETURNCODE = 99
# Exit if 3 Ctrl-C's are received within one second
MAX_CTRLC = 3
# Ranges of characters that cannot be encoded in XML; a regex built from them
# below is used to replace such characters with their printable representation
UNENCODABLE_XML_UNICHRS: T.List[T.Tuple[int, int]] = [
(0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), (0x7F, 0x84),
(0x86, 0x9F), (0xFDD0, 0xFDEF), (0xFFFE, 0xFFFF)]
# Not a narrow Unicode build
if sys.maxunicode >= 0x10000:
UNENCODABLE_XML_UNICHRS.extend([
(0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF),
(0x3FFFE, 0x3FFFF), (0x4FFFE, 0x4FFFF),
(0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF),
(0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF),
(0x9FFFE, 0x9FFFF), (0xAFFFE, 0xAFFFF),
(0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF),
(0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF),
(0xFFFFE, 0xFFFFF), (0x10FFFE, 0x10FFFF)])
UNENCODABLE_XML_CHR_RANGES = [fr'{chr(low)}-{chr(high)}' for (low, high) in UNENCODABLE_XML_UNICHRS]
UNENCODABLE_XML_CHRS_RE = re.compile('([' + ''.join(UNENCODABLE_XML_CHR_RANGES) + '])')
RUST_TEST_RE = re.compile(r'^test (?!result)(.*) \.\.\. (.*)$')
RUST_DOCTEST_RE = re.compile(r'^(.*?) - (.*? |)\(line (\d+)\)')
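# Illustrative (not from the original source): lines matched by the two Rust
# regexes above look like
#   test tests::it_works ... ok                  (RUST_TEST_RE)
#   test src/lib.rs - add (line 5) ... FAILED    (name part also matches RUST_DOCTEST_RE)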
def is_windows() -> bool:
platname = platform.system().lower()
return platname == 'windows'
def is_cygwin() -> bool:
return sys.platform == 'cygwin'
UNIWIDTH_MAPPING = {'F': 2, 'H': 1, 'W': 2, 'Na': 1, 'N': 1, 'A': 1}
def uniwidth(s: str) -> int:
result = 0
for c in s:
w = unicodedata.east_asian_width(c)
result += UNIWIDTH_MAPPING[w]
return result
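# Illustrative examples for uniwidth(), assuming a terminal in which East
# Asian Wide ('W') and Fullwidth ('F') characters occupy two cells:
#   uniwidth('abc')          == 3
#   uniwidth('\u65e5\u672c') == 4   # two wide CJK characters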
def test_slice(arg: str) -> T.Tuple[int, int]:
values = arg.split('/')
if len(values) != 2:
raise argparse.ArgumentTypeError("value does not conform to format 'SLICE/NUM_SLICES'")
try:
nrslices = int(values[1])
except ValueError:
raise argparse.ArgumentTypeError('NUM_SLICES is not an integer')
if nrslices <= 0:
raise argparse.ArgumentTypeError('NUM_SLICES is not a positive integer')
try:
subslice = int(values[0])
except ValueError:
raise argparse.ArgumentTypeError('SLICE is not an integer')
if subslice <= 0:
raise argparse.ArgumentTypeError('SLICE is not a positive integer')
if subslice > nrslices:
raise argparse.ArgumentTypeError('SLICE exceeds NUM_SLICES')
return subslice, nrslices
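# Illustrative examples for test_slice():
#   test_slice('2/4') == (2, 4)
#   test_slice('x/4'), test_slice('0/4') and test_slice('5/4') all raise
#   argparse.ArgumentTypeError.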
# Note: when adding arguments, please also add them to the completion
# scripts in $MESONSRC/data/shell-completions/
def add_arguments(parser: argparse.ArgumentParser) -> None:
parser.add_argument('--maxfail', default=0, type=int,
help='Number of failing tests before aborting the '
'test run. (default: 0, to disable aborting on failure)')
parser.add_argument('--repeat', default=1, dest='repeat', type=int,
help='Number of times to run the tests.')
parser.add_argument('--no-rebuild', default=False, action='store_true',
help='Do not rebuild before running tests.')
parser.add_argument('--gdb', default=False, dest='gdb', action='store_true',
help='Run test under gdb.')
parser.add_argument('--gdb-path', default='gdb', dest='gdb_path',
help='Path to the gdb binary (default: gdb).')
parser.add_argument('-i', '--interactive', default=False, dest='interactive',
action='store_true', help='Run tests with interactive input/output.')
parser.add_argument('--list', default=False, dest='list', action='store_true',
help='List available tests.')
parser.add_argument('--wrapper', default=None, dest='wrapper', type=split_args,
help='wrapper to run tests with (e.g. Valgrind)')
parser.add_argument('-C', dest='wd', action=RealPathAction,
help='directory to cd into before running')
parser.add_argument('--suite', default=[], dest='include_suites', action='append', metavar='SUITE',
help='Only run tests belonging to the given suite.')
parser.add_argument('--no-suite', default=[], dest='exclude_suites', action='append', metavar='SUITE',
help='Do not run tests belonging to the given suite.')
parser.add_argument('--no-stdsplit', default=True, dest='split', action='store_false',
help='Do not split stderr and stdout in test logs.')
parser.add_argument('--print-errorlogs', default=False, action='store_true',
help="Whether to print failing tests' logs.")
parser.add_argument('--benchmark', default=False, action='store_true',
help="Run benchmarks instead of tests.")
parser.add_argument('--logbase', default='testlog',
help="Base name for log file.")
parser.add_argument('-j', '--num-processes', default=determine_worker_count(['MESON_TESTTHREADS']), type=int,
help='How many parallel processes to use.')
parser.add_argument('-v', '--verbose', default=False, action='store_true',
help='Do not redirect stdout and stderr')
parser.add_argument('-q', '--quiet', default=False, action='store_true',
help='Produce less output to the terminal.')
parser.add_argument('-t', '--timeout-multiplier', type=float, default=None,
                        help='Define a multiplier for test timeout, for example '
                        'when running tests in particular conditions where they '
                        'might take more time to execute. (<= 0 to disable timeout)')
parser.add_argument('--setup', default=None, dest='setup',
help='Which test setup to use.')
parser.add_argument('--test-args', default=[], type=split_args,
help='Arguments to pass to the specified test(s) or all tests')
parser.add_argument('--max-lines', default=100, dest='max_lines', type=int,
help='Maximum number of lines to show from a long test log. Since 1.5.0.')
parser.add_argument('--slice', default=None, type=test_slice, metavar='SLICE/NUM_SLICES',
help='Split tests into NUM_SLICES slices and execute slice SLICE. Since 1.8.0.')
parser.add_argument('args', nargs='*',
help='Optional list of test names to run. "testname" to run all tests with that name, '
'"subprojname:testname" to specifically run "testname" from "subprojname", '
'"subprojname:" to run all tests defined by "subprojname".')
def print_safe(s: str) -> None:
end = '' if s[-1] == '\n' else '\n'
try:
print(s, end=end)
except UnicodeEncodeError:
s = s.encode('ascii', errors='backslashreplace').decode('ascii')
print(s, end=end)
def join_lines(a: str, b: str) -> str:
if not a:
return b
if not b:
return a
return a + '\n' + b
def dashes(s: str, dash: str, cols: int) -> str:
if not s:
return dash * cols
s = ' ' + s + ' '
width = uniwidth(s)
first = (cols - width) // 2
s = dash * first + s
return s + dash * (cols - first - width)
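# Illustrative example: dashes('stdout', '-', 16) == '---- stdout ----',
# i.e. the title is centered within a line of exactly `cols` cells.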
def returncode_to_status(retcode: int) -> str:
# Note: We can't use `os.WIFSIGNALED(result.returncode)` and the related
# functions here because the status returned by subprocess is munged. It
# returns a negative value if the process was killed by a signal rather than
# the raw status returned by `wait()`. Also, if a shell sits between Meson and
# the actual unit test, that shell is likely to convert a termination due
# to a signal into an exit status of 128 plus the signal number.
if retcode < 0:
signum = -retcode
try:
signame = signal.Signals(signum).name
except ValueError:
signame = 'SIGinvalid'
return f'killed by signal {signum} {signame}'
if retcode <= 128:
return f'exit status {retcode}'
signum = retcode - 128
if signum < 32:
try:
signame = signal.Signals(signum).name
except ValueError:
signame = 'SIGinvalid'
return f'(exit status {retcode} or signal {signum} {signame})'
return f'(exit status {retcode} or {hex(retcode)})'
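# Illustrative examples (signal names are platform-dependent; these assume
# POSIX):
#   returncode_to_status(0)   == 'exit status 0'
#   returncode_to_status(-11) == 'killed by signal 11 SIGSEGV'
#   returncode_to_status(139) == '(exit status 139 or signal 11 SIGSEGV)'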
# TODO for Windows
sh_quote: T.Callable[[str], str] = lambda x: x
if not is_windows():
sh_quote = shlex.quote
def env_tuple_to_str(env: T.Iterable[T.Tuple[str, str]]) -> str:
return ''.join(["{}={} ".format(k, sh_quote(v)) for k, v in env])
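# Illustrative example (POSIX quoting): env_tuple_to_str([('FOO', 'a b')])
# returns "FOO='a b' " (note the trailing space).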
class TestException(MesonException):
pass
@enum.unique
class ConsoleUser(enum.Enum):
# the logger can use the console
LOGGER = 0
# the console is used by gdb or the user
INTERACTIVE = 1
# the console is used to write stdout/stderr
STDOUT = 2
@enum.unique
class TestResult(enum.Enum):
PENDING = 'PENDING'
RUNNING = 'RUNNING'
OK = 'OK'
TIMEOUT = 'TIMEOUT'
INTERRUPT = 'INTERRUPT'
SKIP = 'SKIP'
FAIL = 'FAIL'
EXPECTEDFAIL = 'EXPECTEDFAIL'
UNEXPECTEDPASS = 'UNEXPECTEDPASS'
ERROR = 'ERROR'
IGNORED = 'IGNORED'
@staticmethod
def maxlen() -> int:
return 14 # len(UNEXPECTEDPASS)
def is_ok(self) -> bool:
return self in {TestResult.OK, TestResult.EXPECTEDFAIL}
def is_bad(self) -> bool:
return self in {TestResult.FAIL, TestResult.TIMEOUT, TestResult.INTERRUPT,
TestResult.UNEXPECTEDPASS, TestResult.ERROR}
def is_finished(self) -> bool:
return self not in {TestResult.PENDING, TestResult.RUNNING}
def was_killed(self) -> bool:
return self in (TestResult.TIMEOUT, TestResult.INTERRUPT)
def colorize(self, s: str) -> mlog.AnsiDecorator:
if self.is_bad():
decorator = mlog.red
elif self in (TestResult.SKIP, TestResult.IGNORED, TestResult.EXPECTEDFAIL):
decorator = mlog.yellow
elif self.is_finished():
decorator = mlog.green
else:
decorator = mlog.blue
return decorator(s)
def get_text(self, colorize: bool) -> str:
result_str = '{res:{reslen}}'.format(res=self.value, reslen=self.maxlen())
return self.colorize(result_str).get_text(colorize)
def get_command_marker(self) -> str:
return str(self.colorize('>>> '))
class TAPParser:
class Plan(T.NamedTuple):
num_tests: int
late: bool
skipped: bool
explanation: T.Optional[str]
class Bailout(T.NamedTuple):
message: str
class Test(T.NamedTuple):
number: int
name: str
result: TestResult
explanation: T.Optional[str]
def __str__(self) -> str:
return f'{self.number} {self.name}'.strip()
class Error(T.NamedTuple):
message: str
class UnknownLine(T.NamedTuple):
message: str
lineno: int
class Version(T.NamedTuple):
version: int
_MAIN = 1
_AFTER_TEST = 2
_YAML = 3
_RE_BAILOUT = re.compile(r'Bail out!\s*(.*)')
_RE_DIRECTIVE = re.compile(r'(?:\s*\#\s*([Ss][Kk][Ii][Pp]\S*|[Tt][Oo][Dd][Oo])\b\s*(.*))?')
_RE_PLAN = re.compile(r'1\.\.([0-9]+)' + _RE_DIRECTIVE.pattern)
_RE_TEST = re.compile(r'((?:not )?ok)\s*(?:([0-9]+)\s*)?([^#]*)' + _RE_DIRECTIVE.pattern)
_RE_VERSION = re.compile(r'TAP version ([0-9]+)')
_RE_YAML_START = re.compile(r'(\s+)---.*')
_RE_YAML_END = re.compile(r'\s+\.\.\.\s*')
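    # Illustrative (not from the original source): a small TAP stream and the
    # events that parse() yields for each of its lines:
    #   TAP version 13                -> Version(version=13)
    #   1..2                          -> Plan(num_tests=2, late=False, ...)
    #   ok 1 first                    -> Test(1, 'first', TestResult.OK, None)
    #   not ok 2 second # TODO flaky  -> Test(2, 'second', TestResult.EXPECTEDFAIL, 'flaky')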
found_late_test = False
bailed_out = False
plan: T.Optional[Plan] = None
lineno = 0
num_tests = 0
last_test = 0
highest_test = 0
yaml_lineno: T.Optional[int] = None
yaml_indent = ''
state = _MAIN
version = 12
def parse_test(self, ok: bool, num: int, name: str, directive: T.Optional[str], explanation: T.Optional[str]) -> \
T.Generator[T.Union['TAPParser.Test', 'TAPParser.Error'], None, None]:
name = name.strip()
explanation = explanation.strip() if explanation else None
if directive is not None:
directive = directive.upper()
if directive.startswith('SKIP'):
if ok:
yield self.Test(num, name, TestResult.SKIP, explanation)
return
elif directive == 'TODO':
yield self.Test(num, name, TestResult.UNEXPECTEDPASS if ok else TestResult.EXPECTEDFAIL, explanation)
return
else:
yield self.Error(f'invalid directive "{directive}"')
yield self.Test(num, name, TestResult.OK if ok else TestResult.FAIL, explanation)
async def parse_async(self, lines: T.AsyncIterator[str]) -> T.AsyncIterator[TYPE_TAPResult]:
async for line in lines:
for event in self.parse_line(line):
yield event
for event in self.parse_line(None):
yield event
def parse(self, io: T.Iterator[str]) -> T.Iterator[TYPE_TAPResult]:
for line in io:
yield from self.parse_line(line)
yield from self.parse_line(None)
def parse_line(self, line: T.Optional[str]) -> T.Iterator[TYPE_TAPResult]:
if line is not None:
self.lineno += 1
line = line.rstrip()
# YAML blocks are only accepted after a test
if self.state == self._AFTER_TEST:
if self.version >= 13:
m = self._RE_YAML_START.match(line)
if m:
self.state = self._YAML
self.yaml_lineno = self.lineno
self.yaml_indent = m.group(1)
return
self.state = self._MAIN
elif self.state == self._YAML:
if self._RE_YAML_END.match(line):
self.state = self._MAIN
return
if line.startswith(self.yaml_indent):
return
yield self.Error(f'YAML block not terminated (started on line {self.yaml_lineno})')
self.state = self._MAIN
assert self.state == self._MAIN
if not line or line.startswith('#'):
return
m = self._RE_TEST.match(line)
if m:
if self.plan and self.plan.late and not self.found_late_test:
yield self.Error('unexpected test after late plan')
self.found_late_test = True
self.num_tests += 1
self.last_test = self.last_test + 1 if m.group(2) is None else int(m.group(2))
self.highest_test = max(self.highest_test, self.last_test)
if self.plan and self.last_test > self.plan.num_tests:
yield self.Error('test number exceeds maximum specified in test plan')
yield from self.parse_test(m.group(1) == 'ok', self.last_test,
m.group(3), m.group(4), m.group(5))
self.state = self._AFTER_TEST
return
m = self._RE_PLAN.match(line)
if m:
if self.plan:
yield self.Error('more than one plan found')
else:
num_tests = int(m.group(1))
skipped = num_tests == 0
if m.group(2):
if m.group(2).upper().startswith('SKIP'):
if num_tests > 0:
yield self.Error('invalid SKIP directive for plan')
skipped = True
else:
yield self.Error('invalid directive for plan')
self.plan = self.Plan(num_tests=num_tests, late=(self.num_tests > 0),
skipped=skipped, explanation=m.group(3))
yield self.plan
return
m = self._RE_BAILOUT.match(line)
if m:
yield self.Bailout(m.group(1))
self.bailed_out = True
return
m = self._RE_VERSION.match(line)
if m:
# The TAP version is only accepted as the first line
if self.lineno != 1:
yield self.Error('version number must be on the first line')
return
self.version = int(m.group(1))
if self.version < 13:
yield self.Error('version number should be at least 13')
else:
yield self.Version(version=self.version)
return
# unknown syntax
yield self.UnknownLine(line, self.lineno)
else:
# end of file
if self.state == self._YAML:
yield self.Error(f'YAML block not terminated (started on line {self.yaml_lineno})')
if self.bailed_out:
return
if self.plan and self.num_tests != self.plan.num_tests:
if self.num_tests < self.plan.num_tests:
yield self.Error(f'Too few tests run (expected {self.plan.num_tests}, got {self.num_tests})')
else:
yield self.Error(f'Too many tests run (expected {self.plan.num_tests}, got {self.num_tests})')
return
if self.highest_test != self.num_tests:
if self.highest_test < self.num_tests:
                    yield self.Error(f'Duplicate test numbers (expected {self.num_tests}, got test numbered {self.highest_test})')
                else:
                    yield self.Error(f'Missing test numbers (expected {self.num_tests}, got test numbered {self.highest_test})')
class TestLogger:
def flush(self) -> None:
pass
def start(self, harness: 'TestHarness') -> None:
pass
def start_test(self, harness: 'TestHarness', test: 'TestRun') -> None:
pass
def log_subtest(self, harness: 'TestHarness', test: 'TestRun', s: str, res: TestResult) -> None:
pass
def log(self, harness: 'TestHarness', result: 'TestRun') -> None:
pass
async def finish(self, harness: 'TestHarness') -> None:
pass
def close(self) -> None:
pass
class TestFileLogger(TestLogger):
def __init__(self, filename: str, errors: str = 'replace') -> None:
self.filename = filename
self.file = open(filename, 'w', encoding='utf-8', errors=errors)
def close(self) -> None:
if self.file:
self.file.close()
self.file = None
class ConsoleLogger(TestLogger):
ASCII_SPINNER = ['..', ':.', '.:']
SPINNER = ["\U0001f311", "\U0001f312", "\U0001f313", "\U0001f314",
"\U0001f315", "\U0001f316", "\U0001f317", "\U0001f318"]
SCISSORS = "\u2700 "
HLINE = "\u2015"
RTRI = "\u25B6 "
def __init__(self, max_lines: int) -> None:
self.max_lines = max_lines
self.running_tests: OrderedSet['TestRun'] = OrderedSet()
self.progress_test: T.Optional['TestRun'] = None
self.progress_task: T.Optional[asyncio.Future] = None
self.max_left_width = 0
self.stop = False
# TODO: before 3.10 this cannot be created immediately, because
# it will create a new event loop
self.update: asyncio.Event
self.should_erase_line = ''
self.test_count = 0
self.started_tests = 0
self.spinner_index = 0
try:
self.cols, _ = os.get_terminal_size(1)
self.is_tty = True
except OSError:
self.cols = 80
self.is_tty = False
self.output_start = dashes(self.SCISSORS, self.HLINE, self.cols - 2)
self.output_end = dashes('', self.HLINE, self.cols - 2)
self.sub = self.RTRI
self.spinner = self.SPINNER
try:
self.output_start.encode(sys.stdout.encoding or 'ascii')
except UnicodeEncodeError:
self.output_start = dashes('8<', '-', self.cols - 2)
self.output_end = dashes('', '-', self.cols - 2)
self.sub = '| '
self.spinner = self.ASCII_SPINNER
def flush(self) -> None:
if self.should_erase_line:
print(self.should_erase_line, end='')
self.should_erase_line = ''
def print_progress(self, line: str) -> None:
print(self.should_erase_line, line, sep='', end='\r')
self.should_erase_line = '\x1b[K'
def request_update(self) -> None:
self.update.set()
def emit_progress(self, harness: 'TestHarness') -> None:
if self.progress_test is None:
self.flush()
return
if len(self.running_tests) == 1:
count = f'{self.started_tests}/{self.test_count}'
else:
count = '{}-{}/{}'.format(self.started_tests - len(self.running_tests) + 1,
self.started_tests, self.test_count)
left = '[{}] {} '.format(count, self.spinner[self.spinner_index])
self.spinner_index = (self.spinner_index + 1) % len(self.spinner)
right = '{spaces} {dur:{durlen}}'.format(
spaces=' ' * TestResult.maxlen(),
dur=int(time.time() - self.progress_test.starttime),
durlen=harness.duration_max_len)
if self.progress_test.timeout:
right += '/{timeout:{durlen}}'.format(
timeout=self.progress_test.timeout,
durlen=harness.duration_max_len)
right += 's'
details = self.progress_test.get_details()
if details:
right += ' ' + details
line = harness.format(self.progress_test, colorize=True,
max_left_width=self.max_left_width,
left=left, right=right)
self.print_progress(line)
def start(self, harness: 'TestHarness') -> None:
async def report_progress() -> None:
loop = asyncio.get_running_loop()
next_update = 0.0
self.request_update()
while not self.stop:
await self.update.wait()
self.update.clear()
# We may get here simply because the progress line has been
# overwritten, so do not always switch. Only do so every
# second, or if the printed test has finished
if loop.time() >= next_update:
self.progress_test = None
next_update = loop.time() + 1
loop.call_at(next_update, self.request_update)
if (self.progress_test and
self.progress_test.res is not TestResult.RUNNING):
self.progress_test = None
if not self.progress_test:
if not self.running_tests:
continue
# Pick a test in round robin order
self.progress_test = self.running_tests.pop(last=False)
self.running_tests.add(self.progress_test)
self.emit_progress(harness)
self.flush()
self.update = asyncio.Event()
self.test_count = harness.test_count
self.cols = max(self.cols, harness.max_left_width + 30)
if self.is_tty and not harness.need_console:
# Account for "[aa-bb/cc] OO " in the progress report
self.max_left_width = 3 * len(str(self.test_count)) + 8
self.progress_task = asyncio.ensure_future(report_progress())
def start_test(self, harness: 'TestHarness', test: 'TestRun') -> None:
if test.verbose and test.cmdline:
self.flush()
print(harness.format(test, mlog.colorize_console(),
max_left_width=self.max_left_width,
right=test.res.get_text(mlog.colorize_console())))
print(test.res.get_command_marker() + test.cmdline)
if test.direct_stdout:
print(self.output_start, flush=True)
elif not test.needs_parsing:
print(flush=True)
self.started_tests += 1
self.running_tests.add(test)
self.running_tests.move_to_end(test, last=False)
self.request_update()
def shorten_log(self, harness: 'TestHarness', result: 'TestRun') -> str:
if not result.verbose and not harness.options.print_errorlogs:
return ''
log = result.get_log(mlog.colorize_console(),
stderr_only=result.needs_parsing)
if result.verbose:
return log
lines = log.splitlines()
if len(lines) < self.max_lines:
return log
else:
return str(mlog.bold(f'Listing only the last {self.max_lines} lines from a long log.\n')) + '\n'.join(lines[-self.max_lines:])
def print_log(self, harness: 'TestHarness', result: 'TestRun') -> None:
if not result.verbose:
cmdline = result.cmdline
if not cmdline:
print(result.res.get_command_marker() + result.stdo)
return
print(result.res.get_command_marker() + cmdline)
log = self.shorten_log(harness, result)
if log:
print(self.output_start)
print_safe(log)
print(self.output_end)
def log_subtest(self, harness: 'TestHarness', test: 'TestRun', s: str, result: TestResult) -> None:
if test.verbose or (harness.options.print_errorlogs and result.is_bad()):
self.flush()
print(harness.format(test, mlog.colorize_console(), max_left_width=self.max_left_width,
prefix=self.sub,
middle=s,
right=result.get_text(mlog.colorize_console())), flush=True)
self.request_update()
def log(self, harness: 'TestHarness', result: 'TestRun') -> None:
self.running_tests.remove(result)
if result.res is TestResult.TIMEOUT and (result.verbose or
harness.options.print_errorlogs):
self.flush()
            print(f'{result.name} timed out (after {result.timeout} seconds)')
if not harness.options.quiet or not result.res.is_ok():
self.flush()
if result.cmdline and result.direct_stdout:
print(self.output_end)
print(harness.format(result, mlog.colorize_console(), max_left_width=self.max_left_width))
else:
print(harness.format(result, mlog.colorize_console(), max_left_width=self.max_left_width),
flush=True)
if result.verbose or result.res.is_bad():
self.print_log(harness, result)
if result.warnings:
print(flush=True)
for w in result.warnings:
print(w, flush=True)
print(flush=True)
if result.verbose or result.res.is_bad():
print(flush=True)
self.request_update()
async def finish(self, harness: 'TestHarness') -> None:
self.stop = True
self.request_update()
if self.progress_task:
await self.progress_task
if harness.collected_failures and \
(harness.options.print_errorlogs or harness.options.verbose):
print("\nSummary of Failures:\n")
for i, result in enumerate(harness.collected_failures, 1):
print(harness.format(result, mlog.colorize_console()))
print(harness.summary())
class TextLogfileBuilder(TestFileLogger):
def start(self, harness: 'TestHarness') -> None:
self.file.write(f'Log of Meson test suite run on {datetime.datetime.now().isoformat()}\n\n')
inherit_env = env_tuple_to_str(os.environ.items())
self.file.write(f'Inherited environment: {inherit_env}\n\n')
def log(self, harness: 'TestHarness', result: 'TestRun') -> None:
title = f'{result.num}/{harness.test_count}'
self.file.write(dashes(title, '=', 78) + '\n')
self.file.write('test: ' + result.name + '\n')
starttime_str = time.strftime("%H:%M:%S", time.gmtime(result.starttime))
self.file.write('start time: ' + starttime_str + '\n')
self.file.write('duration: ' + '%.2fs' % result.duration + '\n')
self.file.write('result: ' + result.get_exit_status() + '\n')
if result.cmdline:
self.file.write('command: ' + result.cmdline + '\n')
if result.stdo:
name = 'stdout' if harness.options.split else 'output'
self.file.write(dashes(name, '-', 78) + '\n')
self.file.write(result.stdo)
if result.stde:
self.file.write(dashes('stderr', '-', 78) + '\n')
self.file.write(result.stde)
self.file.write(dashes('', '=', 78) + '\n\n')
async def finish(self, harness: 'TestHarness') -> None:
if harness.collected_failures:
self.file.write("\nSummary of Failures:\n\n")
for i, result in enumerate(harness.collected_failures, 1):
self.file.write(harness.format(result, False) + '\n')
self.file.write(harness.summary())
print(f'Full log written to {self.filename}')
class JsonLogfileBuilder(TestFileLogger):
def log(self, harness: 'TestHarness', result: 'TestRun') -> None:
jresult: T.Dict[str, T.Any] = {
'name': result.name,
'stdout': result.stdo,
'result': result.res.value,
'is_fail': result.res.is_bad(),
'starttime': result.starttime,
'duration': result.duration,
'returncode': result.returncode,
'env': result.env,
'command': result.cmd,
}
if result.stde:
jresult['stderr'] = result.stde
self.file.write(json.dumps(jresult) + '\n')
class JunitBuilder(TestLogger):
"""Builder for Junit test results.
    Junit is impossible to stream out: it requires attributes counting the
total number of tests, failures, skips, and errors in the root element
and in each test suite. As such, we use a builder class to track each
test case, and calculate all metadata before writing it out.
For tests with multiple results (like from a TAP test), we record the
test as a suite with the project_name.test_name. This allows us to track
each result separately. For tests with only one result (such as exit-code
tests) we record each one into a suite with the name project_name. The use
of the project_name allows us to sort subproject tests separately from
the root project.
"""
def __init__(self, filename: str) -> None:
self.filename = filename
self.root = et.Element(
'testsuites', tests='0', errors='0', failures='0')
self.suites: T.Dict[str, et.Element] = {}
def log(self, harness: 'TestHarness', test: 'TestRun') -> None:
"""Log a single test case."""
if test.junit is not None:
for suite in test.junit.findall('.//testsuite'):
# Assume that we don't need to merge anything here...
suite.attrib['name'] = '{}.{}.{}'.format(test.project, test.name, suite.attrib['name'])
# GTest can inject invalid attributes
for case in suite.findall('.//testcase[@result]'):
del case.attrib['result']
for case in suite.findall('.//testcase[@timestamp]'):
del case.attrib['timestamp']
for case in suite.findall('.//testcase[@file]'):
del case.attrib['file']
for case in suite.findall('.//testcase[@line]'):
del case.attrib['line']
self.root.append(suite)
return
# In this case we have a test binary with multiple results.
# We want to record this so that each result is recorded
# separately
if test.results:
suitename = f'{test.project}.{test.name}'
assert suitename not in self.suites or harness.options.repeat > 1, 'duplicate suite'
suite = self.suites[suitename] = et.Element(
'testsuite',
name=suitename,
tests=str(len(test.results)),
errors=str(sum(1 for r in test.results if r.result in
{TestResult.INTERRUPT, TestResult.ERROR})),
failures=str(sum(1 for r in test.results if r.result in
{TestResult.FAIL, TestResult.UNEXPECTEDPASS, TestResult.TIMEOUT})),
skipped=str(sum(1 for r in test.results if r.result in
{TestResult.SKIP, TestResult.IGNORED})),
time=str(test.duration),
)
for subtest in test.results:
# Both name and classname are required. Use the suite name as
# the class name, so that e.g. GitLab groups testcases correctly.
testcase = et.SubElement(suite, 'testcase', name=str(subtest), classname=suitename)
if subtest.result is TestResult.SKIP:
et.SubElement(testcase, 'skipped')
elif subtest.result is TestResult.IGNORED:
skip = et.SubElement(testcase, 'skipped')
skip.text = 'Test output was not parsed.'
elif subtest.result is TestResult.ERROR:
et.SubElement(testcase, 'error')
elif subtest.result is TestResult.FAIL:
et.SubElement(testcase, 'failure')
elif subtest.result is TestResult.UNEXPECTEDPASS:
fail = et.SubElement(testcase, 'failure')
fail.text = 'Test unexpectedly passed.'
elif subtest.result is TestResult.INTERRUPT:
fail = et.SubElement(testcase, 'error')
fail.text = 'Test was interrupted by user.'
elif subtest.result is TestResult.TIMEOUT:
fail = et.SubElement(testcase, 'error')
fail.text = 'Test did not finish before configured timeout.'
if subtest.explanation:
et.SubElement(testcase, 'system-out').text = subtest.explanation
if test.stdo:
out = et.SubElement(suite, 'system-out')
out.text = replace_unencodable_xml_chars(test.stdo.rstrip())
if test.stde:
err = et.SubElement(suite, 'system-err')
err.text = replace_unencodable_xml_chars(test.stde.rstrip())
else:
if test.project not in self.suites:
suite = self.suites[test.project] = et.Element(
'testsuite', name=test.project, tests='1', errors='0',
failures='0', skipped='0', time=str(test.duration))
else:
suite = self.suites[test.project]
suite.attrib['tests'] = str(int(suite.attrib['tests']) + 1)
testcase = et.SubElement(suite, 'testcase', name=test.name,
classname=test.project, time=str(test.duration))
if test.res is TestResult.SKIP:
et.SubElement(testcase, 'skipped')
suite.attrib['skipped'] = str(int(suite.attrib['skipped']) + 1)
elif test.res is TestResult.IGNORED:
skip = et.SubElement(testcase, 'skipped')
skip.text = 'Test output was not parsed.'
suite.attrib['skipped'] = str(int(suite.attrib['skipped']) + 1)
elif test.res is TestResult.ERROR:
et.SubElement(testcase, 'error')
suite.attrib['errors'] = str(int(suite.attrib['errors']) + 1)
elif test.res is TestResult.FAIL:
et.SubElement(testcase, 'failure')
suite.attrib['failures'] = str(int(suite.attrib['failures']) + 1)
elif test.res is TestResult.UNEXPECTEDPASS:
fail = et.SubElement(testcase, 'failure')
fail.text = 'Test unexpectedly passed.'
suite.attrib['failures'] = str(int(suite.attrib['failures']) + 1)
elif test.res is TestResult.INTERRUPT:
fail = et.SubElement(testcase, 'error')
fail.text = 'Test was interrupted by user.'
suite.attrib['errors'] = str(int(suite.attrib['errors']) + 1)
elif test.res is TestResult.TIMEOUT:
fail = et.SubElement(testcase, 'error')
fail.text = 'Test did not finish before configured timeout.'
suite.attrib['errors'] = str(int(suite.attrib['errors']) + 1)
if test.stdo:
out = et.SubElement(testcase, 'system-out')
out.text = replace_unencodable_xml_chars(test.stdo.rstrip())
if test.stde:
err = et.SubElement(testcase, 'system-err')
err.text = replace_unencodable_xml_chars(test.stde.rstrip())
async def finish(self, harness: 'TestHarness') -> None:
"""Calculate total test counts and write out the xml result."""
for suite in self.suites.values():
self.root.append(suite)
            # Skipped is really not allowed in the "testsuites" element
for attr in ['tests', 'errors', 'failures']:
self.root.attrib[attr] = str(int(self.root.attrib[attr]) + int(suite.attrib[attr]))
tree = et.ElementTree(self.root)
with open(self.filename, 'wb') as f:
tree.write(f, encoding='utf-8', xml_declaration=True)
class TestRun:
TEST_NUM = 0
PROTOCOL_TO_CLASS: T.Dict[TestProtocol, T.Type['TestRun']] = {}
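    # __new__ dispatches on the test protocol: constructing TestRun(test, ...)
    # actually returns an instance of the subclass registered for
    # test.protocol (e.g. TestRunTAP for TestProtocol.TAP); see the
    # PROTOCOL_TO_CLASS assignments after each subclass below.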
def __new__(cls, test: TestSerialisation, *args: T.Any, **kwargs: T.Any) -> T.Any:
return super().__new__(TestRun.PROTOCOL_TO_CLASS[test.protocol])
def __init__(self, test: TestSerialisation, test_env: T.Dict[str, str],
name: str, timeout: T.Optional[int], is_parallel: bool, verbose: bool,
interactive: bool):
self.res = TestResult.PENDING
self.test = test
self._num: T.Optional[int] = None
self.name = name
self.timeout = timeout
self.results: T.List[TAPParser.Test] = []
self.returncode: T.Optional[int] = None
self.starttime: T.Optional[float] = None
self.duration: T.Optional[float] = None
self.stdo = ''
self.stde = ''
self.additional_error = ''
self.cmd: T.Optional[T.List[str]] = None
self.env = test_env
self.should_fail = test.should_fail
self.project = test.project_name
self.junit: T.Optional[et.ElementTree] = None
self.is_parallel = is_parallel
self.verbose = verbose
self.interactive = interactive
self.warnings: T.List[str] = []
def start(self, cmd: T.List[str]) -> None:
self.res = TestResult.RUNNING
self.starttime = time.time()
self.cmd = cmd
@property
def num(self) -> int:
if self._num is None:
TestRun.TEST_NUM += 1
self._num = TestRun.TEST_NUM
return self._num
@property
def console_mode(self) -> ConsoleUser:
if self.interactive:
return ConsoleUser.INTERACTIVE
elif self.direct_stdout:
return ConsoleUser.STDOUT
else:
return ConsoleUser.LOGGER
@property
def direct_stdout(self) -> bool:
return self.verbose and not self.is_parallel and not self.needs_parsing
def get_results(self) -> str:
if self.results:
# running or succeeded
passed = sum(x.result.is_ok() for x in self.results)
ran = sum(x.result not in {TestResult.SKIP, TestResult.IGNORED} for x in self.results)
if passed == ran:
return f'{passed} subtests passed'
else:
return f'{passed}/{ran} subtests passed'
return ''
def get_exit_status(self) -> str:
return returncode_to_status(self.returncode)
def get_details(self) -> str:
if self.res is TestResult.PENDING:
return ''
if self.returncode:
return self.get_exit_status()
return self.get_results()
def _complete(self) -> None:
if self.res == TestResult.RUNNING:
self.res = TestResult.OK
if self.needs_parsing and self.console_mode is ConsoleUser.INTERACTIVE:
self.res = TestResult.IGNORED
assert isinstance(self.res, TestResult)
if self.should_fail and self.res in (TestResult.OK, TestResult.FAIL):
self.res = TestResult.UNEXPECTEDPASS if self.res is TestResult.OK else TestResult.EXPECTEDFAIL
if self.stdo and not self.stdo.endswith('\n'):
self.stdo += '\n'
if self.stde and not self.stde.endswith('\n'):
self.stde += '\n'
self.duration = time.time() - self.starttime
@property
def cmdline(self) -> T.Optional[str]:
if not self.cmd:
return None
test_only_env = set(self.env.items()) - set(os.environ.items())
return env_tuple_to_str(test_only_env) + \
' '.join(sh_quote(x) for x in self.cmd)
def complete_skip(self) -> None:
self.starttime = time.time()
self.returncode = GNU_SKIP_RETURNCODE
self.res = TestResult.SKIP
self._complete()
def complete(self) -> None:
self._complete()
def get_log(self, colorize: bool = False, stderr_only: bool = False) -> str:
stdo = '' if stderr_only else self.stdo
if self.stde or self.additional_error:
res = ''
if stdo:
res += mlog.cyan('stdout:').get_text(colorize) + '\n'
res += stdo
if res[-1:] != '\n':
res += '\n'
res += mlog.cyan('stderr:').get_text(colorize) + '\n'
res += join_lines(self.stde, self.additional_error)
else:
res = stdo
if res and res[-1:] != '\n':
res += '\n'
return res
@property
def needs_parsing(self) -> bool:
return False
async def parse(self, harness: 'TestHarness', lines: T.AsyncIterator[str]) -> None:
async for l in lines:
pass
class TestRunExitCode(TestRun):
def complete(self) -> None:
if self.res != TestResult.RUNNING:
pass
elif self.returncode == GNU_SKIP_RETURNCODE:
self.res = TestResult.SKIP
elif self.returncode == GNU_ERROR_RETURNCODE:
self.res = TestResult.ERROR
else:
self.res = TestResult.FAIL if bool(self.returncode) else TestResult.OK
super().complete()
TestRun.PROTOCOL_TO_CLASS[TestProtocol.EXITCODE] = TestRunExitCode
class TestRunGTest(TestRunExitCode):
def complete(self) -> None:
filename = f'{self.test.name}.xml'
if self.test.workdir:
filename = os.path.join(self.test.workdir, filename)
try:
with open(filename, 'r', encoding='utf8', errors='replace') as f:
self.junit = et.parse(f)
except FileNotFoundError:
# This can happen if the test fails to run or complete for some
# reason, like the rpath for libgtest isn't properly set. ExitCode
# will handle the failure, don't generate a stacktrace.
pass
except et.ParseError as e:
# ExitCode will handle the failure, don't generate a stacktrace.
mlog.error(f'Unable to parse {filename}: {e!s}')
super().complete()
TestRun.PROTOCOL_TO_CLASS[TestProtocol.GTEST] = TestRunGTest
class TestRunTAP(TestRun):
@property
def needs_parsing(self) -> bool:
return True
def complete(self) -> None:
if self.returncode != 0 and not self.res.was_killed():
self.res = TestResult.ERROR
self.stde = self.stde or ''
self.stde += f'\n(test program exited with status code {self.returncode})'
super().complete()
async def parse(self, harness: 'TestHarness', lines: T.AsyncIterator[str]) -> None:
res = None
warnings: T.List[TAPParser.UnknownLine] = []
version = 12
async for i in TAPParser().parse_async(lines):
if isinstance(i, TAPParser.Version):
version = i.version
elif isinstance(i, TAPParser.Bailout):
res = TestResult.ERROR
harness.log_subtest(self, i.message, res)
elif isinstance(i, TAPParser.Test):
self.results.append(i)
if i.result.is_bad():
res = TestResult.FAIL
harness.log_subtest(self, i.name or f'subtest {i.number}', i.result)
elif isinstance(i, TAPParser.UnknownLine):
warnings.append(i)
elif isinstance(i, TAPParser.Error):
self.additional_error += 'TAP parsing error: ' + i.message
res = TestResult.ERROR
if warnings:
unknown = str(mlog.yellow('UNKNOWN'))
width = len(str(max(i.lineno for i in warnings)))
for w in warnings:
self.warnings.append(f'stdout: {w.lineno:{width}}: {unknown}: {w.message}')
if version > 13:
self.warnings.append('Unknown TAP output lines have been ignored. Please open a feature request to\n'
'implement them, or prefix them with a # if they are not TAP syntax.')
else:
self.warnings.append(str(mlog.red('ERROR')) + ': Unknown TAP output lines for a supported TAP version.\n'
'This is probably a bug in the test; if they are not TAP syntax, prefix them with a #')
if all(t.result is TestResult.SKIP for t in self.results):
# This includes the case where self.results is empty
if res != TestResult.ERROR:
res = TestResult.SKIP
if res and self.res == TestResult.RUNNING:
self.res = res
TestRun.PROTOCOL_TO_CLASS[TestProtocol.TAP] = TestRunTAP
class TestRunRust(TestRun):
@property
def needs_parsing(self) -> bool:
return True
async def parse(self, harness: 'TestHarness', lines: T.AsyncIterator[str]) -> None:
def parse_res(n: int, name: str, result: str) -> TAPParser.Test:
if result == 'ok':
return TAPParser.Test(n, name, TestResult.OK, None)
elif result == 'ignored':
return TAPParser.Test(n, name, TestResult.SKIP, None)
elif result == 'FAILED':
return TAPParser.Test(n, name, TestResult.FAIL, None)
return TAPParser.Test(n, name, TestResult.ERROR,
f'Unsupported output from rust test: {result}')
n = 1
async for line in lines:
match = RUST_TEST_RE.match(line)
if match:
name, result = match.groups()
doctest = RUST_DOCTEST_RE.match(name)
if doctest:
name = ':'.join((x.rstrip() for x in doctest.groups() if x))
else:
name = name.rstrip()
name = name.replace('::', '.')
t = parse_res(n, name, result)
self.results.append(t)
harness.log_subtest(self, name, t.result)
n += 1
res = None
if all(t.result is TestResult.SKIP for t in self.results):
# This includes the case where self.results is empty
res = TestResult.SKIP
elif any(t.result is TestResult.ERROR for t in self.results):
res = TestResult.ERROR
elif any(t.result is TestResult.FAIL for t in self.results):
res = TestResult.FAIL
if res and self.res == TestResult.RUNNING:
self.res = res
TestRun.PROTOCOL_TO_CLASS[TestProtocol.RUST] = TestRunRust
# Check for unencodable characters in xml output and replace them with
# their printable representation
def replace_unencodable_xml_chars(original_str: str) -> str:
    # [1:-1] strips the `'` quote characters that repr() adds at the start
    # and end of the string
replacement_lambda = lambda illegal_chr: repr(illegal_chr.group())[1:-1]
return UNENCODABLE_XML_CHRS_RE.sub(replacement_lambda, original_str)
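# Illustrative example: replace_unencodable_xml_chars('a' + chr(0)) returns
# the literal text a\x00, i.e. the control character is replaced by its
# backslash escape.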
def decode(stream: T.Union[None, bytes]) -> str:
if stream is None:
return ''
try:
return stream.decode('utf-8')
except UnicodeDecodeError:
return stream.decode('iso-8859-1', errors='ignore')
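# Illustrative examples: decode(b'caf\xc3\xa9') == 'café', while invalid
# UTF-8 such as decode(b'\xff') falls back to ISO-8859-1 and yields 'ÿ'.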
async def read_decode(reader: asyncio.StreamReader,
queue: T.Optional['asyncio.Queue[T.Optional[str]]'],
console_mode: ConsoleUser) -> str:
stdo_lines = []
try:
while not reader.at_eof():
# Prefer splitting by line, as that produces nicer output
try:
line_bytes = await reader.readuntil(b'\n')
except asyncio.IncompleteReadError as e:
line_bytes = e.partial
except asyncio.LimitOverrunError as e:
line_bytes = await reader.readexactly(e.consumed)
if line_bytes:
line = decode(line_bytes).replace('\r\n', '\n')
stdo_lines.append(line)
if console_mode is ConsoleUser.STDOUT:
print(line, end='', flush=True)
if queue:
await queue.put(line)
return ''.join(stdo_lines)
except asyncio.CancelledError:
return ''.join(stdo_lines)
finally:
if queue:
await queue.put(None)
def run_with_mono(fname: str) -> bool:
return fname.endswith('.exe') and not (is_windows() or is_cygwin())
def check_testdata(objs: T.List[TestSerialisation]) -> T.List[TestSerialisation]:
if not isinstance(objs, list):
raise MesonVersionMismatchException('<unknown>', coredata_version)
for obj in objs:
if not isinstance(obj, TestSerialisation):
raise MesonVersionMismatchException('<unknown>', coredata_version)
if not hasattr(obj, 'version'):
raise MesonVersionMismatchException('<unknown>', coredata_version)
if major_versions_differ(obj.version, coredata_version):
raise MesonVersionMismatchException(obj.version, coredata_version)
return objs
# Custom waiting primitives for asyncio
async def queue_iter(q: 'asyncio.Queue[T.Optional[str]]') -> T.AsyncIterator[str]:
while True:
item = await q.get()
q.task_done()
if item is None:
break
yield item
async def complete(future: asyncio.Future) -> None:
"""Wait for completion of the given future, ignoring cancellation."""
try:
await future
except asyncio.CancelledError:
pass
async def complete_all(futures: T.Iterable[asyncio.Future],
timeout: T.Optional[T.Union[int, float]] = None) -> None:
"""Wait for completion of all the given futures, ignoring cancellation.
If timeout is not None, raise an asyncio.TimeoutError after the given
time has passed. asyncio.TimeoutError is only raised if some futures
have not completed and none have raised exceptions, even if timeout
is zero."""
def check_futures(futures: T.Iterable[asyncio.Future]) -> None:
# Raise exceptions if needed
left = False
for f in futures:
if not f.done():
left = True
elif not f.cancelled():
f.result()
if left:
raise asyncio.TimeoutError
# Python is silly and does not have a variant of asyncio.wait with an
# absolute time as deadline.
loop = asyncio.get_running_loop()
deadline = None if timeout is None else loop.time() + timeout
while futures and (timeout is None or timeout > 0):
done, futures = await asyncio.wait(futures, timeout=timeout,
return_when=asyncio.FIRST_EXCEPTION)
check_futures(done)
if deadline:
timeout = deadline - loop.time()
check_futures(futures)
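# Typical usage (illustrative, mirroring TestSubprocess.wait() below):
#   await complete_all([process_wait_task, stdout_task, stderr_task],
#                      timeout=test.timeout)
# which raises asyncio.TimeoutError if the test exceeds its timeout.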
class TestSubprocess:
def __init__(self, p: asyncio.subprocess.Process,
stdout: T.Optional[int], stderr: T.Optional[int],
                 postwait_fn: T.Optional[T.Callable[[], None]] = None):
self._process = p
self.stdout = stdout
self.stderr = stderr
self.stdo_task: T.Optional[asyncio.Task[None]] = None
self.stde_task: T.Optional[asyncio.Task[None]] = None
self.postwait_fn = postwait_fn
self.all_futures: T.List[asyncio.Future] = []
self.queue: T.Optional[asyncio.Queue[T.Optional[str]]] = None
def stdout_lines(self) -> T.AsyncIterator[str]:
self.queue = asyncio.Queue()
return queue_iter(self.queue)
def communicate(self,
test: 'TestRun',
console_mode: ConsoleUser) -> T.Tuple[T.Optional[T.Awaitable[str]],
T.Optional[T.Awaitable[str]]]:
async def collect_stdo(test: 'TestRun',
reader: asyncio.StreamReader,
console_mode: ConsoleUser) -> None:
test.stdo = await read_decode(reader, self.queue, console_mode)
async def collect_stde(test: 'TestRun',
reader: asyncio.StreamReader,
console_mode: ConsoleUser) -> None:
test.stde = await read_decode(reader, None, console_mode)
# asyncio.ensure_future ensures that printing can
# run in the background, even before it is awaited
if self.stdo_task is None and self.stdout is not None:
decode_coro = collect_stdo(test, self._process.stdout, console_mode)
self.stdo_task = asyncio.ensure_future(decode_coro)
self.all_futures.append(self.stdo_task)
if self.stderr is not None and self.stderr != asyncio.subprocess.STDOUT:
decode_coro = collect_stde(test, self._process.stderr, console_mode)
self.stde_task = asyncio.ensure_future(decode_coro)
self.all_futures.append(self.stde_task)
return self.stdo_task, self.stde_task
async def _kill(self) -> T.Optional[str]:
# Python does not provide multiplatform support for
# killing a process and all its children so we need
# to roll our own.
p = self._process
try:
if is_windows():
subprocess.run(['taskkill', '/F', '/T', '/PID', str(p.pid)])
else:
# Send a termination signal to the process group that setsid()
# created - giving it a chance to perform any cleanup.
os.killpg(p.pid, signal.SIGTERM)
# Make sure the termination signal actually kills the process
# group, otherwise retry with a SIGKILL.
with suppress(asyncio.TimeoutError):
await asyncio.wait_for(p.wait(), timeout=0.5)
if p.returncode is not None:
return None
os.killpg(p.pid, signal.SIGKILL)
with suppress(asyncio.TimeoutError):
await asyncio.wait_for(p.wait(), timeout=1)
if p.returncode is not None:
return None
# An earlier kill attempt has not worked for whatever reason.
# Try to kill it one last time with a direct call.
# If the process has spawned children, they will remain around.
p.kill()
with suppress(asyncio.TimeoutError):
await asyncio.wait_for(p.wait(), timeout=1)
if p.returncode is not None:
return None
return 'Test process could not be killed.'
except ProcessLookupError:
# Sometimes (e.g. with Wine) this happens. There's nothing
# we can do, probably the process already died so just wait
# for the event loop to pick that up.
await p.wait()
return None
finally:
if self.stdo_task:
self.stdo_task.cancel()
if self.stde_task:
self.stde_task.cancel()
async def wait(self, test: 'TestRun') -> None:
p = self._process
self.all_futures.append(asyncio.ensure_future(p.wait()))
try:
await complete_all(self.all_futures, timeout=test.timeout)
except asyncio.TimeoutError:
test.additional_error += await self._kill() or ''
test.res = TestResult.TIMEOUT
except asyncio.CancelledError:
# The main loop must have seen Ctrl-C.
test.additional_error += await self._kill() or ''
test.res = TestResult.INTERRUPT
finally:
if self.postwait_fn:
self.postwait_fn()
test.returncode = p.returncode or 0
class SingleTestRunner:
def __init__(self, test: TestSerialisation, env: T.Dict[str, str], name: str,
options: argparse.Namespace):
self.test = test
self.options = options
self.cmd = self._get_cmd()
if self.cmd and self.test.extra_paths:
env['PATH'] = os.pathsep.join(self.test.extra_paths + ['']) + env['PATH']
winecmd = []
for c in self.cmd:
winecmd.append(c)
if os.path.basename(c).startswith('wine'):
env['WINEPATH'] = get_wine_shortpath(
winecmd,
['Z:' + p for p in self.test.extra_paths] + env.get('WINEPATH', '').split(';'),
self.test.workdir
)
break
# If MALLOC_PERTURB_ is not set, or if it is set to an empty value
# (i.e., the test or the environment don't explicitly set it), set
# it ourselves. We do this unconditionally for regular tests
# because it is extremely useful to have.
# Setting MALLOC_PERTURB_="0" will completely disable this feature.
if ('MALLOC_PERTURB_' not in env or not env['MALLOC_PERTURB_']) and not options.benchmark:
env['MALLOC_PERTURB_'] = str(random.randint(1, 255))
# Sanitizers do not default to aborting on error. This is counter to
# expectations when using -Db_sanitize and has led to confusion in the
# wild and in CI. Set our own values of {ASAN,UBSAN,MSAN}_OPTIONS to
# rectify this, but only if the user has not defined them.
if ('ASAN_OPTIONS' not in env or not env['ASAN_OPTIONS']):
env['ASAN_OPTIONS'] = 'halt_on_error=1:abort_on_error=1:print_summary=1'
if ('UBSAN_OPTIONS' not in env or not env['UBSAN_OPTIONS']):
env['UBSAN_OPTIONS'] = 'halt_on_error=1:abort_on_error=1:print_summary=1:print_stacktrace=1'
if ('MSAN_OPTIONS' not in env or not env['MSAN_OPTIONS']):
env['MSAN_OPTIONS'] = 'halt_on_error=1:abort_on_error=1:print_summary=1:print_stacktrace=1'
# Valgrind also doesn't reflect errors in its exit code by default.
if 'VALGRIND_OPTS' not in env or not env['VALGRIND_OPTS']:
try:
wrapper_name = TestHarness.get_wrapper(self.options)[0]
if 'valgrind' in wrapper_name:
env['VALGRIND_OPTS'] = '--error-exitcode=1'
except IndexError:
pass
if self.options.interactive or self.test.timeout is None or self.test.timeout <= 0:
timeout = None
elif self.options.timeout_multiplier is None:
timeout = self.test.timeout
elif self.options.timeout_multiplier <= 0:
timeout = None
else:
timeout = self.test.timeout * self.options.timeout_multiplier
is_parallel = test.is_parallel and self.options.num_processes > 1 and not self.options.interactive
verbose = (test.verbose or self.options.verbose) and not self.options.quiet
self.runobj = TestRun(test, env, name, timeout, is_parallel, verbose, self.options.interactive)
@property
def console_mode(self) -> ConsoleUser:
return self.runobj.console_mode
def _get_test_cmd(self) -> T.Optional[T.List[str]]:
testentry = self.test.fname[0]
if self.options.no_rebuild and self.test.cmd_is_built and not os.path.isfile(testentry):
raise TestException(f'The test program {testentry!r} does not exist. Cannot run tests before building them.')
if testentry.endswith('.jar'):
return ['java', '-jar'] + self.test.fname
elif not self.test.is_cross_built and run_with_mono(testentry):
return ['mono'] + self.test.fname
elif self.test.cmd_is_exe and self.test.is_cross_built and self.test.needs_exe_wrapper:
if self.test.exe_wrapper is None:
# Cannot run the test on a cross-compiled executable
# because there is no exe wrapper.
return None
elif self.test.cmd_is_exe:
# If the command is not built (i.e., it's a Python script),
# then we don't check for the exe-wrapper
if not self.test.exe_wrapper.found():
msg = ('The exe_wrapper defined in the cross file {!r} was not '
'found. Please check the command and/or add it to PATH.')
raise TestException(msg.format(self.test.exe_wrapper.name))
return self.test.exe_wrapper.get_command() + self.test.fname
elif self.test.cmd_is_built and not self.test.cmd_is_exe and is_windows():
test_cmd = ExternalProgram._shebang_to_cmd(self.test.fname[0])
if test_cmd is not None:
test_cmd += self.test.fname[1:]
return test_cmd
return self.test.fname
def _get_cmd(self) -> T.Optional[T.List[str]]:
test_cmd = self._get_test_cmd()
if not test_cmd:
return None
return TestHarness.get_wrapper(self.options) + test_cmd
@property
def is_parallel(self) -> bool:
return self.runobj.is_parallel
@property
def visible_name(self) -> str:
return self.runobj.name
@property
def timeout(self) -> T.Optional[int]:
return self.runobj.timeout
async def run(self, harness: 'TestHarness') -> TestRun:
if self.cmd is None:
self.stdo = 'Not run because cross compiled binaries cannot be executed.'
harness.log_start_test(self.runobj)
self.runobj.complete_skip()
else:
cmd = self.cmd + self.test.cmd_args + self.options.test_args
self.runobj.start(cmd)
harness.log_start_test(self.runobj)
await self._run_cmd(harness, cmd)
return self.runobj
async def _run_subprocess(self, args: T.List[str], *, stdin: T.Optional[int],
stdout: T.Optional[int], stderr: T.Optional[int],
env: T.Dict[str, str], cwd: T.Optional[str]) -> TestSubprocess:
# Let gdb handle ^C instead of us
if self.options.interactive:
previous_sigint_handler = signal.getsignal(signal.SIGINT)
# Make the meson executable ignore SIGINT while gdb is running.
signal.signal(signal.SIGINT, signal.SIG_IGN)
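# preexec_fn runs in the forked child between fork() and exec(),
# so the signal/session changes below affect only the test process.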
def preexec_fn() -> None:
if self.options.interactive:
# Restore the SIGINT handler for the child process to
# ensure it can handle it.
signal.signal(signal.SIGINT, signal.SIG_DFL)
else:
# We don't want setsid() when running under gdb because gdb
# needs the terminal in order to handle ^C; detaching would
# cause tcsetpgrp() errors and leave the terminal unusable.
os.setsid()
def postwait_fn() -> None:
if self.options.interactive:
# Let us accept ^C again
signal.signal(signal.SIGINT, previous_sigint_handler)
p = await asyncio.create_subprocess_exec(*args,
stdin=stdin,
stdout=stdout,
stderr=stderr,
env=env,
cwd=cwd,
preexec_fn=preexec_fn if not is_windows() else None)
return TestSubprocess(p, stdout=stdout, stderr=stderr,
postwait_fn=postwait_fn if not is_windows() else None)
async def _run_cmd(self, harness: 'TestHarness', cmd: T.List[str]) -> None:
if self.console_mode is ConsoleUser.INTERACTIVE:
stdin = None
stdout = None
stderr = None
else:
stdin = asyncio.subprocess.DEVNULL
stdout = asyncio.subprocess.PIPE
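# Merge stderr into stdout unless the streams must stay separate,
# either because --split was requested or because a result parser
# needs a clean stdout stream.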
stderr = asyncio.subprocess.STDOUT \
if not self.options.split and not self.runobj.needs_parsing \
else asyncio.subprocess.PIPE
extra_cmd: T.List[str] = []
if self.test.protocol is TestProtocol.GTEST:
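# GoogleTest can report results as JUnit-style XML; request a
# report file named after the test (inside workdir if one is set).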
gtestname = self.test.name
if self.test.workdir:
gtestname = os.path.join(self.test.workdir, self.test.name)
extra_cmd.append(f'--gtest_output=xml:{gtestname}.xml')
p = await self._run_subprocess(cmd + extra_cmd,
stdin=stdin,
stdout=stdout,
stderr=stderr,
env=self.runobj.env,
cwd=self.test.workdir)
if self.runobj.needs_parsing and self.console_mode is not ConsoleUser.INTERACTIVE:
parse_coro = self.runobj.parse(harness, p.stdout_lines())
parse_task = asyncio.ensure_future(parse_coro)
else:
parse_task = None
stdo_task, stde_task = p.communicate(self.runobj, self.console_mode)
await p.wait(self.runobj)
if parse_task:
await parse_task
if stdo_task:
await stdo_task
if stde_task:
await stde_task
self.runobj.complete()
class TestHarness:
def __init__(self, options: argparse.Namespace):
self.options = options
self.collected_failures: T.List[TestRun] = []
self.fail_count = 0
self.expectedfail_count = 0
self.unexpectedpass_count = 0
self.success_count = 0
self.skip_count = 0
self.ignored_count = 0
self.timeout_count = 0
self.test_count = 0
self.name_max_len = 0
self.is_run = False
self.loggers: T.List[TestLogger] = []
self.console_logger = ConsoleLogger(options.max_lines)
self.loggers.append(self.console_logger)
self.need_console = False
self.ninja: T.Optional[T.List[str]] = None
self.logfile_base: T.Optional[str] = None
if self.options.logbase and not self.options.interactive:
namebase = None
self.logfile_base = os.path.join(self.options.wd, 'meson-logs', self.options.logbase)
if self.options.wrapper:
namebase = os.path.basename(self.get_wrapper(self.options)[0])
elif self.options.setup:
namebase = self.options.setup.replace(":", "_")
if namebase:
self.logfile_base += '-' + namebase.replace(' ', '_')
self.prepare_build()
self.load_metadata()
ss = set()
for t in self.tests:
for s in t.suite:
ss.add(s)
self.suites = list(ss)
def get_console_logger(self) -> 'ConsoleLogger':
assert self.console_logger
return self.console_logger
def prepare_build(self) -> None:
if self.options.no_rebuild:
return
self.ninja = tooldetect.detect_ninja()
if not self.ninja:
print("Can't find ninja, can't rebuild test.")
# If ninja can't be found return exit code 127, indicating command
# not found for shell, which seems appropriate here. This works
# nicely for `git bisect run`, telling it to abort - no point in
# continuing if there's no ninja.
sys.exit(127)
def load_metadata(self) -> None:
startdir = os.getcwd()
try:
os.chdir(self.options.wd)
# Before loading build / test data, make sure that the build
# configuration does not need to be regenerated. This needs to
# happen before rebuild_deps(), because we need the correct list of
# tests and their dependencies to compute the targets to rebuild.
if not self.options.no_rebuild:
teststdo = subprocess.run(self.ninja + ['-n', 'build.ninja'], capture_output=True).stdout
if b'ninja: no work to do.' not in teststdo and b'samu: nothing to do' not in teststdo:
stdo = sys.stderr if self.options.list else sys.stdout
ret = subprocess.run(self.ninja + ['build.ninja'], stdout=stdo.fileno())
if ret.returncode != 0:
raise TestException(f'Could not configure {self.options.wd!r}')
self.build_data = build.load(os.getcwd())
if not self.options.setup:
self.options.setup = self.build_data.test_setup_default_name
if self.options.benchmark:
self.tests = self.load_tests('meson_benchmark_setup.dat')
else:
self.tests = self.load_tests('meson_test_setup.dat')
finally:
os.chdir(startdir)
def load_tests(self, file_name: str) -> T.List[TestSerialisation]:
datafile = Path('meson-private') / file_name
if not datafile.is_file():
raise TestException(f'Directory {self.options.wd!r} does not seem to be a Meson build directory.')
with datafile.open('rb') as f:
objs = check_testdata(pickle.load(f))
return objs
def __enter__(self) -> 'TestHarness':
return self
def __exit__(self, exc_type: T.Any, exc_value: T.Any, traceback: T.Any) -> None:
self.close_logfiles()
def close_logfiles(self) -> None:
for l in self.loggers:
l.close()
self.console_logger = None
def get_test_setup(self, test: T.Optional[TestSerialisation]) -> build.TestSetup:
if ':' in self.options.setup:
if self.options.setup not in self.build_data.test_setups:
sys.exit(f"Unknown test setup '{self.options.setup}'.")
return self.build_data.test_setups[self.options.setup]
else:
full_name = test.project_name + ":" + self.options.setup
if full_name not in self.build_data.test_setups:
sys.exit(f"Test setup '{self.options.setup}' not found from project '{test.project_name}'.")
return self.build_data.test_setups[full_name]
def merge_setup_options(self, options: argparse.Namespace, test: TestSerialisation) -> T.Dict[str, str]:
current = self.get_test_setup(test)
if not options.gdb:
options.gdb = current.gdb
if options.gdb:
options.interactive = True
options.verbose = True
if options.timeout_multiplier is None:
options.timeout_multiplier = current.timeout_multiplier
# if options.env is None:
# options.env = current.env # FIXME, should probably merge options here.
if options.wrapper is None:
options.wrapper = current.exe_wrapper
elif current.exe_wrapper:
sys.exit('Conflict: both test setup and command line specify an exe wrapper.')
return current.env.get_env(os.environ.copy())
def get_test_runner(self, test: TestSerialisation, iteration: int) -> SingleTestRunner:
name = self.get_pretty_suite(test)
options = deepcopy(self.options)
if self.options.setup:
env = self.merge_setup_options(options, test)
else:
env = os.environ.copy()
test_env = test.env.get_env(env)
env.update(test_env)
if (test.is_cross_built and test.needs_exe_wrapper and
test.exe_wrapper and test.exe_wrapper.found()):
env['MESON_EXE_WRAPPER'] = join_args(test.exe_wrapper.get_command())
env['MESON_TEST_ITERATION'] = str(iteration + 1)
return SingleTestRunner(test, env, name, options)
def process_test_result(self, result: TestRun) -> None:
if result.res is TestResult.TIMEOUT:
self.timeout_count += 1
elif result.res is TestResult.SKIP:
self.skip_count += 1
elif result.res is TestResult.IGNORED:
self.ignored_count += 1
elif result.res is TestResult.OK:
self.success_count += 1
elif result.res in {TestResult.FAIL, TestResult.ERROR, TestResult.INTERRUPT}:
self.fail_count += 1
elif result.res is TestResult.EXPECTEDFAIL:
self.expectedfail_count += 1
elif result.res is TestResult.UNEXPECTEDPASS:
self.unexpectedpass_count += 1
else:
sys.exit(f'Unknown test result encountered: {result.res}')
if result.res.is_bad():
self.collected_failures.append(result)
for l in self.loggers:
l.log(self, result)
@property
def numlen(self) -> int:
return len(str(self.test_count))
@property
def max_left_width(self) -> int:
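# Width of the 'NN/MM ' prefix produced by get_test_num_prefix():
# two numbers of numlen digits, a slash and a trailing space.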
return 2 * self.numlen + 2
def get_test_num_prefix(self, num: int) -> str:
return '{num:{numlen}}/{testcount} '.format(numlen=self.numlen,
num=num,
testcount=self.test_count)
def format(self, result: TestRun, colorize: bool,
max_left_width: int = 0,
prefix: str = '',
left: T.Optional[str] = None,
middle: T.Optional[str] = None,
right: T.Optional[str] = None) -> str:
if left is None:
left = self.get_test_num_prefix(result.num)
# A non-default max_left_width lets the logger print more stuff before the
# name, while ensuring that the rightmost columns remain aligned.
max_left_width = max(max_left_width, self.max_left_width)
if middle is None:
middle = result.name
extra_mid_width = max_left_width + self.name_max_len + 1 - uniwidth(middle) - uniwidth(left) - uniwidth(prefix)
middle += ' ' * max(1, extra_mid_width)
if right is None:
right = '{res} {dur:{durlen}.2f}s'.format(
res=result.res.get_text(colorize),
dur=result.duration,
durlen=self.duration_max_len + 3)
details = result.get_details()
if details:
right += ' ' + details
return prefix + left + middle + right
def summary(self) -> str:
results = {
'Ok: ': self.success_count,
'Expected Fail: ': self.expectedfail_count,
'Fail: ': self.fail_count,
'Unexpected Pass: ': self.unexpectedpass_count,
'Skipped: ': self.skip_count,
'Ignored: ': self.ignored_count,
'Timeout: ': self.timeout_count,
}
summary = []
for result, count in results.items():
if count > 0 or result.startswith('Ok:') or result.startswith('Fail:'):
summary.append(result + '{:<4}'.format(count))
return '\n{}\n'.format('\n'.join(summary))
def total_failure_count(self) -> int:
return self.fail_count + self.unexpectedpass_count + self.timeout_count
def doit(self) -> int:
if self.is_run:
raise RuntimeError('Test harness object can only be used once.')
self.is_run = True
tests = self.get_tests()
# NOTE: If all tests are selected anyway, we pass
# an empty list to `rebuild_deps`, which then will execute
# the "meson-test-prereq" ninja target as a fallback.
# This prevents situations where the command line could
# exceed ARG_MAX when there are many targets.
rebuild_only_tests = tests if tests != self.tests else []
if not tests:
return 0
if not self.options.no_rebuild and not rebuild_deps(self.ninja, self.options.wd, rebuild_only_tests, self.options.benchmark):
# We return 125 here in case the build failed.
# The reason is that exit code 125 tells `git bisect run` that the current
# commit should be skipped. Thus users can directly use `meson test` to
# bisect without needing to handle the does-not-build case separately in a
# wrapper script.
sys.exit(125)
self.name_max_len = max(uniwidth(self.get_pretty_suite(test)) for test in tests)
self.options.num_processes = min(self.options.num_processes,
len(tests) * self.options.repeat)
startdir = os.getcwd()
try:
os.chdir(self.options.wd)
runners: T.List[SingleTestRunner] = []
for i in range(self.options.repeat):
runners.extend(self.get_test_runner(test, i) for test in tests)
if i == 0:
self.duration_max_len = max(len(str(int(runner.timeout or 99)))
for runner in runners)
# Disable the progress report if it gets in the way
self.need_console = any(runner.console_mode is not ConsoleUser.LOGGER
for runner in runners)
self.test_count = len(runners)
self.run_tests(runners)
finally:
os.chdir(startdir)
return 1 if self.total_failure_count() > 0 else 0
@staticmethod
def split_suite_string(suite: str) -> T.Tuple[str, str]:
if ':' in suite:
split = suite.split(':', 1)
assert len(split) == 2
return split[0], split[1]
else:
return suite, ""
@staticmethod
def test_in_suites(test: TestSerialisation, suites: T.List[str]) -> bool:
for suite in suites:
(prj_match, st_match) = TestHarness.split_suite_string(suite)
for prjst in test.suite:
(prj, st) = TestHarness.split_suite_string(prjst)
# The SUITE can be passed as
# - `name` - We select tests belonging to (sub)project OR suite
# with the given name.
# - `:suite_name` - We select tests belonging to any (sub)projects
# and in suite_name.
# - `project_name:suite_name` - We select tests belonging
# to project_name and in suite_name.
if not st_match:
if prj_match in {prj, st}:
return True
elif not prj_match:
if st == st_match:
return True
else:
if prj == prj_match and st == st_match:
return True
return False
def test_suitable(self, test: TestSerialisation) -> bool:
if TestHarness.test_in_suites(test, self.options.exclude_suites):
return False
if self.options.include_suites:
# Both force inclusion (overriding add_test_setup) and exclude
# everything else
return TestHarness.test_in_suites(test, self.options.include_suites)
if self.options.setup:
setup = self.get_test_setup(test)
if TestHarness.test_in_suites(test, setup.exclude_suites):
return False
return True
def tests_from_args(self, tests: T.List[TestSerialisation]) -> T.Generator[TestSerialisation, None, None]:
'''
Allow specifying test names like "meson test foo1 foo2", where
test('foo1', ...) and test('foo2', ...) exist in the project.
Also support specifying the subproject to run tests from, like
"meson test subproj:" (all tests inside subproj) or "meson test subproj:foo1"
to run foo1 inside subproj. "meson test :foo1" likewise runs
all tests with that name across all subprojects, which is
identical to "meson test foo1".
'''
patterns: T.Dict[T.Tuple[str, str], bool] = {}
for arg in self.options.args:
# Replace empty components by wildcards:
# '' -> '*:*'
# 'name' -> '*:name'
# ':name' -> '*:name'
# 'proj:' -> 'proj:*'
if ':' in arg:
subproj, name = arg.split(':', maxsplit=1)
if name == '':
name = '*'
if subproj == '': # in case arg was ':'
subproj = '*'
else:
subproj, name = '*', arg
patterns[(subproj, name)] = False
for t in tests:
# For each test, find the first matching pattern
# and mark it as used. yield the matching tests.
for subproj, name in list(patterns):
if fnmatch(t.project_name, subproj) and fnmatch(t.name, name):
patterns[(subproj, name)] = True
yield t
break
for (subproj, name), was_used in patterns.items():
if not was_used:
# For each unused pattern...
arg = f'{subproj}:{name}'
for t in tests:
# ... if it matches a test, then it wasn't used because another
# pattern matched the same test before.
# Report it as a warning.
if fnmatch(t.project_name, subproj) and fnmatch(t.name, name):
mlog.warning(f'{arg} test name is redundant and was not used')
break
else:
# If the pattern doesn't match any test,
# report it as an error. We don't want the `test` command to
# succeed on an invalid pattern.
raise MesonException(f'{arg} test name does not match any test')
def get_tests(self, errorfile: T.Optional[T.IO] = None) -> T.List[TestSerialisation]:
if not self.tests:
print('No tests defined.', file=errorfile)
return []
tests = [t for t in self.tests if self.test_suitable(t)]
if self.options.args:
tests = list(self.tests_from_args(tests))
if self.options.slice:
our_slice, nslices = self.options.slice
if nslices > len(tests):
raise MesonException(f'number of slices ({nslices}) exceeds number of tests ({len(tests)})')
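# Slices are numbered starting from 1 on the command line; convert
# to a 0-based offset and take every nslices-th test.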
tests = tests[our_slice - 1::nslices]
if not tests:
print('No suitable tests defined.', file=errorfile)
return []
return tests
def flush_logfiles(self) -> None:
for l in self.loggers:
l.flush()
def open_logfiles(self) -> None:
if not self.logfile_base:
return
self.loggers.append(JunitBuilder(self.logfile_base + '.junit.xml'))
self.loggers.append(JsonLogfileBuilder(self.logfile_base + '.json'))
self.loggers.append(TextLogfileBuilder(self.logfile_base + '.txt', errors='surrogateescape'))
@staticmethod
def get_wrapper(options: argparse.Namespace) -> T.List[str]:
if options.gdb:
wrap = [options.gdb_path, '--quiet']
if options.repeat > 1:
wrap += ['-ex', 'run', '-ex', 'quit']
# Signal the end of arguments to gdb
wrap += ['--args']
elif options.wrapper:
wrap = options.wrapper
else:
wrap = []
return wrap
def get_pretty_suite(self, test: TestSerialisation) -> str:
assert test.suite, 'Interpreter should ensure there is always at least one suite'
prj = TestHarness.split_suite_string(test.suite[0])[0]
suites: T.List[str] = []
for i in test.suite:
s = TestHarness.split_suite_string(i)[1]
if s:
suites.append(s)
name = f'{prj}:{test.name}'
if suites:
s = '+'.join(suites)
name = f'{s} - {name}'
return name
def run_tests(self, runners: T.List[SingleTestRunner]) -> None:
try:
self.open_logfiles()
# TODO: this is the default since Python 3.8
if sys.platform == 'win32' and sys.version_info < (3, 8):
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
asyncio.run(self._run_tests(runners))
finally:
self.close_logfiles()
def log_subtest(self, test: TestRun, s: str, res: TestResult) -> None:
for l in self.loggers:
l.log_subtest(self, test, s, res)
def log_start_test(self, test: TestRun) -> None:
for l in self.loggers:
l.start_test(self, test)
async def _run_tests(self, runners: T.List[SingleTestRunner]) -> None:
semaphore = asyncio.Semaphore(self.options.num_processes)
futures: T.Deque[asyncio.Future] = deque()
running_tests: T.Dict[asyncio.Future, str] = {}
interrupted = False
ctrlc_times: T.Deque[float] = deque(maxlen=MAX_CTRLC)
loop = asyncio.get_running_loop()
async def run_test(test: SingleTestRunner) -> None:
async with semaphore:
if interrupted or (self.options.repeat > 1 and self.fail_count):
return
res = await test.run(self)
self.process_test_result(res)
maxfail = self.options.maxfail
if maxfail and self.fail_count >= maxfail and res.res.is_bad():
cancel_all_tests()
def test_done(f: asyncio.Future) -> None:
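# Done callback: re-raise any exception from run_test() and drop
# the future from the bookkeeping structures.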
if not f.cancelled():
f.result()
futures.remove(f)
try:
del running_tests[f]
except KeyError:
pass
def cancel_one_test(warn: bool) -> None:
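# Cancel the longest-running test: rotate the oldest future to the
# back of the deque (test_done() removes it for real) and cancel it.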
future = futures.popleft()
futures.append(future)
if warn:
self.flush_logfiles()
mlog.warning('CTRL-C detected, interrupting {}'.format(running_tests[future]))
del running_tests[future]
future.cancel()
def cancel_all_tests() -> None:
nonlocal interrupted
interrupted = True
while running_tests:
cancel_one_test(False)
def sigterm_handler() -> None:
if interrupted:
return
self.flush_logfiles()
mlog.warning('Received SIGTERM, exiting')
cancel_all_tests()
def sigint_handler() -> None:
# We always pick the longest-running future that has not been cancelled.
# If all the tests have been CTRL-C'ed, just stop.
nonlocal interrupted
if interrupted:
return
ctrlc_times.append(loop.time())
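# MAX_CTRLC interrupts within one second cancel the whole run.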
if len(ctrlc_times) == MAX_CTRLC and ctrlc_times[-1] - ctrlc_times[0] < 1:
self.flush_logfiles()
mlog.warning('CTRL-C detected, exiting')
cancel_all_tests()
elif running_tests:
cancel_one_test(True)
else:
self.flush_logfiles()
mlog.warning('CTRL-C detected, exiting')
interrupted = True
for l in self.loggers:
l.start(self)
if sys.platform != 'win32':
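# Install the interactive Ctrl-C handler only when we are the
# process group leader; otherwise SIGINT is delivered to the whole
# group anyway and is treated like SIGTERM.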
if os.getpgid(0) == os.getpid():
loop.add_signal_handler(signal.SIGINT, sigint_handler)
else:
loop.add_signal_handler(signal.SIGINT, sigterm_handler)
loop.add_signal_handler(signal.SIGTERM, sigterm_handler)
try:
for runner in runners:
if not runner.is_parallel:
await complete_all(futures)
future = asyncio.ensure_future(run_test(runner))
futures.append(future)
running_tests[future] = runner.visible_name
future.add_done_callback(test_done)
if not runner.is_parallel:
await complete(future)
if self.options.repeat > 1 and self.fail_count:
break
await complete_all(futures)
finally:
if sys.platform != 'win32':
loop.remove_signal_handler(signal.SIGINT)
loop.remove_signal_handler(signal.SIGTERM)
for l in self.loggers:
await l.finish(self)
def list_tests(th: TestHarness) -> bool:
tests = th.get_tests(errorfile=sys.stderr)
for t in tests:
print(th.get_pretty_suite(t))
return not tests
def rebuild_deps(ninja: T.List[str], wd: str, tests: T.List[TestSerialisation], benchmark: bool) -> bool:
def convert_path_to_target(path: str) -> str:
path = os.path.relpath(path, wd)
if os.sep != '/':
path = path.replace(os.sep, '/')
return path
assert len(ninja) > 0
targets: T.Set[str] = set()
if tests:
targets_file = os.path.join(wd, 'meson-info/intro-targets.json')
with open(targets_file, encoding='utf-8') as fp:
targets_info = json.load(fp)
depends: T.Set[str] = set()
intro_targets: T.Dict[str, T.List[str]] = {}
for target in targets_info:
intro_targets[target['id']] = [
convert_path_to_target(f)
for f in target['filename']]
for t in tests:
for d in t.depends:
if d in depends:
continue
depends.add(d)
targets.update(intro_targets[d])
else:
if benchmark:
targets.add('meson-benchmark-prereq')
else:
targets.add('meson-test-prereq')
if not targets:
# We want to build only the minimal set of dependencies, but if the
# selected tests have no deps then running ninja with no targets
# would fall back to building 'all', so skip the rebuild instead.
return True
ret = subprocess.run(ninja + ['-C', wd] + sorted(targets)).returncode
if ret != 0:
print(f'Could not rebuild {wd}')
return False
return True
def run(options: argparse.Namespace) -> int:
if options.benchmark or options.interactive:
options.num_processes = 1
if options.verbose and options.quiet:
print('Cannot be both quiet and verbose at the same time.')
return 1
check_bin = None
if options.gdb:
options.interactive = True
if options.wrapper:
print('Must not specify both a wrapper and gdb at the same time.')
return 1
check_bin = 'gdb'
if options.interactive:
options.verbose = True
if options.wrapper:
check_bin = options.wrapper[0]
if check_bin is not None:
exe = ExternalProgram(check_bin, silent=True)
if not exe.found():
print(f'Could not find requested program: {check_bin!r}')
return 1
b = build.load(options.wd)
need_vsenv = T.cast('bool', b.environment.coredata.optstore.get_value_for(OptionKey('vsenv')))
setup_vsenv(need_vsenv)
if not options.no_rebuild:
backend = b.environment.coredata.optstore.get_value_for(OptionKey('backend'))
if backend == 'none':
# nothing to build...
options.no_rebuild = True
elif backend != 'ninja':
print('Only the ninja backend supports rebuilding tests before running them.')
# Disable, no point in trying to build anything later
options.no_rebuild = True
with TestHarness(options) as th:
try:
if options.list:
return list_tests(th)
return th.doit()
except TestException as e:
print('Meson test encountered an error:\n')
if os.environ.get('MESON_FORCE_BACKTRACE'):
raise e
else:
print(e)
return 1
def run_with_args(args: T.List[str]) -> int:
parser = argparse.ArgumentParser(prog='meson test')
add_arguments(parser)
options = parser.parse_args(args)
return run(options)