1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
|
import signal as cpy_signal
import sys
import py
from rpython.translator import cdir
from rpython.rtyper.tool import rffi_platform
from rpython.rtyper.lltypesystem import lltype, rffi
from rpython.translator.tool.cbuild import ExternalCompilationInfo
from rpython.rlib.rarithmetic import is_valid_int
def setup():
    """Publish the host ``signal`` module's integer constants in this module.

    An attribute of ``cpy_signal`` is copied into this module's globals,
    under the same name, when its name starts with ``SIG`` or ``CTRL_``,
    its value passes ``is_valid_int`` and it is neither ``SIG_DFL`` nor
    ``SIG_IGN``.

    This is a generator: it yields the name of each constant right after
    copying it, so nothing is copied until the generator is iterated.
    """
    module_namespace = globals()
    for name, number in cpy_signal.__dict__.items():
        if not name.startswith(('SIG', 'CTRL_')):
            continue
        if not is_valid_int(number):
            continue
        if name in ('SIG_DFL', 'SIG_IGN'):
            continue
        module_namespace[name] = number
        yield name
# Values taken directly from the host module.  setup() never copies these:
# it explicitly skips SIG_DFL and SIG_IGN, and NSIG does not match its
# name prefixes.
NSIG = cpy_signal.NSIG
SIG_DFL = cpy_signal.SIG_DFL
SIG_IGN = cpy_signal.SIG_IGN

# Exhausting the setup() generator is what actually stores the SIG*/CTRL_*
# constants as globals of this module.
signal_names = list(setup())

# Every known signal number is a key; all values are None.
signal_values = {}
for key in signal_names:
    signal_values[globals()[key]] = None

if sys.platform == 'win32' and not hasattr(cpy_signal, 'CTRL_C_EVENT'):
    # XXX Hack: bring back the CTRL_* values when the host signal module
    # does not provide them.  Remove this once we are sure the host
    # module always has them.
    CTRL_C_EVENT, CTRL_BREAK_EVENT = 0, 1
    signal_values.update({CTRL_C_EVENT: None, CTRL_BREAK_EVENT: None})
    signal_names.extend(['CTRL_C_EVENT', 'CTRL_BREAK_EVENT'])
# Headers listed in the compilation info below.  'sys/time.h' is only
# added when not building for win32.
includes = ['stdlib.h', 'src/signals.h', 'signal.h']
if sys.platform != 'win32':
    includes += ['sys/time.h']

# Rebind the imported directory name to a py.path.local object so that
# paths below can be composed with the "/" operator.
cdir = py.path.local(cdir)

# Compilation info shared by the declarations of this module: it compiles
# src/signals.c as a separate module file and defines
# PYPY_SIGINT_INTERRUPT_EVENT before the includes.
eci = ExternalCompilationInfo(
    pre_include_bits=["#define PYPY_SIGINT_INTERRUPT_EVENT 1\n"],
    includes=includes,
    include_dirs=[str(cdir)],
    separate_module_files=[cdir / 'src' / 'signals.c'],
)
class CConfig:
    # Holder for the rffi_platform entries that are resolved by the
    # configure() call further down.
    _compilation_info_ = eci


if sys.platform != 'win32':
    # Integer constants looked up by name in the C headers.
    for name in ('ITIMER_REAL', 'ITIMER_VIRTUAL', 'ITIMER_PROF',
                 'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'):
        setattr(CConfig, name, rffi_platform.DefinedConstantInteger(name))

    # 'struct itimerval' is described in terms of 'struct timeval', so the
    # latter has to be registered first.
    CConfig.timeval = rffi_platform.Struct(
        'struct timeval',
        [('tv_sec', rffi.LONG), ('tv_usec', rffi.LONG)])
    CConfig.itimerval = rffi_platform.Struct(
        'struct itimerval',
        [('it_value', CConfig.timeval), ('it_interval', CConfig.timeval)])

# Store every configured entry as a global of this module, under the
# attribute name it had on CConfig.
for k, v in rffi_platform.configure(CConfig).items():
    globals()[k] = v
def external(name, args, result, **kwds):
    """Declare the C function ``name`` via ``rffi.llexternal``.

    ``name``, ``args`` and ``result`` are forwarded unchanged, together
    with any extra keyword arguments in ``kwds``.  Every declaration made
    through this helper uses this module's ``eci`` as compilation info and
    passes ``sandboxsafe=True``.

    Returns whatever ``rffi.llexternal`` returns.
    """
    return rffi.llexternal(name, args, result,
                           sandboxsafe=True, compilation_info=eci,
                           **kwds)
# Per-signal helpers: each one takes a single C int and returns nothing.
# NOTE(review): presumably the argument is a signal number and the C code
# lives in src/signals.c -- confirm there.
pypysig_ignore = external(
    'pypysig_ignore', args=[rffi.INT], result=lltype.Void)
pypysig_default = external(
    'pypysig_default', args=[rffi.INT], result=lltype.Void)
pypysig_setflag = external(
    'pypysig_setflag', args=[rffi.INT], result=lltype.Void)
pypysig_reinstall = external(
    'pypysig_reinstall', args=[rffi.INT], result=lltype.Void)

# Bit flags for the 2nd argument to pypysig_set_wakeup_fd().
PYPYSIG_WITH_NUL_BYTE = 1 << 0
PYPYSIG_USE_SEND = 1 << 1
PYPYSIG_NO_WARN_FULL = 1 << 2

pypysig_set_wakeup_fd = external(
    'pypysig_set_wakeup_fd', args=[rffi.INT, rffi.INT], result=rffi.INT)

# Don't bother releasing the GIL around a call to pypysig_poll: it is
# pointless and a performance issue.  pypysig_pushback is declared with
# releasegil=False as well.
pypysig_poll = external(
    'pypysig_poll', args=[], result=rffi.INT, releasegil=False)
pypysig_pushback = external(
    'pypysig_pushback', args=[rffi.INT], result=lltype.Void,
    releasegil=False)
# Workaround: on CPython 2, calling pypysig_getaddr_occurred_fullstruct is
# extremely slow, due to some weird (ll2)ctypes reason.  This made the
# untranslated tests of the signal module time out really badly.  Therefore
# the single C function pypysig_getaddr_occurred is exposed *twice*, with
# different result types:
#
#   * pypysig_getaddr_occurred returns a pointer to a struct that declares
#     only the one field needed most of the time (including after every
#     bytecode);
#   * pypysig_getaddr_occurred_fullstruct returns a pointer to a struct
#     that also declares the other fields, which are only needed when
#     polling for signals.
#
# 'struct_name' is a scratch variable: it is rebound for the second struct
# and deleted afterwards, so it does not remain in the module namespace.
# NOTE(review): the field lists presumably have to mirror the C definitions
# in src/signals.h -- confirm there before changing their order or types.
struct_name = 'pypysig_long_struct_inner'
# One-field view: only 'c_value' is declared.
STRUCT_ONEFIELD = lltype.Struct('pypysig_long_struct_inner', ('c_value', lltype.Signed),
                                hints={'c_name': struct_name, 'external' : 'C'})
pypysig_getaddr_occurred = external('pypysig_getaddr_occurred', [],
                                    lltype.Ptr(STRUCT_ONEFIELD), _nowrapper=True,
                                    elidable_function=True)
struct_name = 'pypysig_long_struct'
# Full view: 'c_value' again comes first, followed by the remaining fields.
LONG_STRUCT = lltype.Struct(struct_name, ('c_value', lltype.Signed),
                            ('c_cookie', rffi.CFixedArray(lltype.Char, 8)),
                            ('c_debugger_pending_call', lltype.Signed),
                            ('c_debugger_script_path', lltype.Array(lltype.Char, hints={'nolength': True})),
                            hints={'c_name' : struct_name, 'external' : 'C'})
del struct_name
# Same C symbol as pypysig_getaddr_occurred above; only the declared result
# type differs.
pypysig_getaddr_occurred_fullstruct = external('pypysig_getaddr_occurred', [],
                                               lltype.Ptr(LONG_STRUCT), _nowrapper=True,
                                               elidable_function=True)
# Takes no arguments and is declared to return a boolean.
pypysig_check_and_reset = external('pypysig_check_and_reset', [],
                                   lltype.Bool, _nowrapper=True)
# Direct declarations of C library functions; all of them are declared
# with C int arguments and a C int result.
c_alarm = external('alarm', args=[rffi.INT], result=rffi.INT)
# pause() is declared with releasegil=True.
c_pause = external('pause', args=[], result=rffi.INT, releasegil=True)
# siginterrupt() saves errno (RFFI_SAVE_ERRNO) after the call.
c_siginterrupt = external(
    'siginterrupt', args=[rffi.INT, rffi.INT], result=rffi.INT,
    save_err=rffi.RFFI_SAVE_ERRNO)
c_raise = external('raise', args=[rffi.INT], result=rffi.INT)
if sys.platform != 'win32':
    # Interval-timer bindings, declared only when not on win32.
    # 'itimerval' is the global stored by the configure() loop of this
    # module.
    itimervalP = rffi.CArrayPtr(itimerval)
    # setitimer() saves errno after the call; getitimer() is declared
    # without save_err.
    c_setitimer = external(
        'setitimer', args=[rffi.INT, itimervalP, itimervalP],
        result=rffi.INT, save_err=rffi.RFFI_SAVE_ERRNO)
    c_getitimer = external(
        'getitimer', args=[rffi.INT, itimervalP], result=rffi.INT)
if sys.platform != 'win32':
    c_strsignal = external('strsignal', args=[rffi.INT],
                           result=rffi.CCHARP)

    def strsignal(signum):
        """Return the C library's description of signal ``signum``.

        The text is obtained from the C ``strsignal`` function and
        converted with ``rffi.charp2str``.  ``None`` is returned when the
        C call gives back a null pointer.
        """
        description = c_strsignal(signum)
        if description:
            return rffi.charp2str(description)
        return None
else:
    def strsignal(signum):
        """Return a fixed description of signal ``signum``, or ``None``.

        Only SIGINT, SIGILL, SIGABRT, SIGFPE, SIGSEGV and SIGTERM have a
        description; any other value yields ``None``.
        """
        # CPython does this too!
        if signum == SIGINT:
            return "Interrupt"
        if signum == SIGILL:
            return "Illegal instruction"
        if signum == SIGABRT:
            return "Aborted"
        if signum == SIGFPE:
            return "Floating point exception"
        if signum == SIGSEGV:
            return "Segmentation fault"
        if signum == SIGTERM:
            return "Terminated"
        return None
if sys.platform != 'win32':
    # Signal-set bindings, declared only when not on win32.  sigset_t is
    # handled as an opaque pointer type; every function below receives it
    # in that form.
    c_sigset_t = rffi.COpaquePtr('sigset_t', compilation_info=eci)

    c_sigemptyset = external(
        'sigemptyset', args=[c_sigset_t], result=rffi.INT)
    c_sigfillset = external(
        'sigfillset', args=[c_sigset_t], result=rffi.INT)
    c_sigaddset = external(
        'sigaddset', args=[c_sigset_t, rffi.INT], result=rffi.INT)
    c_sigismember = external(
        'sigismember', args=[c_sigset_t, rffi.INT], result=rffi.INT)

    # sigwait() is declared with releasegil=True and, unlike the other
    # declarations here, with the INT_real / INT_realP types.
    c_sigwait = external(
        'sigwait', args=[c_sigset_t, rffi.INT_realP],
        result=rffi.INT_real,
        save_err=rffi.RFFI_SAVE_ERRNO, releasegil=True)

    # The remaining calls save errno after the call.
    c_sigpending = external(
        'sigpending', args=[c_sigset_t], result=rffi.INT,
        save_err=rffi.RFFI_SAVE_ERRNO)
    c_pthread_sigmask = external(
        'pthread_sigmask', args=[rffi.INT, c_sigset_t, c_sigset_t],
        result=rffi.INT, save_err=rffi.RFFI_SAVE_ERRNO)
|