1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
import atexit
import os
import textwrap
import unittest
from test import support
from test.support import script_helper
from test.support import threading_helper
class GeneralTest(unittest.TestCase):
def test_general(self):
# Run _test_atexit.py in a subprocess since it calls atexit._clear()
script = support.findfile("_test_atexit.py")
script_helper.run_test_script(script)
class FunctionalTest(unittest.TestCase):
def test_shutdown(self):
# Actually test the shutdown mechanism in a subprocess
code = textwrap.dedent("""
import atexit
def f(msg):
print(msg)
atexit.register(f, "one")
atexit.register(f, "two")
""")
res = script_helper.assert_python_ok("-c", code)
self.assertEqual(res.out.decode().splitlines(), ["two", "one"])
self.assertFalse(res.err)
def test_atexit_instances(self):
# bpo-42639: It is safe to have more than one atexit instance.
code = textwrap.dedent("""
import sys
import atexit as atexit1
del sys.modules['atexit']
import atexit as atexit2
del sys.modules['atexit']
assert atexit2 is not atexit1
atexit1.register(print, "atexit1")
atexit2.register(print, "atexit2")
""")
res = script_helper.assert_python_ok("-c", code)
self.assertEqual(res.out.decode().splitlines(), ["atexit2", "atexit1"])
self.assertFalse(res.err)
@threading_helper.requires_working_threading()
@support.requires_resource("cpu")
@unittest.skipUnless(support.Py_GIL_DISABLED, "only meaningful without the GIL")
def test_atexit_thread_safety(self):
# GH-126907: atexit was not thread safe on the free-threaded build
source = """
from threading import Thread
def dummy():
pass
def thready():
for _ in range(100):
atexit.register(dummy)
atexit._clear()
atexit.register(dummy)
atexit.unregister(dummy)
atexit._run_exitfuncs()
threads = [Thread(target=thready) for _ in range(10)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
"""
# atexit._clear() has some evil side effects, and we don't
# want them to affect the rest of the tests.
script_helper.assert_python_ok("-c", textwrap.dedent(source))
@support.cpython_only
class SubinterpreterTest(unittest.TestCase):
    # Tests for atexit callbacks registered inside a subinterpreter.

    def test_callbacks_leak(self):
        # This test shows a leak in refleak mode if atexit doesn't
        # take care to free callbacks in its per-subinterpreter module
        # state.
        n = atexit._ncallbacks()
        code = textwrap.dedent(r"""
            import atexit
            def f():
                pass
            atexit.register(f)
            del atexit
        """)
        ret = support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        # The callback count seen from this interpreter must be unchanged.
        self.assertEqual(atexit._ncallbacks(), n)

    def test_callbacks_leak_refcycle(self):
        # Similar to the above, but with a refcycle through the atexit
        # module.
        n = atexit._ncallbacks()
        code = textwrap.dedent(r"""
            import atexit
            def f():
                pass
            atexit.register(f)
            atexit.__atexit = atexit
        """)
        ret = support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        self.assertEqual(atexit._ncallbacks(), n)

    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_callback_on_subinterpreter_teardown(self):
        # This tests if a callback is called on
        # subinterpreter teardown.
        expected = b"The test has passed!"
        r, w = os.pipe()
        # Register the cleanup right away so the read end is released even
        # if run_in_subinterp() raises (e.g. SkipTest) or an assertion fails.
        self.addCleanup(os.close, r)
        try:
            code = textwrap.dedent(r"""
                import os
                import atexit
                def callback():
                    os.write({:d}, b"The test has passed!")
                atexit.register(callback)
            """.format(w))
            ret = support.run_in_subinterp(code)
        finally:
            # Always close the write end, and do so before reading: with no
            # writer left, os.read() returns instead of blocking forever
            # when the callback never ran.
            os.close(w)
        self.assertEqual(ret, 0)
        self.assertEqual(os.read(r, len(expected)), expected)
# Run the test cases above when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|