1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
import errno
import os
import re
import signal
import sys
import time
from contextlib import contextmanager
from subprocess import PIPE
from subprocess import Popen
from py._path.local import LocalPath
# these signals cause dumb-init to suspend itself
SUSPEND_SIGNALS = frozenset([
signal.SIGTSTP,
signal.SIGTTOU,
signal.SIGTTIN,
])
NORMAL_SIGNALS = frozenset(
set(range(1, 32)) -
{signal.SIGKILL, signal.SIGSTOP, signal.SIGCHLD} -
SUSPEND_SIGNALS,
)
@contextmanager
def print_signals(args=()):
"""Start print_signals and yield dumb-init process and print_signals PID."""
proc = Popen(
(
('dumb-init',) +
tuple(args) +
(sys.executable, '-m', 'testing.print_signals')
),
stdout=PIPE,
)
line = proc.stdout.readline()
m = re.match(b'^ready \\(pid: ([0-9]+)\\)\n$', line)
assert m, line
yield proc, m.group(1).decode('ascii')
for pid in pid_tree(proc.pid):
os.kill(pid, signal.SIGKILL)
def child_pids(pid):
"""Return a list of direct child PIDs for the given PID."""
children = set()
for p in LocalPath('/proc').listdir():
try:
stat = open(p.join('stat').strpath).read()
m = re.match(r'^\d+ \(.+?\) [a-zA-Z] (\d+) ', stat)
assert m, stat
ppid = int(m.group(1))
if ppid == pid:
children.add(int(p.basename))
except OSError:
# Happens when the process exits after listing it, or between
# opening stat and reading it.
pass
return children
def pid_tree(pid):
"""Return a list of all descendant PIDs for the given PID."""
children = child_pids(pid)
return {
pid
for child in children
for pid in pid_tree(child)
} | children
def is_alive(pid):
"""Return whether a process is running with the given PID."""
return LocalPath('/proc').join(str(pid)).isdir()
def process_state(pid):
"""Return a process' state, such as "stopped" or "running"."""
status = LocalPath('/proc').join(str(pid), 'status').read()
m = re.search(r'^State:\s+[A-Z] \(([a-z]+)\)$', status, re.MULTILINE)
return m.group(1)
def sleep_until(fn, timeout=1.5):
"""Sleep until fn succeeds, or we time out."""
interval = 0.01
so_far = 0
while True:
try:
fn()
except Exception:
if so_far >= timeout:
raise
else:
break
time.sleep(interval)
so_far += interval
def kill_if_alive(pid, signum=signal.SIGKILL):
"""Kill a process, ignoring "no such process" errors."""
try:
os.kill(pid, signum)
except OSError as ex:
if ex.errno != errno.ESRCH: # No such process
raise
|