1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
|
from pypy.interpreter.error import OperationError, oefmt, wrap_oserror
from pypy.interpreter.gateway import WrappedDefault, unwrap_spec
from pypy.interpreter.pycode import CodeHookCache
from pypy.interpreter.pyframe import PyFrame
from pypy.interpreter.mixedmodule import MixedModule
from rpython.rlib.objectmodel import we_are_translated
from pypy.objspace.std.dictmultiobject import W_DictMultiObject
from pypy.objspace.std.listobject import W_ListObject
from pypy.objspace.std.setobject import W_BaseSetObject
from pypy.objspace.std.typeobject import MethodCache
from pypy.objspace.std.mapdict import MapAttrCache
from rpython.rlib import rposix, rgc, rstack
from rpython.rtyper.lltypesystem import rffi
def internal_repr(space, w_object):
return space.newtext('%r' % (w_object,))
def objects_in_repr(space):
"""The identitydict of objects currently being repr().
This object is thread-local and can be used in a __repr__ method
to avoid recursion.
"""
return space.get_objects_in_repr()
def attach_gdb(space):
"""Run an interp-level gdb (or pdb when untranslated)"""
from rpython.rlib.debug import attach_gdb
attach_gdb()
@unwrap_spec(name='text')
def method_cache_counter(space, name):
"""Return a tuple (method_cache_hits, method_cache_misses) for calls to
methods with the name."""
assert space.config.objspace.std.withmethodcachecounter
cache = space.fromcache(MethodCache)
return space.newtuple2(space.newint(cache.hits.get(name, 0)),
space.newint(cache.misses.get(name, 0)))
def reset_method_cache_counter(space):
"""Reset the method cache counter to zero for all method names."""
assert space.config.objspace.std.withmethodcachecounter
cache = space.fromcache(MethodCache)
cache.misses = {}
cache.hits = {}
cache = space.fromcache(MapAttrCache)
cache.misses = {}
cache.hits = {}
@unwrap_spec(name='text')
def mapdict_cache_counter(space, name):
"""Return a tuple (index_cache_hits, index_cache_misses) for lookups
in the mapdict cache with the given attribute name."""
assert space.config.objspace.std.withmethodcachecounter
cache = space.fromcache(MapAttrCache)
return space.newtuple2(space.newint(cache.hits.get(name, 0)),
space.newint(cache.misses.get(name, 0)))
def builtinify(space, w_func):
"""To implement at app-level modules that are, in CPython,
implemented in C: this decorator protects a function from being ever
bound like a method. Useful because some tests do things like put
a "built-in" function on a class and access it via the instance.
"""
from pypy.interpreter.function import Function, BuiltinFunction
func = space.interp_w(Function, w_func)
bltn = BuiltinFunction(func)
return bltn
def hidden_applevel(space, w_func):
"""Decorator that hides a function's frame from app-level"""
from pypy.interpreter.function import Function
func = space.interp_w(Function, w_func)
func.getcode().hidden_applevel = True
return w_func
@unwrap_spec(meth='text')
def lookup_special(space, w_obj, meth):
"""Lookup up a special method on an object."""
w_descr = space.lookup(w_obj, meth)
if w_descr is None:
return space.w_None
return space.get(w_descr, w_obj)
def do_what_I_mean(space):
"Return 42"
return space.newint(42)
def _internal_crash(space, w_crash=None):
"""for testing purposes, raise an interpreter-level ValueError. Should turn
into a SystemError automatically"""
raise ValueError # RPython-level, uncaught
def strategy(space, w_obj):
""" strategy(dict or list or set or instance)
Return the underlying strategy currently used by a dict, list or set object
"""
if isinstance(w_obj, W_DictMultiObject):
name = w_obj.get_strategy().__class__.__name__
elif isinstance(w_obj, W_ListObject):
name = w_obj.strategy.__class__.__name__
elif isinstance(w_obj, W_BaseSetObject):
name = w_obj.strategy.__class__.__name__
else:
m = w_obj._get_mapdict_map()
if m is not None:
name = m.repr()
else:
raise oefmt(space.w_TypeError, "expecting dict or list or set object, or instance of some kind")
return space.newtext(name)
def list_get_physical_size(space, w_obj):
if not isinstance(w_obj, W_ListObject):
raise oefmt(space.w_TypeError, "expected list")
return space.newint(w_obj.physical_size())
def get_console_cp(space):
"""get_console_cp()
Return the console and console output code page (windows only)
"""
from rpython.rlib import rwin32 # Windows only
return space.newtuple2(
space.newtext('cp%d' % rwin32.GetConsoleCP()),
space.newtext('cp%d' % rwin32.GetConsoleOutputCP()),
)
@unwrap_spec(fd=int)
def get_osfhandle(space, fd):
"""get_osfhandle()
Return the handle corresponding to the file descriptor (windows only)
"""
from rpython.rlib import rwin32 # Windows only
try:
ret = rwin32.get_osfhandle(fd)
return space.newint(rffi.cast(rffi.INT, ret))
except OSError as e:
raise wrap_oserror(space, e)
@unwrap_spec(sizehint=int)
def resizelist_hint(space, w_list, sizehint):
""" Reallocate the underlying storage of the argument list to sizehint """
if not isinstance(w_list, W_ListObject):
raise oefmt(space.w_TypeError, "arg 1 must be a 'list'")
w_list._resize_hint(sizehint)
@unwrap_spec(sizehint=int)
def newlist_hint(space, sizehint):
""" Create a new empty list that has an underlying storage of length sizehint """
return space.newlist_hint(sizehint)
@unwrap_spec(estimate=int)
def add_memory_pressure(space, estimate):
""" Add memory pressure of estimate bytes. Useful when calling a C function
that internally allocates a big chunk of memory. This instructs the GC to
garbage collect sooner than it would otherwise."""
rgc.add_memory_pressure(estimate)
@unwrap_spec(w_frame=PyFrame)
def locals_to_fast(space, w_frame):
assert isinstance(w_frame, PyFrame)
w_frame.locals2fast()
def set_code_callback(space, w_callable):
cache = space.fromcache(CodeHookCache)
if space.is_none(w_callable):
cache._code_hook = None
else:
cache._code_hook = w_callable
@unwrap_spec(string='bytes', byteorder='text', signed=int)
def decode_long(space, string, byteorder='little', signed=1):
from rpython.rlib.rbigint import rbigint, InvalidEndiannessError
try:
result = rbigint.frombytes(string, byteorder, bool(signed))
except InvalidEndiannessError:
raise oefmt(space.w_ValueError, "invalid byteorder argument")
return space.newlong_from_rbigint(result)
def _promote(space, w_obj):
""" Promote the first argument of the function and return it. Promote is by
value for ints, floats, strs, unicodes (but not subclasses thereof) and by
reference otherwise. (Unicodes not supported right now.)
This function is experimental!"""
from rpython.rlib import jit
if space.is_w(space.type(w_obj), space.w_int):
jit.promote(space.int_w(w_obj))
elif space.is_w(space.type(w_obj), space.w_float):
jit.promote(space.float_w(w_obj))
elif space.is_w(space.type(w_obj), space.w_bytes):
jit.promote_string(space.bytes_w(w_obj))
elif space.is_w(space.type(w_obj), space.w_unicode):
raise oefmt(space.w_TypeError, "promoting unicode unsupported")
else:
jit.promote(w_obj)
return w_obj
@unwrap_spec(w_value=WrappedDefault(None), w_tb=WrappedDefault(None))
def normalize_exc(space, w_type, w_value=None, w_tb=None):
operr = OperationError(w_type, w_value, w_tb)
return operr.normalize_exception(space)
def stack_almost_full(space):
"""Return True if the stack is more than 15/16th full."""
return space.newbool(rstack.stack_almost_full())
def fsencode(space, w_obj):
"""Direct access to the interp-level fsencode()"""
return space.fsencode(w_obj)
def fsdecode(space, w_obj):
"""Direct access to the interp-level fsdecode()"""
return space.fsdecode(w_obj)
def side_effects_ok(space):
"""For use with the reverse-debugger: this function normally returns
True, but will return False if we are evaluating a debugging command
like a watchpoint. You are responsible for not doing any side effect
at all (including no caching) when evaluating watchpoints. This
function is meant to help a bit---you can write:
if not __pypy__.side_effects_ok():
skip the caching logic
inside getter methods or properties, to make them usable from
watchpoints. Note that you need to re-run ``REVDB=.. pypy''
after changing the Python code.
"""
return space.newbool(space._side_effects_ok())
def revdb_stop(space):
from pypy.interpreter.reverse_debugging import stop_point
stop_point()
def pyos_inputhook(space):
"""Call PyOS_InputHook() from the CPython C API."""
if not space.config.objspace.usemodules.cpyext:
return
w_modules = space.sys.get('modules')
if space.finditem_str(w_modules, 'cpyext') is None:
return # cpyext not imported yet, ignore
from pypy.module.cpyext.api import invoke_pyos_inputhook
invoke_pyos_inputhook(space)
def utf8content(space, w_u):
""" Given a unicode string u, return it's internal byte representation.
Useful for debugging only. """
from pypy.objspace.std.unicodeobject import W_UnicodeObject
if type(w_u) is not W_UnicodeObject:
raise oefmt(space.w_TypeError, "expected unicode string, got %T", w_u)
return space.newbytes(w_u._utf8)
def set_exc_info(space, w_type, w_value, w_traceback=None):
ec = space.getexecutioncontext()
ec.set_sys_exc_info3(w_value)
def get_contextvar_context(space):
ec = space.getexecutioncontext()
context = ec.contextvar_context
if context:
return context
else:
return space.w_None
def set_contextvar_context(space, w_obj):
ec = space.getexecutioncontext()
ec.contextvar_context = w_obj
return space.w_None
@unwrap_spec(where='text')
def write_unraisable(space, where, w_exc, w_obj):
"""write_unraisable(where, exc, obj)
Equivalent to CPython's _PyErr_WriteUnraisableMsg()
where: msg to write (text)
exc: error raised
obj: object to print its repr
"""
OperationError(space.type(w_exc), w_exc).write_unraisable(
space, where, w_obj, with_traceback=True)
def _testing_clear_audithooks(space):
if we_are_translated():
raise oefmt(space.w_RuntimeError, "can only use _testing_clear_audithooks before translation")
from pypy.module.sys.vm import AuditHolder
holder = space.fromcache(AuditHolder)
holder.hook_chain = None
|