File: rsignal.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (167 lines) | stat: -rw-r--r-- 7,160 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import signal as cpy_signal
import sys
import py
from rpython.translator import cdir
from rpython.rtyper.tool import rffi_platform
from rpython.rtyper.lltypesystem import lltype, rffi
from rpython.translator.tool.cbuild import ExternalCompilationInfo
from rpython.rlib.rarithmetic import is_valid_int

def setup():
    for key, value in cpy_signal.__dict__.items():
        if (key.startswith('SIG') or key.startswith('CTRL_')) and \
                is_valid_int(value) and \
                key != 'SIG_DFL' and key != 'SIG_IGN':
            globals()[key] = value
            yield key

NSIG    = cpy_signal.NSIG
SIG_DFL = cpy_signal.SIG_DFL
SIG_IGN = cpy_signal.SIG_IGN
signal_names = list(setup())
signal_values = {}
for key in signal_names:
    signal_values[globals()[key]] = None
if sys.platform == 'win32' and not hasattr(cpy_signal,'CTRL_C_EVENT'):
    # XXX Hack to revive values that went missing,
    #     Remove this once we are sure the host cpy module has them.
    signal_values[0] = None
    signal_values[1] = None
    signal_names.append('CTRL_C_EVENT')
    signal_names.append('CTRL_BREAK_EVENT')
    CTRL_C_EVENT = 0
    CTRL_BREAK_EVENT = 1
includes = ['stdlib.h', 'src/signals.h', 'signal.h']
if sys.platform != 'win32':
    includes.append('sys/time.h')

cdir = py.path.local(cdir)

eci = ExternalCompilationInfo(
    includes = includes,
    separate_module_files = [cdir / 'src' / 'signals.c'],
    include_dirs = [str(cdir)],
    pre_include_bits = ["#define PYPY_SIGINT_INTERRUPT_EVENT 1\n"],
)

class CConfig:
    _compilation_info_ = eci

if sys.platform != 'win32':
    for name in """
        ITIMER_REAL ITIMER_VIRTUAL ITIMER_PROF
        SIG_BLOCK SIG_UNBLOCK SIG_SETMASK""".split():
        setattr(CConfig, name, rffi_platform.DefinedConstantInteger(name))

    CConfig.timeval = rffi_platform.Struct(
        'struct timeval',
        [('tv_sec', rffi.LONG),
         ('tv_usec', rffi.LONG)])

    CConfig.itimerval = rffi_platform.Struct(
        'struct itimerval',
        [('it_value', CConfig.timeval),
         ('it_interval', CConfig.timeval)])

for k, v in rffi_platform.configure(CConfig).items():
    globals()[k] = v

def external(name, args, result, **kwds):
    return rffi.llexternal(name, args, result, compilation_info=eci,
                           sandboxsafe=True, **kwds)

pypysig_ignore = external('pypysig_ignore', [rffi.INT], lltype.Void)
pypysig_default = external('pypysig_default', [rffi.INT], lltype.Void)
pypysig_setflag = external('pypysig_setflag', [rffi.INT], lltype.Void)
pypysig_reinstall = external('pypysig_reinstall', [rffi.INT], lltype.Void)
PYPYSIG_WITH_NUL_BYTE = 0x01   # flags for the 2nd argument to set_wakeup_fd()
PYPYSIG_USE_SEND = 0x02
PYPYSIG_NO_WARN_FULL  = 0x04
pypysig_set_wakeup_fd = external('pypysig_set_wakeup_fd',
                                 [rffi.INT, rffi.INT], rffi.INT)
pypysig_poll = external('pypysig_poll', [], rffi.INT, releasegil=False)
# don't bother releasing the GIL around a call to pypysig_poll: it's
# pointless and a performance issue
pypysig_pushback = external('pypysig_pushback', [rffi.INT], lltype.Void,
                            releasegil=False)

# workaround: on CPython2, calling pypysig_getaddr_occurred_fullstruct is
# extremely slow, due to some weird (ll2)ctypes reason. this made untranslated tests
# of the signal module time out really badly. therefore we expose the
# pypysig_getaddr_occurred C function *twice*, with different types. One
# returns just the one field that we need most of the time (including after
# every bytecode), and one also returns the other fields that we only need when
# we poll for signals.
struct_name = 'pypysig_long_struct_inner'
STRUCT_ONEFIELD = lltype.Struct('pypysig_long_struct_inner', ('c_value', lltype.Signed),
                                hints={'c_name': struct_name, 'external' : 'C'})

pypysig_getaddr_occurred = external('pypysig_getaddr_occurred', [],
                                    lltype.Ptr(STRUCT_ONEFIELD), _nowrapper=True,
                                    elidable_function=True)

struct_name = 'pypysig_long_struct'
LONG_STRUCT = lltype.Struct(struct_name, ('c_value', lltype.Signed),
                                         ('c_cookie', rffi.CFixedArray(lltype.Char, 8)),
                                         ('c_debugger_pending_call', lltype.Signed),
                                         ('c_debugger_script_path', lltype.Array(lltype.Char, hints={'nolength': True})),
                            hints={'c_name' : struct_name, 'external' : 'C'})
del struct_name

pypysig_getaddr_occurred_fullstruct = external('pypysig_getaddr_occurred', [],
                                              lltype.Ptr(LONG_STRUCT), _nowrapper=True,
                                              elidable_function=True)
pypysig_check_and_reset = external('pypysig_check_and_reset', [],
                                   lltype.Bool, _nowrapper=True)
c_alarm = external('alarm', [rffi.INT], rffi.INT)
c_pause = external('pause', [], rffi.INT, releasegil=True)
c_siginterrupt = external('siginterrupt', [rffi.INT, rffi.INT], rffi.INT,
                          save_err=rffi.RFFI_SAVE_ERRNO)
c_raise = external('raise', [rffi.INT], rffi.INT)

if sys.platform != 'win32':
    itimervalP = rffi.CArrayPtr(itimerval)
    c_setitimer = external('setitimer',
                           [rffi.INT, itimervalP, itimervalP], rffi.INT,
                           save_err=rffi.RFFI_SAVE_ERRNO)
    c_getitimer = external('getitimer', [rffi.INT, itimervalP], rffi.INT)

if sys.platform != 'win32':
    c_strsignal = external('strsignal', [rffi.INT], rffi.CCHARP)
    def strsignal(signum):
        res = c_strsignal(signum)
        if not res:
            return None
        return rffi.charp2str(res)
else:
    def strsignal(signum):
        # CPython does this too!
        if signum == SIGINT:
            return "Interrupt";
        elif signum == SIGILL:
            return "Illegal instruction";
        elif signum == SIGABRT:
            return "Aborted";
        elif signum == SIGFPE:
            return "Floating point exception";
        elif signum == SIGSEGV:
            return "Segmentation fault";
        elif signum == SIGTERM:
            return "Terminated";
        return None


if sys.platform != 'win32':
    c_sigset_t = rffi.COpaquePtr('sigset_t', compilation_info=eci)
    c_sigemptyset = external('sigemptyset', [c_sigset_t], rffi.INT)
    c_sigfillset = external('sigfillset', [c_sigset_t], rffi.INT)
    c_sigaddset = external('sigaddset', [c_sigset_t, rffi.INT], rffi.INT)
    c_sigismember = external('sigismember', [c_sigset_t, rffi.INT], rffi.INT)
    c_sigwait = external('sigwait', [c_sigset_t, rffi.INT_realP], rffi.INT_real,
                         releasegil=True,
                         save_err=rffi.RFFI_SAVE_ERRNO)
    c_sigpending = external('sigpending', [c_sigset_t], rffi.INT,
                            save_err=rffi.RFFI_SAVE_ERRNO)
    c_pthread_sigmask = external('pthread_sigmask',
                                 [rffi.INT, c_sigset_t, c_sigset_t], rffi.INT,
                                 save_err=rffi.RFFI_SAVE_ERRNO)