File: state.py

package info (click to toggle)
pypy3 7.3.19+dfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (103 lines) | stat: -rw-r--r-- 3,863 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from rpython.rtyper.lltypesystem import lltype, rffi
from rpython.rlib import jit
from rpython.rlib.objectmodel import specialize, not_rpython

from pypy.interpreter.error import oefmt

from pypy.module._hpy_universal import llapi
from pypy.module._hpy_universal import handlemanager
from pypy.module._hpy_universal.bridge import BRIDGE, hpy_get_bridge


class State(object):
    """Holder for the HPy handle managers and the pending-error slot.

    One instance is obtained per space through ``State.get(space)``.  It
    keeps three handle managers (universal, debug, trace), a
    ``global_handles`` dict and helpers that read/write
    ``cpyext_operror`` on the current execution context.
    """

    @not_rpython
    def __init__(self, space):
        """Allocate two raw HPyContext structs and build the managers."""
        self.space = space
        # Both structs are malloc'ed with flavor='raw' and immortal=True.
        ctx_type = llapi.HPyContext.TO
        universal_ctx = lltype.malloc(ctx_type, flavor='raw', immortal=True)
        debug_ctx = lltype.malloc(ctx_type, flavor='raw', immortal=True)

        # The debug and the trace managers both receive the universal
        # manager, so the latter has to be created first.
        universal = handlemanager.HandleManager(space, universal_ctx)
        self.u_handles = universal
        self.d_handles = handlemanager.DebugHandleManager(
            space, debug_ctx, universal)
        self.t_handles = handlemanager.TraceHandleManager(space, universal)

    @jit.dont_look_inside
    def setup(self, space):
        """Set up the three contexts, the global-handle dict and the bridge.

        The ``space`` argument is not read by this method; ``self.space``
        is what ``setup_bridge()`` uses.
        """
        # universal, then debug, then trace
        self.u_handles.setup_universal_ctx()
        self.d_handles.setup_debug_ctx()
        self.t_handles.setup_trace_ctx()
        # start from an empty table before wiring up the bridge
        self.global_handles = {}
        self.setup_bridge()

    @staticmethod
    def get(space):
        """Return the State object obtained via ``space.fromcache``."""
        state = space.fromcache(State)
        return state

    @specialize.arg(1)
    def get_handle_manager(self, mode):
        """Return the handle manager that corresponds to ``mode``.

        ``mode`` is compared against MODE_DEBUG, MODE_UNIVERSAL and
        MODE_TRACE (in that order); anything else raises an app-level
        RuntimeError.
        """
        if mode == llapi.MODE_DEBUG:
            return self.d_handles
        if mode == llapi.MODE_UNIVERSAL:
            return self.u_handles
        if mode == llapi.MODE_TRACE:
            return self.t_handles
        raise oefmt(self.space.w_RuntimeError, "MODE %d not valid", mode)

    def setup_bridge(self):
        """Make the llhelper of every BRIDGE function available."""
        space = self.space
        if space.config.translating:
            # After translation: get_llhelper() is called so that the
            # annotator sees each function and the C source is generated.
            #
            # Storing the result into sink[0] is a workaround: it convinces
            # the translator NOT to optimize the get_llhelper() call away,
            # otherwise the helpers are never seen and no C code is
            # emitted for them.
            with lltype.scoped_alloc(rffi.CArray(rffi.VOIDP), 1) as sink:
                for helper in BRIDGE.all_functions:
                    sink[0] = rffi.cast(rffi.VOIDP,
                                        helper.get_llhelper(space))
        else:
            # Before translation: store the ll2ctypes callbacks into the
            # global returned by hpy_get_bridge(), so that C can call them.
            c_bridge = hpy_get_bridge()
            for helper in BRIDGE.all_functions:
                callback = rffi.cast(rffi.VOIDP, helper.get_llhelper(space))
                # each callback lives in the field named 'c_<function name>'
                setattr(c_bridge, 'c_' + helper.__name__, callback)

    def was_already_setup(self):
        """Return the truth value of the universal manager's ``ctx``."""
        universal_ctx = self.u_handles.ctx
        return bool(universal_ctx)

    @not_rpython
    def reset(self):
        """
        Test-only helper: reset all the C globals so that they match the
        current state.
        """
        self.setup_bridge()
        llapi.hpy_debug_set_ctx(self.d_handles.ctx)
        trace_ctx = llapi.hpy_trace_get_ctx(self.u_handles.ctx)
        # overwrite the trace context's _private field with a NULL void*
        null_ptr = llapi.cts.cast('void*', 0)
        trace_ctx.c__private = null_ptr
        self.global_handles = {}

    def set_exception(self, operror):
        """Store ``operror`` as the pending error of the execution context."""
        # go through clear_exception() first, exactly like a fresh set
        self.clear_exception()
        self.space.getexecutioncontext().cpyext_operror = operror

    def clear_exception(self):
        """Reset the pending error to None and return the previous value."""
        context = self.space.getexecutioncontext()
        pending = context.cpyext_operror
        context.cpyext_operror = None
        return pending

    def get_exception(self):
        """Return the pending error without clearing it."""
        return self.space.getexecutioncontext().cpyext_operror

    def raise_current_exception(self):
        """Clear the pending error and raise it.

        When nothing was pending, an app-level SystemError is raised
        instead.
        """
        pending = self.clear_exception()
        if pending:
            raise pending
        raise oefmt(self.space.w_SystemError,
                    "Function returned an error result without setting an "
                    "exception")