1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
#!/usr/bin/env python
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""OSX specific tests. These are implicitly run by test_psutil.py."""
import unittest
import subprocess
import time
import sys
import os
import re
import psutil
from psutil._compat import PY3
from test_psutil import *
PAGESIZE = os.sysconf("SC_PAGE_SIZE")
def sysctl(cmdline):
"""Expects a sysctl command with an argument and parse the result
returning only the value of interest.
"""
p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
result = p.communicate()[0].strip().split()[1]
if PY3:
result = str(result, sys.stdout.encoding)
try:
return int(result)
except ValueError:
return result
def vm_stat(field):
"""Wrapper around 'vm_stat' cmdline utility."""
out = sh('vm_stat')
for line in out.split('\n'):
if field in line:
break
else:
raise ValueError("line not found")
return int(re.search('\d+', line).group(0)) * PAGESIZE
class OSXSpecificTestCase(unittest.TestCase):
def setUp(self):
self.pid = get_test_subprocess().pid
def tearDown(self):
reap_children()
def test_process_create_time(self):
cmdline = "ps -o lstart -p %s" %self.pid
p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
output = p.communicate()[0]
if PY3:
output = str(output, sys.stdout.encoding)
start_ps = output.replace('STARTED', '').strip()
start_psutil = psutil.Process(self.pid).create_time
start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
time.localtime(start_psutil))
self.assertEqual(start_ps, start_psutil)
def test_disks(self):
# test psutil.disk_usage() and psutil.disk_partitions()
# against "df -a"
def df(path):
out = sh('df -k "%s"' % path).strip()
lines = out.split('\n')
lines.pop(0)
line = lines.pop(0)
dev, total, used, free = line.split()[:4]
if dev == 'none':
dev = ''
total = int(total) * 1024
used = int(used) * 1024
free = int(free) * 1024
return dev, total, used, free
for part in psutil.disk_partitions(all=False):
usage = psutil.disk_usage(part.mountpoint)
dev, total, used, free = df(part.mountpoint)
self.assertEqual(part.device, dev)
self.assertEqual(usage.total, total)
# 10 MB tollerance
if abs(usage.free - free) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % usage.free, free)
if abs(usage.used - used) > 10 * 1024 * 1024:
self.fail("psutil=%s, df=%s" % usage.used, used)
# --- virtual mem
def test_vmem_total(self):
sysctl_hwphymem = sysctl('sysctl hw.memsize')
self.assertEqual(sysctl_hwphymem, psutil.TOTAL_PHYMEM)
@retry_before_failing()
def test_vmem_free(self):
num = vm_stat("free")
self.assertAlmostEqual(psutil.virtual_memory().free, num,
delta=TOLERANCE)
@retry_before_failing()
def test_vmem_active(self):
num = vm_stat("active")
self.assertAlmostEqual(psutil.virtual_memory().active, num,
delta=TOLERANCE)
@retry_before_failing()
def test_vmem_inactive(self):
num = vm_stat("inactive")
self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
delta=TOLERANCE)
@retry_before_failing()
def test_vmem_wired(self):
num = vm_stat("wired")
self.assertAlmostEqual(psutil.virtual_memory().wired, num,
delta=TOLERANCE)
# --- swap mem
def test_swapmem_sin(self):
num = vm_stat("Pageins")
self.assertEqual(psutil.swap_memory().sin, num)
def test_swapmem_sout(self):
num = vm_stat("Pageouts")
self.assertEqual(psutil.swap_memory().sout, num)
def test_swapmem_total(self):
tot1 = psutil.swap_memory().total
tot2 = 0
# OSX uses multiple cache files:
# http://en.wikipedia.org/wiki/Paging#OS_X
for name in os.listdir("/var/vm/"):
file = os.path.join("/var/vm", name)
if os.path.isfile(file):
tot2 += os.path.getsize(file)
self.assertEqual(tot1, tot2)
def test_main():
test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase))
result = unittest.TextTestRunner(verbosity=2).run(test_suite)
return result.wasSuccessful()
if __name__ == '__main__':
if not test_main():
sys.exit(1)
|