1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
|
import pytest
import sys
from rpython.rlib import rvmprof
from rpython.tool.udir import udir
class AppTestVMProf(object):
spaceconfig = {'usemodules': ['_vmprof', 'struct']}
def setup_class(cls):
cls.w_tmpfilename = cls.space.wrap(str(udir.join('test__vmprof.1')))
cls.w_tmpfilename2 = cls.space.wrap(str(udir.join('test__vmprof.2')))
cls.w_plain = cls.space.wrap(not cls.runappdirect and
'__pypy__' not in sys.builtin_module_names)
def test_import_vmprof(self):
tmpfile = open(self.tmpfilename, 'wb')
tmpfileno = tmpfile.fileno()
tmpfile2 = open(self.tmpfilename2, 'wb')
tmpfileno2 = tmpfile2.fileno()
import struct, sys, gc
WORD = struct.calcsize('l')
def count(s):
i = 0
count = 0
i += 5 * WORD # header
assert s[i ] == 5 # MARKER_HEADER
assert s[i + 1] == 0 # 0
assert s[i + 2] == 6 # VERSION_TIMESTAMP
assert s[i + 3] == 8 # PROFILE_RPYTHON
assert s[i + 4] == 4 # len('pypy')
assert s[i + 5: i + 9] == b'pypy'
i += 9
while i < len(s):
if s[i] == 3:
break
elif s[i] == 1:
i += 1
_, size = struct.unpack("ll", s[i:i + 2 * WORD])
i += 2 * WORD + size * struct.calcsize("P")
i += WORD # thread id
elif s[i] == 2:
i += 1
_, size = struct.unpack("ll", s[i:i + 2 * WORD])
count += 1
i += 2 * WORD + size
elif s[i] == 6:
print(s[i:i+24])
i += 1+8+8+8
elif s[i] == 7:
i += 1
# skip string
size, = struct.unpack("l", s[i:i + WORD])
i += WORD+size
# skip string
size, = struct.unpack("l", s[i:i + WORD])
i += WORD+size
else:
raise AssertionError(s[i])
return count
import _vmprof
gc.collect() # try to make the weakref list deterministic
gc.collect() # by freeing all dead code objects
_vmprof.enable(tmpfileno, 0.01, 0, 0, 0, 0)
_vmprof.disable()
s = open(self.tmpfilename, 'rb').read()
no_of_codes = count(s)
assert no_of_codes > 10
d = {}
def exec_(code, d):
exec(code, d)
exec_("""def foo():
pass
""", d)
gc.collect()
gc.collect()
_vmprof.enable(tmpfileno2, 0.01, 0, 0, 0, 0)
exec_("""def foo2():
pass
""", d)
_vmprof.disable()
s = open(self.tmpfilename2, 'rb').read()
no_of_codes2 = count(s)
assert b"py:foo:" in s
assert b"py:foo2:" in s
assert no_of_codes2 >= no_of_codes + 2 # some extra codes from tests
def test_enable_ovf(self):
import _vmprof
raises(_vmprof.VMProfError, _vmprof.enable, 2, 0, 0, 0, 0, 0)
raises(_vmprof.VMProfError, _vmprof.enable, 2, -2.5, 0, 0, 0, 0)
raises(_vmprof.VMProfError, _vmprof.enable, 2, 1e300, 0, 0, 0, 0)
raises(_vmprof.VMProfError, _vmprof.enable, 2, 1e300 * 1e300, 0, 0, 0, 0)
NaN = (1e300*1e300) / (1e300*1e300)
raises(_vmprof.VMProfError, _vmprof.enable, 2, NaN, 0, 0, 0, 0)
def test_is_enabled(self):
import _vmprof
tmpfile = open(self.tmpfilename, 'wb')
assert _vmprof.is_enabled() is False
_vmprof.enable(tmpfile.fileno(), 0.01, 0, 0, 0, 0)
assert _vmprof.is_enabled() is True
_vmprof.disable()
assert _vmprof.is_enabled() is False
@pytest.mark.xfail(sys.platform.startswith('freebsd'), reason = "not implemented")
def test_get_profile_path(self):
import _vmprof
with open(self.tmpfilename, "wb") as tmpfile:
assert _vmprof.get_profile_path() is None
_vmprof.enable(tmpfile.fileno(), 0.01, 0, 0, 0, 0)
path = _vmprof.get_profile_path()
_vmprof.disable()
if path != tmpfile.name:
with open(path, "rb") as fd1:
with open(self.tmpfilename, "rb") as fd2:
assert fd1.read() == fd2.read()
assert _vmprof.get_profile_path() is None
def test_stop_sampling(self):
if not self.plain:
skip("unreliable test except on CPython without -A")
import os
import _vmprof
tmpfile = open(self.tmpfilename, 'wb')
native = 1
def f():
import sys
import math
j = sys.maxsize
for i in range(500):
j = math.sqrt(j)
_vmprof.enable(tmpfile.fileno(), 0.01, 0, native, 0, 0)
# get_vmprof_stack() always returns 0 here!
# see vmprof_common.c and assume RPYTHON_LL2CTYPES is defined!
f()
fileno = _vmprof.stop_sampling()
pos = os.lseek(fileno, 0, os.SEEK_CUR)
f()
pos2 = os.lseek(fileno, 0, os.SEEK_CUR)
assert pos == pos2
_vmprof.start_sampling()
f()
fileno = _vmprof.stop_sampling()
pos3 = os.lseek(fileno, 0, os.SEEK_CUR)
assert pos3 > pos
_vmprof.disable()
|