File: heaptracker.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (113 lines) | stat: -rw-r--r-- 4,134 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from rpython.rtyper.lltypesystem import lltype
from rpython.rtyper import rclass

def is_immutable_struct(S):
    """Tell whether S is a GcStruct flagged with the 'immutable' hint.

    Returns False when S is not an lltype.GcStruct; otherwise returns the
    value stored under 'immutable' in S._hints (False if the hint is absent).
    """
    if not isinstance(S, lltype.GcStruct):
        return False
    return S._hints.get('immutable', False)

def has_gcstruct_a_vtable(GCSTRUCT):
    """Tell whether GCSTRUCT, or a struct nested first inside it, has a typeptr.

    Returns False for anything that is not an lltype.GcStruct and for
    rclass.OBJECT itself.  Otherwise walks down the chain obtained by
    repeatedly calling _first_struct() and returns True as soon as one
    struct carries a truthy 'typeptr' hint, or False once the chain ends.
    """
    if not isinstance(GCSTRUCT, lltype.GcStruct) or GCSTRUCT is rclass.OBJECT:
        return False
    current = GCSTRUCT
    while True:
        if current._hints.get('typeptr'):
            return True
        # descend into the first nested struct, if there is one
        _, current = current._first_struct()
        if current is None:
            return False

def get_vtable_for_gcstruct(gccache, GCSTRUCT):
    """Hack: map an instance's low-level GcStruct to its vtable pointer.

    Returns lltype.nullptr(rclass.OBJECT_VTABLE) -- a null pointer, not
    None -- when GCSTRUCT is not a GcStruct or has_gcstruct_a_vtable()
    rejects it.  Otherwise the vtable is looked up in the cache stored on
    gccache, falling back to the testing_gcstruct2vtable registry; a
    KeyError propagates if neither of them knows GCSTRUCT.
    """
    belongs_to_instance = (isinstance(GCSTRUCT, lltype.GcStruct)
                           and has_gcstruct_a_vtable(GCSTRUCT))
    if not belongs_to_instance:
        return lltype.nullptr(rclass.OBJECT_VTABLE)
    # make sure gccache._cache_gcstruct2vtable exists before reading it
    setup_cache_gcstruct2vtable(gccache)
    try:
        vtable = gccache._cache_gcstruct2vtable[GCSTRUCT]
    except KeyError:
        # not known to the rtyper: try the vtables registered by tests
        vtable = testing_gcstruct2vtable[GCSTRUCT]
    return vtable

def setup_cache_gcstruct2vtable(gccache):
    """Build gccache._cache_gcstruct2vtable once, if it is not there yet.

    The cache maps `rinstance.lowleveltype.TO` to
    `rinstance.rclass.getvtable()` for every instance repr held by
    gccache.rtyper.  With a false rtyper the cache is left empty.  Calling
    this again on the same gccache does nothing.
    """
    if hasattr(gccache, '_cache_gcstruct2vtable'):
        return
    struct2vtable = {}
    if gccache.rtyper:
        for rinstance in gccache.rtyper.instance_reprs.values():
            vtable = rinstance.rclass.getvtable()
            struct2vtable[rinstance.lowleveltype.TO] = vtable
    gccache._cache_gcstruct2vtable = struct2vtable

def set_testing_vtable_for_gcstruct(GCSTRUCT, vtable, name):
    """Register `vtable` as the vtable of GCSTRUCT (test helper).

    Only for tests that need to register the vtable of their malloc'ed
    structures in case they are GcStruct inheriting from OBJECT.  Sets
    vtable.name to rclass.alloc_array_name(name) and records the vtable
    in the module-level testing_gcstruct2vtable dict.
    """
    allocated_name = rclass.alloc_array_name(name)
    vtable.name = allocated_name
    testing_gcstruct2vtable[GCSTRUCT] = vtable

# Maps GcStruct -> vtable.  Filled by set_testing_vtable_for_gcstruct() and
# consulted by get_vtable_for_gcstruct() when the cache stored on the
# gccache has no entry for the struct.
testing_gcstruct2vtable = {}

# ____________________________________________________________


def all_fielddescrs(gccache, STRUCT, only_gc=False, res=None,
                    get_field_descr=None):
    """Collect field descrs for STRUCT, flattening nested structs.

    Walks STRUCT._names in order.  Void fields, fields whose name starts
    with 'c__pad' and the 'typeptr' field are skipped.  A field that is
    itself an lltype.Struct is recursed into, its descrs being appended to
    the same list.  With only_gc true, only fields that are an lltype.Ptr
    whose _needsgc() is true are kept.

    `res` is the list to append to (a new one is created if None) and is
    also the return value.  `get_field_descr` is called as
    get_field_descr(gccache, STRUCT, name) and defaults to
    descr.get_field_descr.
    """
    from rpython.jit.backend.llsupport import descr

    if get_field_descr is None:
        get_field_descr = descr.get_field_descr
    if res is None:
        res = []
    # order is not relevant, except for tests
    for fieldname in STRUCT._names:
        FIELD = getattr(STRUCT, fieldname)
        # 'typeptr' is dealt with otherwise
        ignored = (FIELD is lltype.Void
                   or fieldname.startswith('c__pad')
                   or fieldname == 'typeptr')
        if ignored:
            continue
        if isinstance(FIELD, lltype.Struct):
            # inlined substructure: its fields go into the same flat list
            all_fielddescrs(gccache, FIELD, only_gc, res, get_field_descr)
            continue
        if only_gc and not (isinstance(FIELD, lltype.Ptr)
                            and FIELD._needsgc()):
            continue
        res.append(get_field_descr(gccache, STRUCT, fieldname))
    return res

def all_interiorfielddescrs(gccache, ARRAY, get_field_descr=None):
    """Collect one descr per field of the item struct ARRAY.OF.

    Void fields and the 'typeptr' field are skipped.  A field that is
    itself an lltype.Struct raises UnsupportedFieldExc.  Each descr is
    obtained with get_field_descr(gccache, ARRAY, name), which defaults
    to descr.get_field_descr.  Returns a new list.
    """
    from rpython.jit.backend.llsupport import descr
    from rpython.jit.codewriter.effectinfo import UnsupportedFieldExc

    if get_field_descr is None:
        get_field_descr = descr.get_field_descr
    # order is not relevant, except for tests
    ITEM = ARRAY.OF
    collected = []
    for fieldname in ITEM._names:
        FIELD = getattr(ITEM, fieldname)
        # 'typeptr' is dealt with otherwise
        if FIELD is lltype.Void or fieldname == 'typeptr':
            continue
        if isinstance(FIELD, lltype.Struct):
            raise UnsupportedFieldExc("unexpected array(struct(struct))")
        collected.append(get_field_descr(gccache, ARRAY, fieldname))
    return collected

def gc_fielddescrs(gccache, STRUCT):
    """Return all_fielddescrs() of STRUCT restricted with only_gc=True."""
    return all_fielddescrs(gccache, STRUCT, only_gc=True)

def get_fielddescr_index_in(STRUCT, fieldname, cur_index=0):
    """Return the flattened index of `fieldname` inside STRUCT.

    Fields are counted in STRUCT._names order, starting from `cur_index`.
    Void fields and 'typeptr' are not counted.  A field that is itself an
    lltype.Struct is not counted as such; its own fields are counted in
    its place, recursively (such a field is never matched by name).

    Returns the index (>= 0) if the field is found.  Otherwise returns
    `-n - 1`, where n is the value of the running counter after the whole
    of STRUCT has been walked, i.e. the index the next field would get.
    """
    # NOTE(review): all_fielddescrs() also skips names starting with
    # 'c__pad', which are counted here -- confirm whether the two should
    # agree for structs containing such fields.
    for name in STRUCT._names:
        FIELD = getattr(STRUCT, name)
        if FIELD is lltype.Void:
            continue
        if name == 'typeptr':
            continue # dealt otherwise
        if isinstance(FIELD, lltype.Struct):
            r = get_fielddescr_index_in(FIELD, fieldname, cur_index)
            if r >= 0:
                return r
            # Not found in the substructure.  The recursive call started
            # counting at cur_index, so the decoded value is already the
            # absolute counter: assign it.  (Adding it to cur_index would
            # count the fields preceding the substructure twice.)
            cur_index = -r - 1
            continue
        if name == fieldname:
            return cur_index
        cur_index += 1
    return -cur_index - 1 # not found