1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
|
#!/usr/bin/env python
# License: GPLv3 Copyright: 2025, Kovid Goyal <kovid at kovidgoyal.net>
import os
import select
import shutil
import signal
import subprocess
import tempfile
from kitty.constants import kitten_exe, kitty_exe
from kitty.shm import SharedMemory
from . import BaseTest
class Atexit(BaseTest):
    """Tests for the ``kitten __atexit__`` helper process."""

    def setUp(self):
        """Create a private scratch directory for the test to populate."""
        self.tdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory created by :meth:`setUp`."""
        shutil.rmtree(self.tdir)

    def test_atexit(self):
        """Registered resources must be removed however the parent goes away.

        A small parent program is launched via ``kitty +runpy``; it spawns
        ``kitten __atexit__`` and prints the helper's pid. The test then sends
        ``unlink``, ``rmtree`` and ``shm_unlink`` commands over the shared
        stdin pipe and expects the replies ``1``, ``2``, ``3``, ``4`` in
        order. After checking that nothing has been removed yet, the parent is
        made to go away (stdin closed, terminated, or killed) and the test
        asserts that the scratch directory is empty and the shared memory
        segment no longer exists.
        """

        def run_scenario(action='close'):
            """Run one full register/shutdown cycle.

            ``action`` selects how the parent is stopped: ``'close'`` closes
            its stdin, ``'terminate'`` calls ``Popen.terminate()`` and any
            other value calls ``Popen.kill()``.
            """
            # Parent program: start the helper, report its pid, then exit with
            # the helper's exit status. ``!r`` renders the path as a valid
            # Python string literal even if it contains quotes or backslashes.
            script = (
                'import subprocess\n'
                f"p = subprocess.Popen([{kitten_exe()!r}, '__atexit__'])\n"
                'print(p.pid, flush=True)\n'
                'raise SystemExit(p.wait())\n'
            )
            p = subprocess.Popen(
                [kitty_exe(), '+runpy', script],
                stdin=subprocess.PIPE, stdout=subprocess.PIPE)
            readers = [p.stdout.fileno()]

            def read():
                """Return the next stdout line, stripped; raise on a 10s timeout."""
                ready, _, _ = select.select(readers, [], [], 10)
                if not ready:
                    raise TimeoutError('Timed out waiting for read from child')
                return p.stdout.readline().rstrip().decode()

            def send(command):
                """Write one newline-terminated command to the child's stdin."""
                p.stdin.write(f'{command}\n'.encode())
                p.stdin.flush()

            def expect_reply(expected):
                """Assert that the next line read from the child equals ``expected``."""
                # Extra grace period on top of the timeout enforced by read()
                select.select(readers, [], [], 10)
                self.ae(read(), expected)

            try:
                atexit_pid = int(read())

                # Two plain files registered for unlinking
                for i in range(2):
                    with open(os.path.join(self.tdir, str(i)), 'w') as f:
                        send(f'unlink {f.name}')
                    expect_reply(str(i + 1))

                # A directory registered for recursive removal; it is populated
                # only after the command has been sent
                sdir = os.path.join(self.tdir, 'd')
                os.mkdir(sdir)
                send(f'rmtree {sdir}')
                with open(os.path.join(sdir, 'f'), 'w'):
                    pass
                expect_reply('3')

                # A shared memory segment registered for unlinking
                shm = SharedMemory(size=64)
                shm.write(b'1' * 64)
                shm.flush()
                send(f'shm_unlink {shm.name}')
                expect_reply('4')

                # Nothing may be removed while the parent is still around
                self.assertTrue(os.listdir(self.tdir))
                shm2 = SharedMemory(shm.name)
                self.ae(shm2.read()[:64], b'1' * 64)

                # Ensure child is ignoring signals
                os.kill(atexit_pid, signal.SIGINT)
                os.kill(atexit_pid, signal.SIGTERM)

                if action == 'close':
                    p.stdin.close()
                elif action == 'terminate':
                    p.terminate()
                else:
                    p.kill()
                p.wait(10)
                if action != 'close':
                    p.stdin.close()

                # An empty read means end-of-file on the child's stdout
                select.select(readers, [], [], 10)
                self.assertFalse(read())
                p.stdout.close()
                self.assertFalse(os.listdir(self.tdir))
                try:
                    os.waitpid(atexit_pid, 0)
                except ChildProcessError:
                    # The helper was started by the parent program, so it is
                    # usually not ours to reap; that is fine.
                    pass
                self.assertRaises(FileNotFoundError, SharedMemory, shm.name)
            finally:
                # Best-effort teardown so a failed assertion above does not
                # leak a running child or open pipes. On the success path the
                # pipes are already closed and the process has been waited on,
                # so all of this is a no-op.
                for stream in (p.stdin, p.stdout):
                    try:
                        stream.close()
                    except OSError:
                        pass
                if p.poll() is None:
                    p.kill()
                    p.wait()

        run_scenario('close')
        run_scenario('terminate')
        run_scenario('kill')
|