1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
|
"""
Check the measured process sizes. If we are on a platform which supports
multiple measuring facilities (e.g. Linux), check if the reported sizes match.
This should help to protect against scaling errors (e.g. Byte vs KiB) or using
the wrong value for a different measure (e.g. resident in physical memory vs
virtual memory size).
"""
import sys
import unittest
try:
from unittest import mock
except ImportError: # Python 2.7
mock = None
from pympler import process
class ProcessMemoryTests(unittest.TestCase):
def _match_sizes(self, pi1, pi2, ignore=[]):
"""
Match sizes by comparing each set field. Process size may change
inbetween two measurements.
"""
if pi1.available and pi2.available:
for arg in ('vsz', 'rss', 'data_segment', 'shared_segment',
'stack_segment', 'code_segment'):
if arg in ignore:
continue
size1 = getattr(pi1, arg)
size2 = getattr(pi2, arg)
if size1 and size2:
delta = abs(size1 - size2)
# Allow for a difference of the size of two pages or 5%
if delta > pi1.pagesize * 2 and delta > size1 * 0.05:
self.fail("%s mismatch: %d != %d" % (arg, size1, size2))
if pi1.pagefaults and pi2.pagefaults:
# If both records report pagefaults compare the reported
# number. If a pagefault happens after taking the first
# snapshot and before taking the second the latter will show a
# higher pagefault number. In that case take another snapshot
# with the first variant and check it's now reporting a higher
# number as well. We assume pagefaults statistics are
# monotonic.
if pi1.pagefaults < pi2.pagefaults:
pi1.update()
if pi1.pagefaults < pi2.pagefaults:
pf1 = pi1.pagefaults
pf2 = pi2.pagefaults
self.fail("Pagefault mismatch: %d != %d" % (pf1, pf2))
else:
self.assertEqual(pi1.pagefaults, pi2.pagefaults)
if pi1.pagesize and pi2.pagesize:
self.assertEqual(pi1.pagesize, pi2.pagesize)
def test_ps_vs_proc_sizes(self):
'''Test process sizes match: ps util vs /proc/self/stat
'''
psinfo = process._ProcessMemoryInfoPS()
procinfo = process._ProcessMemoryInfoProc()
self._match_sizes(psinfo, procinfo)
def test_ps_vs_getrusage(self):
'''Test process sizes match: ps util vs getrusage
'''
psinfo = process._ProcessMemoryInfoPS()
try:
resinfo = process._ProcessMemoryInfoResource()
except AttributeError:
pass
else:
self._match_sizes(psinfo, resinfo, ignore=['rss'])
if psinfo.available and resinfo.available:
self.assert_(resinfo.rss >= psinfo.rss)
def test_proc_vs_getrusage(self):
'''Test process sizes match: /proc/self/stat util vs getrusage
'''
procinfo = process._ProcessMemoryInfoProc()
try:
resinfo = process._ProcessMemoryInfoResource()
except AttributeError:
pass
else:
self._match_sizes(procinfo, resinfo, ignore=['rss'])
if procinfo.available and resinfo.available:
self.assert_(resinfo.rss >= procinfo.rss)
def test_get_current_threads(self):
'''Test thread info is extracted.'''
tinfos = process.get_current_threads()
for tinfo in tinfos:
self.assertEqual(type(tinfo.ident), int)
self.assertEqual(type(tinfo.name), type(''))
self.assertEqual(type(tinfo.daemon), type(True))
self.failIfEqual(tinfo.ident, 0)
def test_proc(self):
'''Test reading proc stats with mock data.'''
if mock is None:
return
mock_stat = mock.mock_open(read_data='22411 (cat) R 22301 22411 22301 34818 22411 4194304 82 0 0 0 0 0 0 0 20 0 1 0 709170 8155136 221 18446744073709551615 94052544688128 94052544719312 140729623469552 0 0 0 0 0 0 0 0 0 17 6 0 0 0 0 0 94052546816624 94052546818240 94052566347776 140729623473446 140729623473466 140729623473466 140729623478255 0')
mock_status = mock.mock_open(read_data='Name: cat\n\nVmData: 2 kB\nMultiple colons: 1:1')
with mock.patch('builtins.open', new_callable=mock.mock_open) as mock_file:
mock_file.side_effect = [mock_stat.return_value, mock_status.return_value]
procinfo = process._ProcessMemoryInfoProc()
self.assertTrue(procinfo.available)
self.assertEqual(procinfo.vsz, 8155136)
if sys.version_info >= (3, 4): # Python 3.3 doesn't support mock_open.readlines
self.assertEqual(procinfo.data_segment, 2048)
if __name__ == "__main__":
suite = unittest.TestSuite()
tclasses = [ ProcessMemoryTests, ]
for tclass in tclasses:
names = unittest.getTestCaseNames(tclass, 'test_')
suite.addTests(map(tclass, names))
if not unittest.TextTestRunner().run(suite).wasSuccessful():
sys.exit(1)
|