File: _test_atexit.py

package info (click to toggle)
python3.13 3.13.12-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 123,756 kB
  • sloc: python: 710,464; ansic: 655,626; xml: 31,250; sh: 5,844; cpp: 4,327; makefile: 1,983; objc: 787; lisp: 502; javascript: 213; asm: 75; csh: 12
file content (187 lines) | stat: -rw-r--r-- 5,448 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""
Tests run by test_atexit in a subprocess since it clears atexit callbacks.
"""
import atexit
import sys
import unittest
from test import support


class GeneralTest(unittest.TestCase):
    def setUp(self):
        atexit._clear()

    def tearDown(self):
        atexit._clear()

    def assert_raises_unraisable(self, exc_type, func, *args):
        with support.catch_unraisable_exception() as cm:
            atexit.register(func, *args)
            atexit._run_exitfuncs()

            self.assertIsNone(cm.unraisable.object)
            self.assertEqual(cm.unraisable.err_msg,
                    f'Exception ignored in atexit callback {func!r}')
            self.assertEqual(cm.unraisable.exc_type, exc_type)
            self.assertEqual(type(cm.unraisable.exc_value), exc_type)

    def test_order(self):
        # Check that callbacks are called in reverse order with the expected
        # positional and keyword arguments.
        calls = []

        def func1(*args, **kwargs):
            calls.append(('func1', args, kwargs))

        def func2(*args, **kwargs):
            calls.append(('func2', args, kwargs))

        # be sure args are handled properly
        atexit.register(func1, 1, 2)
        atexit.register(func2)
        atexit.register(func2, 3, key="value")
        atexit._run_exitfuncs()

        self.assertEqual(calls,
                         [('func2', (3,), {'key': 'value'}),
                          ('func2', (), {}),
                          ('func1', (1, 2), {})])

    def test_badargs(self):
        def func():
            pass

        # func() has no parameter, but it's called with 2 parameters
        self.assert_raises_unraisable(TypeError, func, 1 ,2)

    def test_raise(self):
        def raise_type_error():
            raise TypeError

        self.assert_raises_unraisable(TypeError, raise_type_error)

    def test_raise_unnormalized(self):
        # bpo-10756: Make sure that an unnormalized exception is handled
        # properly.
        def div_zero():
            1 / 0

        self.assert_raises_unraisable(ZeroDivisionError, div_zero)

    def test_exit(self):
        self.assert_raises_unraisable(SystemExit, sys.exit)

    def test_stress(self):
        a = [0]
        def inc():
            a[0] += 1

        for i in range(128):
            atexit.register(inc)
        atexit._run_exitfuncs()

        self.assertEqual(a[0], 128)

    def test_clear(self):
        a = [0]
        def inc():
            a[0] += 1

        atexit.register(inc)
        atexit._clear()
        atexit._run_exitfuncs()

        self.assertEqual(a[0], 0)

    def test_unregister(self):
        a = [0]
        def inc():
            a[0] += 1
        def dec():
            a[0] -= 1

        for i in range(4):
            atexit.register(inc)
        atexit.register(dec)
        atexit.unregister(inc)
        atexit._run_exitfuncs()

        self.assertEqual(a[0], -1)

    def test_bound_methods(self):
        l = []
        atexit.register(l.append, 5)
        atexit._run_exitfuncs()
        self.assertEqual(l, [5])

        atexit.unregister(l.append)
        atexit._run_exitfuncs()
        self.assertEqual(l, [5])

    def test_atexit_with_unregistered_function(self):
        # See bpo-46025 for more info
        def func():
            atexit.unregister(func)
            1/0
        atexit.register(func)
        try:
            with support.catch_unraisable_exception() as cm:
                atexit._run_exitfuncs()
                self.assertIsNone(cm.unraisable.object)
                self.assertEqual(cm.unraisable.err_msg,
                        f'Exception ignored in atexit callback {func!r}')
                self.assertEqual(cm.unraisable.exc_type, ZeroDivisionError)
                self.assertEqual(type(cm.unraisable.exc_value), ZeroDivisionError)
        finally:
            atexit.unregister(func)

    def test_eq_unregister_clear(self):
        # Issue #112127: callback's __eq__ may call unregister or _clear
        class Evil:
            def __eq__(self, other):
                action(other)
                return NotImplemented

        for action in atexit.unregister, lambda o: atexit._clear():
            with self.subTest(action=action):
                atexit.register(lambda: None)
                atexit.unregister(Evil())
                atexit._clear()

    def test_eq_unregister(self):
        # Issue #112127: callback's __eq__ may call unregister
        def f1():
            log.append(1)
        def f2():
            log.append(2)
        def f3():
            log.append(3)

        class Pred:
            def __eq__(self, other):
                nonlocal cnt
                cnt += 1
                if cnt == when:
                    atexit.unregister(what)
                if other is f2:
                    return True
                return False

        for what, expected in (
                (f1, [3]),
                (f2, [3, 1]),
                (f3, [1]),
            ):
            for when in range(1, 4):
                with self.subTest(what=what.__name__, when=when):
                    cnt = 0
                    log = []
                    for f in (f1, f2, f3):
                        atexit.register(f)
                    atexit.unregister(Pred())
                    atexit._run_exitfuncs()
                    self.assertEqual(log, expected)


if __name__ == "__main__":
    unittest.main()