1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
|
# for Windows only
import sys
from rpython.rlib import jit
from rpython.rtyper.lltypesystem import lltype, rffi
from rpython.translator.tool.cbuild import ExternalCompilationInfo
MESSAGEBOX = sys.platform == "win32"
MODULE = r"""
#include <Windows.h>
#pragma comment(lib, "user32.lib")
static void *volatile _cffi_bootstrap_text;
RPY_EXTERN int _cffi_errorbox1(void)
{
return InterlockedCompareExchangePointer(&_cffi_bootstrap_text,
(void *)1, NULL) == NULL;
}
static DWORD WINAPI _cffi_bootstrap_dialog(LPVOID ignored)
{
Sleep(666); /* may be interrupted if the whole process is closing */
MessageBoxW(NULL, (wchar_t *)_cffi_bootstrap_text,
L"PyPy: Python-CFFI error",
MB_OK | MB_ICONERROR);
_cffi_bootstrap_text = NULL;
return 0;
}
RPY_EXTERN void _cffi_errorbox(wchar_t *text)
{
/* Show a dialog box, but in a background thread, and
never show multiple dialog boxes at once. */
HANDLE h;
_cffi_bootstrap_text = text;
h = CreateThread(NULL, 0, _cffi_bootstrap_dialog,
NULL, 0, NULL);
if (h != NULL)
CloseHandle(h);
}
"""
if MESSAGEBOX:
eci = ExternalCompilationInfo(
separate_module_sources=[MODULE],
post_include_bits=["#include <wchar.h>\n",
"RPY_EXTERN int _cffi_errorbox1(void);\n",
"RPY_EXTERN void _cffi_errorbox(wchar_t *);\n"])
cffi_errorbox1 = rffi.llexternal("_cffi_errorbox1", [],
rffi.INT, compilation_info=eci)
cffi_errorbox = rffi.llexternal("_cffi_errorbox", [rffi.CWCHARP],
lltype.Void, compilation_info=eci)
class Message:
def __init__(self, space):
self.space = space
self.text_p = lltype.nullptr(rffi.CWCHARP.TO)
def start_error_capture(self):
ok = cffi_errorbox1()
if rffi.cast(lltype.Signed, ok) != 1:
return None
return self.space.appexec([], """():
import sys
class FileLike:
def write(self, x):
try:
of.write(x)
except:
pass
self.buf += x
fl = FileLike()
fl.buf = ''
of = sys.stderr
sys.stderr = fl
def done():
sys.stderr = of
return fl.buf
return done
""")
def stop_error_capture(self, w_done):
if w_done is None:
return
w_text = self.space.call_function(w_done)
p = rffi.utf82wcharp(self.space.utf8_w(w_text),
self.space.len_w(w_text),
track_allocation=False)
if self.text_p:
rffi.free_wcharp(self.text_p, track_allocation=False)
self.text_p = p # keepalive
cffi_errorbox(p)
@jit.dont_look_inside
def start_error_capture(space):
msg = space.fromcache(Message)
return msg.start_error_capture()
@jit.dont_look_inside
def stop_error_capture(space, x):
msg = space.fromcache(Message)
msg.stop_error_capture(x)
else:
def start_error_capture(space):
return None
def stop_error_capture(space, nothing):
pass
|