1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
|
import contextlib
import json
import os
import os.path
import sys
from textwrap import dedent
import unittest
from test import support
from test.support import import_helper
from test.support import os_helper
# Raise SkipTest if subinterpreters not supported.
import_helper.import_module('_interpreters')
from .utils import TestBase
class StartupTests(TestBase):
# We want to ensure the initial state of subinterpreters
# matches expectations.
_subtest_count = 0
@contextlib.contextmanager
def subTest(self, *args):
with super().subTest(*args) as ctx:
self._subtest_count += 1
try:
yield ctx
finally:
if self._debugged_in_subtest:
if self._subtest_count == 1:
# The first subtest adds a leading newline, so we
# compensate here by not printing a trailing newline.
print('### end subtest debug ###', end='')
else:
print('### end subtest debug ###')
self._debugged_in_subtest = False
def debug(self, msg, *, header=None):
if header:
self._debug(f'--- {header} ---')
if msg:
if msg.endswith(os.linesep):
self._debug(msg[:-len(os.linesep)])
else:
self._debug(msg)
self._debug('<no newline>')
self._debug('------')
else:
self._debug(msg)
_debugged = False
_debugged_in_subtest = False
def _debug(self, msg):
if not self._debugged:
print()
self._debugged = True
if self._subtest is not None:
if True:
if not self._debugged_in_subtest:
self._debugged_in_subtest = True
print('### start subtest debug ###')
print(msg)
else:
print(msg)
def create_temp_dir(self):
import tempfile
tmp = tempfile.mkdtemp(prefix='test_interpreters_')
tmp = os.path.realpath(tmp)
self.addCleanup(os_helper.rmtree, tmp)
return tmp
def write_script(self, *path, text):
filename = os.path.join(*path)
dirname = os.path.dirname(filename)
if dirname:
os.makedirs(dirname, exist_ok=True)
with open(filename, 'w', encoding='utf-8') as outfile:
outfile.write(dedent(text))
return filename
@support.requires_subprocess()
def run_python(self, argv, *, cwd=None):
# This method is inspired by
# EmbeddingTestsMixin.run_embedded_interpreter() in test_embed.py.
import shlex
import subprocess
if isinstance(argv, str):
argv = shlex.split(argv)
argv = [sys.executable, *argv]
try:
proc = subprocess.run(
argv,
cwd=cwd,
capture_output=True,
text=True,
)
except Exception as exc:
self.debug(f'# cmd: {shlex.join(argv)}')
if isinstance(exc, FileNotFoundError) and not exc.filename:
if os.path.exists(argv[0]):
exists = 'exists'
else:
exists = 'does not exist'
self.debug(f'{argv[0]} {exists}')
raise # re-raise
assert proc.stderr == '' or proc.returncode != 0, proc.stderr
if proc.returncode != 0 and support.verbose:
self.debug(f'# python3 {shlex.join(argv[1:])} failed:')
self.debug(proc.stdout, header='stdout')
self.debug(proc.stderr, header='stderr')
self.assertEqual(proc.returncode, 0)
self.assertEqual(proc.stderr, '')
return proc.stdout
def test_sys_path_0(self):
# The main interpreter's sys.path[0] should be used by subinterpreters.
script = '''
import sys
from concurrent import interpreters
orig = sys.path[0]
interp = interpreters.create()
interp.exec(f"""if True:
import json
import sys
print(json.dumps({{
'main': {orig!r},
'sub': sys.path[0],
}}, indent=4), flush=True)
""")
'''
# <tmp>/
# pkg/
# __init__.py
# __main__.py
# script.py
# script.py
cwd = self.create_temp_dir()
self.write_script(cwd, 'pkg', '__init__.py', text='')
self.write_script(cwd, 'pkg', '__main__.py', text=script)
self.write_script(cwd, 'pkg', 'script.py', text=script)
self.write_script(cwd, 'script.py', text=script)
cases = [
('script.py', cwd),
('-m script', cwd),
('-m pkg', cwd),
('-m pkg.script', cwd),
('-c "import script"', ''),
]
for argv, expected in cases:
with self.subTest(f'python3 {argv}'):
out = self.run_python(argv, cwd=cwd)
data = json.loads(out)
sp0_main, sp0_sub = data['main'], data['sub']
self.assertEqual(sp0_sub, sp0_main)
self.assertEqual(sp0_sub, expected)
# XXX Also check them all with the -P cmdline flag?
class FinalizationTests(TestBase):
@support.requires_subprocess()
def test_gh_109793(self):
# Make sure finalization finishes and the correct error code
# is reported, even when subinterpreters get cleaned up at the end.
import subprocess
argv = [sys.executable, '-c', '''if True:
from concurrent import interpreters
interp = interpreters.create()
raise Exception
''']
proc = subprocess.run(argv, capture_output=True, text=True)
self.assertIn('Traceback', proc.stderr)
if proc.returncode == 0 and support.verbose:
print()
print("--- cmd unexpected succeeded ---")
print(f"stdout:\n{proc.stdout}")
print(f"stderr:\n{proc.stderr}")
print("------")
self.assertEqual(proc.returncode, 1)
if __name__ == '__main__':
# Test needs to be a package, so we can do relative imports.
unittest.main()
|