File: mitop.py

package info (click to toggle)
python-mitogen 0.3.26-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 6,456 kB
  • sloc: python: 22,134; sh: 183; makefile: 74; perl: 19; ansic: 18
file content (248 lines) | stat: -rw-r--r-- 7,767 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
"""
mitop.py is a version of the UNIX top command that knows how to display process
lists from multiple machines in a single listing.

This is a basic, initial version showing overall program layout. A future
version will extend it to:

    * Only notify the master of changed processes, rather than all processes.
    * Runtime-reconfigurable filters and aggregations handled on the remote
      machines rather than forcing a bottleneck in the master.

"""

import curses
import subprocess
import sys
import time

import mitogen.core
import mitogen.master
import mitogen.select
import mitogen.utils


class Host(object):
    """
    Master-side bookkeeping for one monitored machine.

    The class-level attributes are placeholders; they are filled in per
    instance once the connection to the machine has been set up.
    """
    #: Hostname, as a string.
    name = None

    #: mitogen.parent.Context through which functions are invoked on the host.
    context = None

    #: mitogen.core.Receiver on which the host's state updates arrive.
    recv = None

    def __init__(self):
        #: pid -> Process() for every process reported in the most recent
        #: status update received from this host.
        self.procs = dict()


class Process(object):
    """
    A single process running on a target host.

    Instances are plain attribute bags: parse_output() creates them and
    overwrites the fields below on every status update. The class-level values
    are only defaults for a freshly created instance.
    """
    #: Not assigned anywhere in this file; retained so existing references to
    #: ``Process.host`` keep working.
    host = None

    #: Name of the Host the process was reported by.
    hostname = None

    #: True when the PID was absent from the host's previous status update.
    new = False

    #: Owning user, as printed by 'ps'.
    user = None

    #: Process ID, parent process ID and process group ID.
    pid = None
    ppid = None
    pgid = None

    #: Command line, as printed by 'ps'.
    command = None

    #: Value of the 'ps' %cpu column.
    pcpu = None

    #: Value of the 'ps' rss column divided by 1024.
    rss = None


def child_main(sender, delay):
    """
    Stream 'ps' snapshots from a target machine, forever.

    The master starts this function on every target via
    Context.call_async(). Each iteration runs the UNIX 'ps' command, sends
    its decoded output through `sender`, then sleeps. The loop has no exit
    condition, so the function only ends if something raises.

    :param mitogen.core.Sender sender:
        Where each snapshot is delivered. It could point anywhere; the one
        supplied by the master routes results to the master's per-host
        Receiver.
    :param delay:
        Number of seconds to sleep between two snapshots.
    """
    ps_command = ['ps', '-axwwo', 'user,pid,ppid,pgid,%cpu,rss,command']
    while True:
        snapshot = subprocess.check_output(ps_command)
        sender.send(snapshot.decode())
        time.sleep(delay)


def parse_output(host, s):
    """
    Refresh `host.procs` in place from one snapshot of 'ps' output.

    Every well-formed process line creates or updates the Process stored
    under its PID. PIDs that were known before but do not appear in this
    snapshot are removed from `host.procs`.

    Lines that cannot be parsed (blank, fewer than seven fields, or a
    non-numeric value in a numeric column) are skipped instead of raising, so
    one odd line cannot abort the caller. A previously known PID whose line is
    skipped is treated like any other missing PID and removed.

    :param Host host:
        Object providing `name` and the `procs` mapping (pid -> Process).
    :param str s:
        Text with one header line followed by one line per process, with the
        whitespace-separated columns user, pid, ppid, pgid, %cpu, rss and
        command. The command is the remainder of the line and may itself
        contain whitespace.
    """
    # PIDs from the previous snapshot. Each one seen again is discarded from
    # this set, so whatever remains after the loop was not reported this time.
    unseen_pids = set(host.procs)

    # [1:] drops the column header line.
    for line in s.splitlines()[1:]:
        # Split at most 6 times so the command keeps its internal spaces.
        bits = line.split(None, 6)
        if len(bits) < 7:
            continue

        # Convert every numeric column before touching any state, so that a
        # bad line never leaves a half-updated Process behind.
        try:
            pid = int(bits[1])
            ppid = int(bits[2])
            pgid = int(bits[3])
            pcpu = float(bits[4])
            # NOTE(review): 'ps' normally reports rss in kilobytes, which
            # would make this megabytes -- confirm for the target platforms.
            rss = int(bits[5]) / 1024
        except ValueError:
            continue

        new = pid not in unseen_pids
        unseen_pids.discard(pid)

        try:
            proc = host.procs[pid]
        except KeyError:
            host.procs[pid] = proc = Process()
            proc.hostname = host.name

        proc.new = new
        proc.user = bits[0]
        proc.pid = pid
        proc.ppid = ppid
        proc.pgid = pgid
        proc.pcpu = pcpu
        proc.rss = rss
        proc.command = bits[6]

    # These PIDs had no update, so probably they are dead now.
    for pid in unseen_pids:
        del host.procs[pid]


class Painter(object):
    """
    Curses front end: draws the merged process table of all hosts.

    Construction switches the terminal into curses mode; close() must be
    called to leave it again.
    """
    #: Template for one table row. The five fixed-width columns and their
    #: separators take 40 characters; the command follows.
    format = (
        '%(hostname)10.10s '
        '%(pid)7.7s '
        '%(ppid)7.7s '
        '%(pcpu)6.6s '
        '%(rss)5.5s '
        '%(command)20s'
    )

    def __init__(self, hosts):
        """
        Enter curses mode and remember the hosts to display.

        :param hosts:
            Iterable of objects with a `procs` mapping whose values are the
            processes to list. It is re-read on every paint().
        """
        self.stdscr = curses.initscr()
        try:
            curses.start_color()
            self.height, self.width = self.stdscr.getmaxyx()
            curses.cbreak()
            curses.noecho()
            self.stdscr.keypad(1)
        except BaseException:
            # Setup failed half way: leave curses mode again so the terminal
            # is not left unusable, then let the error propagate.
            curses.endwin()
            raise
        self.hosts = hosts

    def close(self):
        """
        Leave curses mode.
        """
        curses.endwin()

    @staticmethod
    def _clip(text, width):
        """
        Return `text` cut so it stays left of the last column of a window
        `width` columns wide (empty when the window is too narrow).
        """
        return text[:max(0, width - 1)]

    def _row_text(self, proc):
        """
        Return the unclipped table row for `proc`, built from its instance
        attributes.
        """
        return self.format % vars(proc)

    def _put(self, row, text, attr=curses.A_NORMAL):
        """
        Write `text` at the start of line `row` using `attr`, clipped to the
        window width. Rows outside the window are ignored.
        """
        if 0 <= row < self.height:
            self.stdscr.addstr(row, 0, self._clip(text, self.width), attr)

    def paint(self):
        """
        Redraw the whole screen: clock, column header, then one row per
        process ordered by descending CPU usage. Rows for processes flagged
        `new` are drawn bold.
        """
        # Use the window's current size rather than only the one seen at
        # start-up. NOTE(review): whether this follows terminal resizes
        # depends on curses having processed the resize -- confirm.
        self.height, self.width = self.stdscr.getmaxyx()

        self.stdscr.erase()
        self._put(0, time.ctime())

        all_procs = []
        for host in self.hosts:
            all_procs.extend(host.procs.values())

        all_procs.sort(key=(lambda proc: -proc.pcpu))

        self._put(1, self.format % {
            'hostname': 'HOST',
            'pid': 'PID',
            'ppid': 'PPID',
            'pcpu': '%CPU',
            'rss': 'RSS',
            'command': 'COMMAND',
        })

        # Process rows start on line 2 and stop one line short of the bottom
        # of the window.
        visible = all_procs[:max(0, self.height - 3)]
        for i, proc in enumerate(visible):
            # The attribute is passed per call instead of toggled with
            # attron()/attroff(), so bold can never carry over to the clock
            # and header lines of the next repaint.
            if proc.new:
                attr = curses.A_BOLD
            else:
                attr = curses.A_NORMAL
            self._put(2 + i, self._row_text(proc), attr)

        self.stdscr.refresh()


def master_main(painter, router, select, delay):
    """
    Consume results from the Select until interrupted (e.g. by CTRL+C).

    Every message is unpickled and handed to parse_output() together with the
    Host attached to the receiver it arrived on. The screen is repainted
    after a message only if the repaint deadline has passed, so it is redrawn
    at most about once per `delay` seconds no matter how many messages come
    in.

    :param Painter painter:
        Object whose paint() method redraws the screen.
    :param router:
        Not used by this function.
    :param select:
        Source of messages; its get() is called once per iteration.
    :param delay:
        Minimum number of seconds between two repaints.
    """
    repaint_after = 0
    while True:
        msg = select.get()
        parse_output(msg.receiver.host, msg.unpickle())

        # Deadline not reached yet: keep collecting updates without drawing.
        if time.time() <= repaint_after:
            continue

        repaint_after = time.time() + delay
        painter.paint()


@mitogen.main()
def main(router):
    """
    Program entry point.

    The @mitogen.main() decorator is a helper that takes care of reliable
    setup and teardown of the Broker, the Router and the logging package, and
    supplies `router`.

    Every command-line argument is a hostname to monitor. With no arguments a
    usage message is printed and the program exits with status 1.
    """
    hostnames = sys.argv[1:]
    if not hostnames:
        print('mitop: Need a list of SSH hosts to connect to.')
        sys.exit(1)

    delay = 2.0
    select = mitogen.select.Select(oneshot=False)
    hosts = []

    # Per hostname: build a Host, connect to it, create a Receiver for its
    # messages, and start child_main() remotely to feed that receiver.
    for hostname in hostnames:
        print('Starting on', hostname)
        host = Host()
        host.name = hostname

        # 'localhost' gets a local context; anything else is reached by SSH.
        if hostname == 'localhost':
            host.context = router.local()
        else:
            host.context = router.ssh(hostname=hostname)

        # A Receiver connects a handle (via Router.add_handler()) to an
        # internal thread-safe queue that recv.get() drains. The Host is
        # attached to it so a message can be traced back to its origin.
        host.recv = mitogen.core.Receiver(router)
        host.recv.host = host

        # Data has to be collected from many receivers at once, which is what
        # Select is for: it sleeps efficiently until the first message shows
        # up on any of the receivers added to it.
        select.add(host.recv)

        # A Sender is the counterpart of a Receiver and, unlike it, can be
        # serialized. to_sender() builds one matching this host's receiver so
        # it can be passed to the remote function as an ordinary argument.
        #
        # child_main() loops forever, so .call() would block the parent.
        # .call_async() returns at once with another Receiver; child_main()
        # never returns a value, but if it crashes the exception is delivered
        # on that receiver.
        call_recv = host.context.call_async(
            child_main,
            host.recv.to_sender(),
            delay,
        )
        call_recv.host = host

        # With call_recv in the select, a context that fails to start makes
        # .get() raise mitogen.core.CallError, which stops master_main() and
        # gets the exception printed.
        select.add(call_recv)
        hosts.append(host)

    # Painter hides the old-fashioned ncurses code from master_main().
    painter = Painter(hosts)
    try:
        master_main(painter, router, select, delay)
    except KeyboardInterrupt:
        # CTRL+C is the normal way to quit: shut down gracefully.
        pass
    finally:
        painter.close()