File: heaptracker.py

package info (click to toggle)
pypy 7.0.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 107,216 kB
  • sloc: python: 1,201,787; ansic: 62,419; asm: 5,169; cpp: 3,017; sh: 2,534; makefile: 545; xml: 243; lisp: 45; awk: 4
file content (134 lines) | stat: -rw-r--r-- 4,724 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from rpython.rtyper.lltypesystem import lltype, llmemory
from rpython.rtyper import rclass
from rpython.rlib.objectmodel import we_are_translated
from rpython.rlib.rarithmetic import r_uint, intmask


def adr2int(addr):
    # Cast an address to an int.  Returns an AddressAsInt object which
    # can be cast back to an address.
    return llmemory.cast_adr_to_int(addr, "symbolic")

def int2adr(int):
    return llmemory.cast_int_to_adr(int)

def int_signext(value, numbytes):
    b8 = numbytes * 8
    a = r_uint(value)
    a += r_uint(1 << (b8 - 1))     # a += 128
    a &= r_uint((1 << b8) - 1)     # a &= 255
    a -= r_uint(1 << (b8 - 1))     # a -= 128
    return intmask(a)

def is_immutable_struct(S):
    return isinstance(S, lltype.GcStruct) and S._hints.get('immutable', False)

# ____________________________________________________________

def has_gcstruct_a_vtable(GCSTRUCT):
    if not isinstance(GCSTRUCT, lltype.GcStruct):
        return False
    if GCSTRUCT is rclass.OBJECT:
        return False
    while not GCSTRUCT._hints.get('typeptr'):
        _, GCSTRUCT = GCSTRUCT._first_struct()
        if GCSTRUCT is None:
            return False
    return True

def get_vtable_for_gcstruct(gccache, GCSTRUCT):
    # xxx hack: from a GcStruct representing an instance's
    # lowleveltype, return the corresponding vtable pointer.
    # Returns None if the GcStruct does not belong to an instance.
    if not isinstance(GCSTRUCT, lltype.GcStruct):
        return lltype.nullptr(rclass.OBJECT_VTABLE)
    if not has_gcstruct_a_vtable(GCSTRUCT):
        return lltype.nullptr(rclass.OBJECT_VTABLE)
    setup_cache_gcstruct2vtable(gccache)
    try:
        return gccache._cache_gcstruct2vtable[GCSTRUCT]
    except KeyError:
        return testing_gcstruct2vtable[GCSTRUCT]

def setup_cache_gcstruct2vtable(gccache):
    if not hasattr(gccache, '_cache_gcstruct2vtable'):
        cache = {}
        if gccache.rtyper:
            for rinstance in gccache.rtyper.instance_reprs.values():
                cache[rinstance.lowleveltype.TO] = rinstance.rclass.getvtable()
        gccache._cache_gcstruct2vtable = cache

def set_testing_vtable_for_gcstruct(GCSTRUCT, vtable, name):
    # only for tests that need to register the vtable of their malloc'ed
    # structures in case they are GcStruct inheriting from OBJECT.
    vtable.name = rclass.alloc_array_name(name)
    testing_gcstruct2vtable[GCSTRUCT] = vtable

testing_gcstruct2vtable = {}

# ____________________________________________________________


def all_fielddescrs(gccache, STRUCT, only_gc=False, res=None,
                    get_field_descr=None):
    from rpython.jit.backend.llsupport import descr

    if get_field_descr is None:
        get_field_descr = descr.get_field_descr
    if res is None:
        res = []
    # order is not relevant, except for tests
    for name in STRUCT._names:
        FIELD = getattr(STRUCT, name)
        if FIELD is lltype.Void:
            continue
        if name.startswith('c__pad'):
            continue
        if name == 'typeptr':
            continue # dealt otherwise
        elif isinstance(FIELD, lltype.Struct):
            all_fielddescrs(gccache, FIELD, only_gc, res, get_field_descr)
        elif (not only_gc) or (isinstance(FIELD, lltype.Ptr) and FIELD._needsgc()):
            res.append(get_field_descr(gccache, STRUCT, name))
    return res

def all_interiorfielddescrs(gccache, ARRAY, get_field_descr=None):
    from rpython.jit.backend.llsupport import descr

    if get_field_descr is None:
        get_field_descr = descr.get_field_descr
    # order is not relevant, except for tests
    STRUCT = ARRAY.OF
    res = []
    for name in STRUCT._names:
        FIELD = getattr(STRUCT, name)
        if FIELD is lltype.Void:
            continue
        if name == 'typeptr':
            continue # dealt otherwise
        elif isinstance(FIELD, lltype.Struct):
            raise Exception("unexpected array(struct(struct))")
        res.append(get_field_descr(gccache, ARRAY, name))
    return res

def gc_fielddescrs(gccache, STRUCT):
    return all_fielddescrs(gccache, STRUCT, True)

def get_fielddescr_index_in(STRUCT, fieldname, cur_index=0):
    for name in STRUCT._names:
        FIELD = getattr(STRUCT, name)
        if FIELD is lltype.Void:
            continue
        if name == 'typeptr':
            continue # dealt otherwise
        elif isinstance(FIELD, lltype.Struct):
            r = get_fielddescr_index_in(FIELD, fieldname, cur_index)
            if r >= 0:
                return r
            cur_index += -r - 1
            continue
        elif name == fieldname:
            return cur_index
        cur_index += 1
    return -cur_index - 1 # not found