1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
|
#!/usr/bin/env python
#
# $Id: _posix.py 1386 2012-06-27 15:44:36Z g.rodola $
#
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""POSIX specific tests. These are implicitly run by test_psutil.py."""
import unittest
import subprocess
import time
import sys
import os
import datetime
import psutil
from psutil._compat import PY3
from test_psutil import (get_test_subprocess, reap_children, PYTHON, LINUX, OSX,
BSD, ignore_access_denied, sh, skipIf)
def ps(cmd):
    """Run a ps command line and return the single value it prints.

    ``cmd`` is expected to be a ``ps`` invocation using ``-o`` to select
    one column. On platforms other than Linux the ``--no-headers`` option
    is removed from the command and the second line of the output (the
    one following the header) is used instead.

    Returns the value converted to ``int`` when possible, otherwise the
    raw string.
    """
    if not LINUX:
        cmd = cmd.replace(" --no-headers ", " ")
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
    output = p.communicate()[0].strip()
    if PY3:
        # sys.stdout.encoding is None when stdout has been replaced by an
        # object without an encoding (e.g. a StringIO); str(bytes, None)
        # would raise TypeError, so fall back to the default encoding.
        encoding = (getattr(sys.stdout, 'encoding', None)
                    or sys.getdefaultencoding())
        output = str(output, encoding)
    if not LINUX:
        # the first line is the column header; the value is on the second
        output = output.split('\n')[1]
    try:
        return int(output)
    except ValueError:
        # non-numeric column (user name, command, ...): return it as-is
        return output
class PosixSpecificTestCase(unittest.TestCase):
    """Compare psutil results against 'ps' command line utility."""

    # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps

    def setUp(self):
        """Spawn a test subprocess and remember its pid for the test."""
        self.pid = get_test_subprocess([PYTHON, "-E", "-O"],
                                       stdin=subprocess.PIPE).pid

    def tearDown(self):
        """Clean up after each test by calling reap_children()."""
        reap_children()

    def test_process_parent_pid(self):
        """Parent pid reported by ps must match Process.ppid."""
        ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
        ppid_psutil = psutil.Process(self.pid).ppid
        self.assertEqual(ppid_ps, ppid_psutil)

    def test_process_uid(self):
        """Real uid reported by ps must match Process.uids.real."""
        uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
        uid_psutil = psutil.Process(self.pid).uids.real
        self.assertEqual(uid_ps, uid_psutil)

    def test_process_gid(self):
        """Real gid reported by ps must match Process.gids.real."""
        gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
        gid_psutil = psutil.Process(self.pid).gids.real
        self.assertEqual(gid_ps, gid_psutil)

    def test_process_username(self):
        """User name reported by ps must match Process.username."""
        username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
        username_psutil = psutil.Process(self.pid).username
        self.assertEqual(username_ps, username_psutil)

    @ignore_access_denied
    def test_process_rss_memory(self):
        """RSS reported by ps must match get_memory_info()[0] / 1024."""
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
        rss_psutil = psutil.Process(self.pid).get_memory_info()[0] / 1024
        self.assertEqual(rss_ps, rss_psutil)

    @ignore_access_denied
    def test_process_vsz_memory(self):
        """VSZ reported by ps must match get_memory_info()[1] / 1024."""
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
        vsz_psutil = psutil.Process(self.pid).get_memory_info()[1] / 1024
        self.assertEqual(vsz_ps, vsz_psutil)

    def test_process_name(self):
        """Executable base name from ps must match Process.name."""
        # use command + arg since "comm" keyword not supported on all
        # platforms
        name_ps = ps(
            "ps --no-headers -o command -p %s" % self.pid).split(' ')[0]
        # remove path if there is any, from the command
        name_ps = os.path.basename(name_ps).lower()
        name_psutil = psutil.Process(self.pid).name.lower()
        self.assertEqual(name_ps, name_psutil)

    @skipIf(OSX or BSD)
    def test_process_create_time(self):
        """Start time from ps must match create_time as HH:MM:SS."""
        time_ps = ps(
            "ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
        time_psutil = psutil.Process(self.pid).create_time
        time_psutil = datetime.datetime.fromtimestamp(
            time_psutil).strftime("%H:%M:%S")
        self.assertEqual(time_ps, time_psutil)

    def test_process_exe(self):
        """Executable path from ps must match (or prefix) Process.exe."""
        ps_pathname = ps(
            "ps --no-headers -o command -p %s" % self.pid).split(' ')[0]
        psutil_pathname = psutil.Process(self.pid).exe
        try:
            self.assertEqual(ps_pathname, psutil_pathname)
        except AssertionError:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error, so only require the ps path to be a prefix of
            # the psutil one. (The psutil path is the one truncated;
            # truncating the ps path against itself can never fail.)
            adjusted_psutil_pathname = psutil_pathname[:len(ps_pathname)]
            self.assertEqual(ps_pathname, adjusted_psutil_pathname)

    def test_process_cmdline(self):
        """Command line from ps must match the joined Process.cmdline."""
        ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid)
        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline)
        self.assertEqual(ps_cmdline, psutil_cmdline)

    def test_get_pids(self):
        """Pids listed by 'ps ax' must match psutil.get_pid_list()."""
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        p = get_test_subprocess(["ps", "ax", "-o", "pid"],
                                stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if PY3:
            # sys.stdout.encoding may be None (e.g. stdout replaced by a
            # StringIO); str(bytes, None) would raise TypeError
            encoding = (getattr(sys.stdout, 'encoding', None)
                        or sys.getdefaultencoding())
            output = str(output, encoding)
        # drop the column header
        output = output.replace('PID', '')
        p.wait()
        pids_ps = []
        for pid in output.split('\n'):
            if pid:
                pids_ps.append(int(pid.strip()))
        # remove ps subprocess pid which is supposed to be dead in meantime
        pids_ps.remove(p.pid)
        pids_psutil = psutil.get_pid_list()
        pids_ps.sort()
        pids_psutil.sort()

        # on OSX ps doesn't show pid 0
        if OSX and 0 not in pids_ps:
            pids_ps.insert(0, 0)

        if pids_ps != pids_psutil:
            # report the pids seen by only one of the two sources
            difference = [x for x in pids_psutil if x not in pids_ps] + \
                         [x for x in pids_ps if x not in pids_psutil]
            self.fail("difference: " + str(difference))

    def test_nic_names(self):
        """Every NIC known to psutil must appear in 'ifconfig -a'."""
        p = subprocess.Popen("ifconfig -a", shell=True,
                             stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if PY3:
            # sys.stdout.encoding may be None (e.g. stdout replaced by a
            # StringIO); str(bytes, None) would raise TypeError
            encoding = (getattr(sys.stdout, 'encoding', None)
                        or sys.getdefaultencoding())
            output = str(output, encoding)
        # whitespace-separated tokens of the ifconfig output; a NIC is
        # considered found when some token starts with its name
        tokens = output.split()
        for nic in psutil.network_io_counters(pernic=True).keys():
            for token in tokens:
                if token.startswith(nic):
                    break
            else:
                self.fail("couldn't find %s nic in 'ifconfig -a' output"
                          % nic)

    def test_get_users(self):
        """Users and terminals from 'who' must match psutil.get_users()."""
        out = sh("who")
        # skip blank lines: with no logged-in users 'who' prints nothing
        # and indexing the fields of an empty line raises IndexError
        lines = [x for x in out.split('\n') if x.strip()]
        users = [x.split()[0] for x in lines]
        terminals = [x.split()[1] for x in lines]
        psutil_users = psutil.get_users()
        self.assertEqual(len(users), len(psutil_users))
        for u in psutil_users:
            self.assertTrue(u.name in users, u.name)
            self.assertTrue(u.terminal in terminals, u.terminal)
if __name__ == '__main__':
    # Run only the POSIX test case with verbose output when this file is
    # executed directly. unittest.makeSuite() was deprecated in Python 3.11
    # and removed in 3.13; the default loader builds the same suite.
    test_suite = unittest.TestSuite()
    test_suite.addTest(
        unittest.defaultTestLoader.loadTestsFromTestCase(
            PosixSpecificTestCase))
    unittest.TextTestRunner(verbosity=2).run(test_suite)
|