File: atexit.py

package info (click to toggle)
kitty 0.42.1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 28,564 kB
  • sloc: ansic: 82,787; python: 55,191; objc: 5,122; sh: 1,295; xml: 364; makefile: 143; javascript: 78
file content (90 lines) | stat: -rw-r--r-- 2,867 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python
# License: GPLv3 Copyright: 2025, Kovid Goyal <kovid at kovidgoyal.net>


import os
import select
import shutil
import signal
import subprocess
import tempfile

from kitty.constants import kitten_exe, kitty_exe
from kitty.shm import SharedMemory

from . import BaseTest


class Atexit(BaseTest):
    """End-to-end test of the ``__atexit__`` sub-command of the kitten executable.

    The test registers files, a directory tree and a shared memory segment
    with the helper, checks that nothing is removed while the launching
    process is still alive, and then checks that everything is removed once
    that process goes away (by closing its stdin, terminating it or killing
    it).
    """

    def setUp(self):
        """Create the scratch directory the registered paths live in."""
        # Run any fixture logic defined further up the class hierarchy.
        super().setUp()
        self.tdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory created by :meth:`setUp`."""
        try:
            shutil.rmtree(self.tdir)
        finally:
            super().tearDown()

    def test_atexit(self):
        """Run the cleanup scenario once for each way of ending the parent."""

        def r(action='close'):
            """Run one scenario.

            ``action`` selects how the launching process is ended: ``'close'``
            closes its stdin, ``'terminate'`` calls ``Popen.terminate()`` and
            any other value calls ``Popen.kill()``.
            """
            # The launcher starts the helper (which inherits the launcher's
            # stdin and stdout), prints the helper's pid and then waits for it.
            p = subprocess.Popen([kitty_exe(), '+runpy', f'''\
import subprocess
p = subprocess.Popen(['{kitten_exe()}', '__atexit__'])
print(p.pid, flush=True)
raise SystemExit(p.wait())
'''], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
            readers = [p.stdout.fileno()]

            def read(timeout=10):
                """Return the next stdout line, stripped; '' means EOF.

                Raises TimeoutError when nothing arrives within ``timeout``
                seconds.
                """
                # NOTE: select() watches the raw descriptor while readline()
                # goes through Popen's buffered reader. That is only safe
                # because the protocol is one reply line per request.
                ready, _, _ = select.select(readers, [], [], timeout)
                if not ready:
                    raise TimeoutError('Timed out waiting for read from child')
                return p.stdout.readline().rstrip().decode()

            def send(command):
                """Write one newline-terminated command to the helper."""
                p.stdin.write(f'{command}\n'.encode())
                p.stdin.flush()

            try:
                atexit_pid = int(read())

                # The test expects the n-th command to be acknowledged with
                # the line "n"; track that explicitly.
                acked = 0

                for i in range(2):
                    with open(os.path.join(self.tdir, str(i)), 'w') as f:
                        send(f'unlink {f.name}')
                    acked += 1
                    self.ae(read(20), str(acked))

                sdir = os.path.join(self.tdir, 'd')
                os.mkdir(sdir)
                send(f'rmtree {sdir}')
                # Populate the directory after registering it, so that the
                # removal has to cope with a non-empty tree.
                with open(os.path.join(sdir, 'f'), 'w'):
                    pass
                acked += 1
                self.ae(read(20), str(acked))

                shm = SharedMemory(size=64)
                shm.write(b'1' * 64)
                shm.flush()
                send(f'shm_unlink {shm.name}')
                acked += 1
                self.ae(read(), str(acked))

                # Nothing may be cleaned up while the launcher is still alive.
                self.assertTrue(os.listdir(self.tdir))
                shm2 = SharedMemory(shm.name)
                self.ae(shm2.read()[:64], b'1' * 64)

                # Ensure child is ignoring signals
                os.kill(atexit_pid, signal.SIGINT)
                os.kill(atexit_pid, signal.SIGTERM)

                if action == 'close':
                    p.stdin.close()
                elif action == 'terminate':
                    p.terminate()
                else:
                    p.kill()
                p.wait(10)
                if action != 'close':
                    p.stdin.close()

                # EOF on stdout is the synchronisation point: it is only seen
                # once every holder of the write end, the helper included,
                # has closed it.
                self.assertFalse(read(20))
                p.stdout.close()

                self.assertFalse(os.listdir(self.tdir))
                try:
                    os.waitpid(atexit_pid, 0)
                except ChildProcessError:
                    # The helper is not a direct child of this process.
                    pass
                with self.assertRaises(FileNotFoundError):
                    SharedMemory(shm.name)
            finally:
                # On success all of this is a no-op. On failure it makes sure
                # the launcher and its pipes do not outlive the test.
                try:
                    p.stdin.close()
                except OSError:
                    pass
                if p.poll() is None:
                    p.kill()
                    p.wait(10)
                p.stdout.close()

        for action in ('close', 'terminate', 'kill'):
            r(action)