1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
|
#!/usr/bin/env python
#
# $Id: _bsd.py 1380 2012-06-27 00:49:01Z g.rodola $
#
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""BSD specific tests. These are implicitly run by test_psutil.py."""
import unittest
import subprocess
import time
import re
import sys
import psutil
from psutil._compat import PY3
from test_psutil import reap_children, get_test_subprocess, sh
def sysctl(cmdline):
    """Run sysctl and return only the value of interest from its output.

    ``cmdline`` holds the arguments to pass to the ``sysctl`` command
    (e.g. ``"hw.physmem"``).  A leading ``"sysctl "`` is tolerated and
    not duplicated, so ``"sysctl hw.physmem"`` runs the same command.

    The text following the first ``": "`` of the output is returned,
    converted to an int when possible and left as a string otherwise.
    If the output contains no ``": "`` separator the whole output is
    used as the value.
    """
    # Callers may pass the full command; avoid running "sysctl sysctl ...".
    if not cmdline.startswith("sysctl "):
        cmdline = "sysctl " + cmdline
    output = sh(cmdline)
    _name, separator, value = output.partition(": ")
    if not separator:
        # Without a "name: value" separator, slicing from find() + 2
        # would silently drop the first character of the output.
        value = output
    try:
        return int(value)
    except ValueError:
        return value
def parse_sysctl_vmtotal(output):
    """Parse "sysctl vm.vmtotal" output and return (total, free) in bytes.

    The "Virtual Memory" line is located by content rather than by a
    fixed line index, so extra or missing header lines do not break the
    parser.  The comma after the total and the colon after "Active" are
    optional, so slightly different punctuation is accepted.

    The values in the output are expressed in kilobytes; they are
    converted to bytes and "free" is computed as total minus active.

    Raises ValueError if no "Virtual Memory" line can be parsed.
    """
    pattern = re.compile(
        r'Virtual\s+Memory.*Total:\s+(\d+)K,?\s+Active:?\s+(\d+)K')
    for line in output.splitlines():
        mobj = pattern.match(line)
        if mobj is not None:
            # values are represented in kilobytes
            total, active = [int(value) * 1024 for value in mobj.groups()]
            return total, total - active
    raise ValueError(
        "can't find 'Virtual Memory' totals in sysctl output: %r" % output)
class BSDSpecificTestCase(unittest.TestCase):
    """Compare psutil results against BSD command line tools.

    Each test runs a system utility (sysctl, ps, df, procstat) and checks
    that psutil reports the same, or a sufficiently close, value.
    """

    def setUp(self):
        # Spawn a helper process that the per-process tests inspect.
        self.pid = get_test_subprocess().pid

    def tearDown(self):
        reap_children()

    def _sysctl_vmtotal(self):
        # Run "sysctl vm.vmtotal" and return the (total, free) tuple
        # produced by parse_sysctl_vmtotal().
        p = subprocess.Popen("sysctl vm.vmtotal", shell=True,
                             stdout=subprocess.PIPE)
        result = p.communicate()[0].strip()
        if PY3:
            result = str(result, sys.stdout.encoding)
        return parse_sysctl_vmtotal(result)

    def test_TOTAL_PHYMEM(self):
        # sysctl() prepends the "sysctl" command itself.
        sysctl_hwphymem = sysctl('hw.physmem')
        self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM)

    def test_BOOT_TIME(self):
        s = sysctl('kern.boottime')
        # Keep only the digits between " sec = " and the following comma.
        s = s[s.find(" sec = ") + 7:]
        s = s[:s.find(',')]
        btime = int(s)
        self.assertEqual(btime, psutil.BOOT_TIME)

    def test_avail_phymem(self):
        # This test is not particularly accurate and may fail if the OS is
        # consuming memory for other applications.
        # We just want to test that the difference between psutil result
        # and sysctl's is not too high.
        _sum = sum((sysctl("vm.stats.vm.v_inactive_count"),
                    sysctl("vm.stats.vm.v_cache_count"),
                    sysctl("vm.stats.vm.v_free_count")))
        _pagesize = sysctl("hw.pagesize")
        sysctl_avail_phymem = _sum * _pagesize
        psutil_avail_phymem = psutil.phymem_usage().free
        difference = abs(psutil_avail_phymem - sysctl_avail_phymem)
        # On my system both sysctl and psutil report the same values.
        # Let's use a tolerance of 0.5 MB and consider the test as failed
        # if we go over it.
        if difference > (0.5 * 2**20):
            self.fail("sysctl=%s; psutil=%s; difference=%s;" % (
                sysctl_avail_phymem, psutil_avail_phymem, difference))

    def test_total_virtmem(self):
        # This test is not particularly accurate and may fail if the OS is
        # consuming memory for other applications.
        # We just want to test that the difference between psutil result
        # and sysctl's is not too high.
        sysctl_total_virtmem, _ = self._sysctl_vmtotal()
        psutil_total_virtmem = psutil.virtmem_usage().total
        difference = abs(sysctl_total_virtmem - psutil_total_virtmem)
        # On my system I get a difference of 4657152 bytes, probably because
        # the system is consuming memory for this same test.
        # Assuming psutil is right, let's use a tolerance of 10 MB and
        # consider the test as failed if we go over it.
        if difference > (10 * 2**20):
            self.fail("sysctl=%s; psutil=%s; difference=%s;" % (
                sysctl_total_virtmem, psutil_total_virtmem, difference))

    def test_avail_virtmem(self):
        # This test is not particularly accurate and may fail if the OS is
        # consuming memory for other applications.
        # We just want to test that the difference between psutil result
        # and sysctl's is not too high.
        _, sysctl_avail_virtmem = self._sysctl_vmtotal()
        psutil_avail_virtmem = psutil.virtmem_usage().free
        difference = abs(sysctl_avail_virtmem - psutil_avail_virtmem)
        # let's assume the test is failed if difference is > 0.5 MB
        if difference > (0.5 * 2**20):
            self.fail("sysctl=%s; psutil=%s; difference=%s;" % (
                sysctl_avail_virtmem, psutil_avail_virtmem, difference))

    def test_process_create_time(self):
        cmdline = "ps -o lstart -p %s" % self.pid
        p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE)
        output = p.communicate()[0]
        if PY3:
            output = str(output, sys.stdout.encoding)
        # Drop the column header so only the timestamp remains.
        start_ps = output.replace('STARTED', '').strip()
        start_psutil = psutil.Process(self.pid).create_time
        # Render psutil's timestamp in the same layout before comparing.
        start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
                                     time.localtime(start_psutil))
        self.assertEqual(start_ps, start_psutil)

    def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -a"
        def df(path):
            # Return (device, total, used, free) for path, sizes in bytes.
            out = sh('df -k "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)  # skip the header line
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            # "df -k" reports sizes in 1024-byte blocks
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # 10 MB tolerance
            if abs(usage.free - free) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.used, used))

    def test_memory_maps(self):
        out = sh('procstat -v %s' % self.pid)
        maps = psutil.Process(self.pid).get_memory_maps(grouped=False)
        lines = out.split('\n')[1:]  # skip the header line
        # Both sequences are consumed from the end, pairing each procstat
        # line with the psutil entry at the same position.
        while lines:
            line = lines.pop()
            fields = line.split()
            if not fields:
                # a blank line carries no mapping to compare
                continue
            _, start, stop, _perms, res = fields[:5]
            entry = maps.pop()
            self.assertEqual("%s-%s" % (start, stop), entry.addr)
            self.assertEqual(int(res), entry.rss)
            if not entry.path.startswith('['):
                self.assertEqual(fields[10], entry.path)
if __name__ == '__main__':
    # Build the suite through a TestLoader: unittest.makeSuite() is
    # deprecated and no longer available in recent Python versions.
    test_suite = unittest.TestSuite()
    test_suite.addTest(
        unittest.TestLoader().loadTestsFromTestCase(BSDSpecificTestCase))
    unittest.TextTestRunner(verbosity=2).run(test_suite)
|