File: __init__.py

package info (click to toggle)
dumb-init 1.2.5-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 268 kB
  • sloc: python: 677; ansic: 260; makefile: 86; sh: 49
file content (111 lines) | stat: -rw-r--r-- 2,814 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import errno
import os
import re
import signal
import sys
import time
from contextlib import contextmanager
from subprocess import PIPE
from subprocess import Popen

from py._path.local import LocalPath


# these signals cause dumb-init to suspend itself
SUSPEND_SIGNALS = frozenset([
    signal.SIGTSTP,
    signal.SIGTTOU,
    signal.SIGTTIN,
])

NORMAL_SIGNALS = frozenset(
    set(range(1, 32)) -
    {signal.SIGKILL, signal.SIGSTOP, signal.SIGCHLD} -
    SUSPEND_SIGNALS,
)


@contextmanager
def print_signals(args=()):
    """Start print_signals and yield dumb-init process and print_signals PID."""
    proc = Popen(
        (
            ('dumb-init',) +
            tuple(args) +
            (sys.executable, '-m', 'testing.print_signals')
        ),
        stdout=PIPE,
    )
    line = proc.stdout.readline()
    m = re.match(b'^ready \\(pid: ([0-9]+)\\)\n$', line)
    assert m, line

    yield proc, m.group(1).decode('ascii')

    for pid in pid_tree(proc.pid):
        os.kill(pid, signal.SIGKILL)


def child_pids(pid):
    """Return a list of direct child PIDs for the given PID."""
    children = set()
    for p in LocalPath('/proc').listdir():
        try:
            stat = open(p.join('stat').strpath).read()
            m = re.match(r'^\d+ \(.+?\) [a-zA-Z] (\d+) ', stat)
            assert m, stat
            ppid = int(m.group(1))
            if ppid == pid:
                children.add(int(p.basename))
        except OSError:
            # Happens when the process exits after listing it, or between
            # opening stat and reading it.
            pass
    return children


def pid_tree(pid):
    """Return a list of all descendant PIDs for the given PID."""
    children = child_pids(pid)
    return {
        pid
        for child in children
        for pid in pid_tree(child)
    } | children


def is_alive(pid):
    """Return whether a process is running with the given PID."""
    return LocalPath('/proc').join(str(pid)).isdir()


def process_state(pid):
    """Return a process' state, such as "stopped" or "running"."""
    status = LocalPath('/proc').join(str(pid), 'status').read()
    m = re.search(r'^State:\s+[A-Z] \(([a-z]+)\)$', status, re.MULTILINE)
    return m.group(1)


def sleep_until(fn, timeout=1.5):
    """Sleep until fn succeeds, or we time out."""
    interval = 0.01
    so_far = 0
    while True:
        try:
            fn()
        except Exception:
            if so_far >= timeout:
                raise
        else:
            break
        time.sleep(interval)
        so_far += interval


def kill_if_alive(pid, signum=signal.SIGKILL):
    """Kill a process, ignoring "no such process" errors."""
    try:
        os.kill(pid, signum)
    except OSError as ex:
        if ex.errno != errno.ESRCH:  # No such process
            raise