File: _common.py

package info (click to toggle)
python-psutil 0.5.1-1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 740 kB
  • sloc: ansic: 5,959; python: 5,194; makefile: 31
file content (139 lines) | stat: -rw-r--r-- 4,586 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python
#
#$Id: _common.py 1387 2012-06-27 16:33:45Z g.rodola $
#
# Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Common objects shared by all _ps* modules."""

from __future__ import division
from psutil._compat import namedtuple, long

# --- functions

def usage_percent(used, total, _round=None):
    """Calculate percentage usage of 'used' against 'total'.

    Return (used / total) * 100.  If 'total' is zero the division is
    undefined, so 0.0 is returned instead of letting ZeroDivisionError
    propagate.  When '_round' is not None the result is passed through
    round() using '_round' as the number of digits.

    NOTE: this relies on true division, which this module turns on with
    "from __future__ import division".
    """
    try:
        ret = (used / total) * 100
    except ZeroDivisionError:
        # Use a float so the return type matches what true division
        # yields for int/float operands in the non-zero case.
        ret = 0.0
    if _round is not None:
        return round(ret, _round)
    return ret


class constant(int):
    """A constant type; overrides base int to provide a useful name on str().

    Instances behave like plain ints but additionally carry a name.  The
    name is what str() returns and it is also accepted when comparing for
    equality.
    """

    def __new__(cls, value, name, doc=None):
        """Create the int 'value' tagged with 'name'.

        If 'doc' is not None it is stored as the instance's __doc__.
        """
        inst = super(constant, cls).__new__(cls, value)
        inst._name = name
        if doc is not None:
            inst.__doc__ = doc
        return inst

    def __str__(self):
        return self._name

    def __eq__(self, other):
        # Use both int or str values when comparing for equality
        # (useful for serialization):
        # >>> st = constant(0, "running")
        # >>> st == 0
        # True
        # >>> st == 'running'
        # True
        if isinstance(other, int):
            return int(self) == other
        if isinstance(other, long):
            return long(self) == other
        if isinstance(other, str):
            return self._name == other
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # A class that defines __eq__ without __hash__ gets __hash__ set
        # to None on Python 3, which would make constants unusable as
        # dict keys or set members.  Hash on the integer value so that a
        # constant hashes like the int it compares equal to.  The name
        # string hashes differently, so dict/set lookups by name will not
        # find a constant even though '==' against the name is True.
        return hash(int(self))

def memoize(f):
    """A simple memoize decorator for functions.

    Results are cached in a dict keyed on the tuple of positional
    arguments, so every argument must be hashable; keyword arguments are
    not accepted by the wrapper.  Cached entries are never evicted.

    The returned wrapper takes over the wrapped function's __name__,
    __doc__ and __module__ so that introspection keeps showing the
    original function's identity rather than the wrapper's.
    """
    cache = {}

    def memf(*x):
        if x not in cache:
            cache[x] = f(*x)
        return cache[x]

    # Copy the identifying metadata by hand; best-effort, because 'f' may
    # be a callable object that lacks some of these attributes.
    for attr in ('__name__', '__doc__', '__module__'):
        try:
            setattr(memf, attr, getattr(f, attr))
        except AttributeError:
            pass
    return memf

class cached_property(object):
    """A memoize decorator for class properties.

    On first access through an instance the wrapped function is called
    with that instance and the result is stored in the instance's
    __dict__ under the function's name.  Since this is a non-data
    descriptor (it defines no __set__), that instance attribute shadows
    the descriptor on later lookups, so the function is not called again
    for the same instance.
    """
    def __init__(self, func):
        self.func = func
        # Expose the wrapped function's docstring on the descriptor.
        self.__doc__ = getattr(func, '__doc__', None)

    def __get__(self, instance, type=None):
        if instance is None:
            # Accessed on the class itself (e.g. by help() or other
            # introspection): there is no instance to compute or cache
            # on, so hand back the descriptor instead of failing.
            return self
        res = instance.__dict__[self.func.__name__] = self.func(instance)
        return res


# --- constants

# Process status constants.  Each one is a `constant`: an int that
# str()s to its name and compares equal to both its integer value and
# its name string.
STATUS_RUNNING = constant(value=0, name="running")
STATUS_SLEEPING = constant(value=1, name="sleeping")
STATUS_DISK_SLEEP = constant(value=2, name="disk sleep")
STATUS_STOPPED = constant(value=3, name="stopped")
STATUS_TRACING_STOP = constant(value=4, name="tracing stop")
STATUS_ZOMBIE = constant(value=5, name="zombie")
STATUS_DEAD = constant(value=6, name="dead")
STATUS_WAKE_KILL = constant(value=7, name="wake kill")
STATUS_WAKING = constant(value=8, name="waking")
# The remaining statuses are marked as BSD ones.
STATUS_IDLE = constant(value=9, name="idle")  # BSD
STATUS_LOCKED = constant(value=10, name="locked")  # BSD
STATUS_WAITING = constant(value=11, name="waiting")  # BSD

# --- Process.get_connections() 'kind' parameter mapping

import socket
from socket import AF_INET, SOCK_STREAM, SOCK_DGRAM
# AF_INET6 is None when the socket module does not expose it.
AF_INET6 = getattr(socket, 'AF_INET6', None)

# Maps a 'kind' string to a (address families, socket types) pair.
# Note that the "inet6" kind is always present; if AF_INET6 is missing
# its family list is simply [None].
conn_tmap = {
    "all": ([AF_INET, AF_INET6], [SOCK_STREAM, SOCK_DGRAM]),
    "tcp": ([AF_INET, AF_INET6], [SOCK_STREAM]),
    "tcp4": ([AF_INET], [SOCK_STREAM]),
    "udp": ([AF_INET, AF_INET6], [SOCK_DGRAM]),
    "udp4": ([AF_INET], [SOCK_DGRAM]),
    "inet": ([AF_INET, AF_INET6], [SOCK_STREAM, SOCK_DGRAM]),
    "inet4": ([AF_INET], [SOCK_STREAM, SOCK_DGRAM]),
    "inet6": ([AF_INET6], [SOCK_STREAM, SOCK_DGRAM]),
}

# The protocol-specific IPv6 kinds are only registered when AF_INET6 is
# actually available.
if AF_INET6 is not None:
    conn_tmap["tcp6"] = ([AF_INET6], [SOCK_STREAM])
    conn_tmap["udp6"] = ([AF_INET6], [SOCK_DGRAM])

# Drop the helper names so they do not linger in this module's namespace.
del AF_INET, AF_INET6, SOCK_STREAM, SOCK_DGRAM, socket

# --- namedtuples

# system
# Note: nt_sysmeminfo and nt_diskinfo share both the type name 'usage'
# and the same fields; the two iostat tuples share the type name
# 'iostat' but have different fields.
nt_sys_cputimes = namedtuple(
    'cputimes', 'user nice system idle iowait irq softirq')
nt_sysmeminfo = namedtuple('usage', 'total used free percent')
nt_diskinfo = namedtuple('usage', 'total used free percent')
nt_partition = namedtuple('partition', 'device mountpoint fstype opts')
nt_net_iostat = namedtuple(
    'iostat', 'bytes_sent bytes_recv packets_sent packets_recv')
nt_disk_iostat = namedtuple(
    'iostat',
    'read_count write_count read_bytes write_bytes read_time write_time')
nt_user = namedtuple('user', 'name terminal host started')

# processes
# Note: nt_uids is created with the type name 'user', so its repr reads
# "user(real=..., effective=..., saved=...)".
nt_meminfo = namedtuple('meminfo', 'rss vms')
nt_cputimes = namedtuple('cputimes', 'user system')
nt_openfile = namedtuple('openfile', 'path fd')
nt_connection = namedtuple(
    'connection', 'fd family type local_address remote_address status')
nt_thread = namedtuple('thread', 'id user_time system_time')
nt_uids = namedtuple('user', 'real effective saved')
nt_gids = namedtuple('group', 'real effective saved')
nt_io = namedtuple('io', 'read_count write_count read_bytes write_bytes')
nt_ionice = namedtuple('ionice', 'ioclass value')