File: interp_imp.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (173 lines) | stat: -rw-r--r-- 5,867 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
from pypy.module.imp import importing
from rpython.rlib import streamio
from pypy.interpreter.error import oefmt
from pypy.interpreter.gateway import unwrap_spec
from pypy.interpreter.pycode import PyCode
from pypy.module._io.interp_iobase import W_IOBase
from pypy.interpreter.streamutil import wrap_streamerror
from pypy.interpreter.error import OperationError


def extension_suffixes(space):
    suffixes_w = []
    so_ext = importing.get_so_extension(space)
    suffixes_w.append(space.newtext(so_ext))
    return space.newlist(suffixes_w)

def get_magic(space):
    x = importing.get_pyc_magic(space)
    a = x & 0xff
    x >>= 8
    b = x & 0xff
    x >>= 8
    c = x & 0xff
    x >>= 8
    d = x & 0xff
    return space.newbytes(chr(a) + chr(b) + chr(c) + chr(d))

def get_tag(space):
    """get_tag() -> string
    Return the magic tag for .pyc files."""
    return space.newtext(importing.PYC_TAG)

def get_file(space, w_file, filename, filemode):
    if space.is_none(w_file):
        try:
            return streamio.open_file_as_stream(filename, filemode)
        except streamio.StreamErrors as e:
            # XXX this is not quite the correct place, but it will do for now.
            # XXX see the issue which I'm sure exists already but whose number
            # XXX I cannot find any more...
            raise wrap_streamerror(space, e)
    else:
        w_iobase = space.interp_w(W_IOBase, w_file)
        # XXX: not all W_IOBase have a fileno method: in that case, we should
        # probably raise a TypeError?
        fd = space.int_w(space.call_method(w_iobase, 'fileno'))
        return streamio.fdopen_as_stream(fd, filemode)

def create_dynamic(space, w_spec, w_file=None):
    if not importing.has_so_extension(space):
        raise oefmt(space.w_ImportError, "Not implemented")
    from pypy.module.cpyext.api import create_extension_module
    # NB. cpyext.api.create_extension_module() can also delegate to _cffi_backend
    return create_extension_module(space, w_spec)

def create_builtin(space, w_spec):
    w_name = space.getattr(w_spec, space.newtext("name"))
    name = space.text0_w(w_name)
    # force_init is needed to make reload actually reload instead of just
    # using the already-present module in sys.modules.

    # If the module is already in sys.modules, it must be a reload, so
    # we want to reuse (and reinitialize) the existing module object
    reuse = space.finditem(space.sys.get('modules'), w_name) is not None
    return space.getbuiltinmodule(name, force_init=True, reuse=reuse)

def exec_dynamic(space, w_mod):
    from pypy.module.cpyext.api import exec_extension_module
    exec_extension_module(space, w_mod)

def exec_builtin(space, w_mod):
    return

def init_frozen(space, w_name):
    return None

def is_builtin(space, w_name):
    try:
        name = space.text0_w(w_name)
    except OperationError:
        return space.newint(0)

    if name not in space.builtin_modules:
        return space.newint(0)
    if space.finditem(space.sys.get('modules'), w_name) is not None:
        return space.newint(-1)   # cannot be initialized again
    return space.newint(1)

def is_frozen(space, w_name):
    name = space.text_w(w_name)
    if name in ("_frozen_importlib", "_frozen_importlib_external", "zipimport",
                # For tests, not implemented on PyPy
                "__hello__", "__phello__", "_phello__.spam"):
        return space.w_True
    return space.w_False

def use_frozen(space):
    """private function that allows never using frozen modules"""
    from pypy.module.imp.state import State
    state = space.fromcache(State)
    override = state.override_frozen_modules
    if override > 0:
        return True
    elif override < 0:
        return False
    # TODO: when built with Py_DEBUG, return False
    return True


def list_frozen_module_names(space):
    always_freeze = space.newlist([space.newtext(s) for s in [
        "_frozen_importlib", "_frozen_importlib_external", "zipimport",
        ]])

    frozen = always_freeze
    # issue 5098: also freeze test and stdlib modules
    if use_frozen(space):
        # For tests, not implemented on PyPy
        # frozen +=  ["__hello__", "__phello__", "_phello__.spam"]
        # For speedups
        # frozen += stdlib_frozens
        pass
    return frozen

def get_frozen_object(space, w_name):
    raise oefmt(space.w_ImportError,
                "No such frozen object named %R", w_name)

def is_frozen_package(space, w_name):
    return space.w_False

def find_frozen(space, w_name):
    return space.w_None
    # TODO: use the stdlibfrozens and test frozens


#__________________________________________________________________

def lock_held(space):
    if space.config.objspace.usemodules.thread:
        return space.newbool(importing.getimportlock(space).lock_held_by_anyone())
    else:
        return space.w_False

def acquire_lock(space):
    if space.config.objspace.usemodules.thread:
        importing.getimportlock(space).acquire_lock()

def release_lock(space):
    if space.config.objspace.usemodules.thread:
        importing.getimportlock(space).release_lock(silent_after_fork=False)

def reinit_lock(space):
    if space.config.objspace.usemodules.thread:
        importing.getimportlock(space).reinit_lock()

@unwrap_spec(pathname='fsencode')
def fix_co_filename(space, w_code, pathname):
    code_w = space.interp_w(PyCode, w_code)
    importing.update_code_filenames(space, code_w, pathname)


@unwrap_spec(magic=int, content='bytes')
def source_hash(space, magic, content):
    from rpython.rlib.rsiphash import siphash24_with_key
    from rpython.rlib.rarithmetic import r_uint64
    h = siphash24_with_key(content, r_uint64(magic))
    res = [b"x"] * 8
    for i in range(8):
        res[i] = chr(h & 0xff)
        h >>= 8
    assert not h
    return space.newbytes(b"".join(res))