File: task_manager

package info (click to toggle)
python-scientific 2.8-1.2
  • links: PTS
  • area: main
  • in suites: squeeze
  • size: 6,456 kB
  • ctags: 7,038
  • sloc: python: 16,436; ansic: 4,379; makefile: 135; sh: 18; csh: 1
file content (114 lines) | stat: -rw-r--r-- 3,793 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!python

#
# Send inquiries to an active task manager
# (see Scientific.DistributedComputing)

# Written by Konrad Hinsen <hinsen@cnrs-orleans.fr>
# last revision: 2006-12-12
#

from Scientific.DistributedComputing import TaskManager
import Pyro.core
import Pyro.naming
import Pyro.errors
from optparse import OptionParser
import os, sys

Pyro.core.initClient(banner=False)

commands = {}
arg_count = {}

def getTaskManager(label):
    if options.master is None:
        try:
            return Pyro.core.getProxyForURI("PYRONAME://TaskManager.%s" % label)
        except Pyro.errors.NamingError:
            print "No name server or no task manager with label %s" % label
            raise SystemExit
    else:
        try:
            uri = "PYROLOC://%s/TaskManager.%s" % (options.master, label)
            return Pyro.core.getProxyForURI(uri)
        except Pyro.errors.URIError:
            print "No host %s" % options.master
            raise SystemExit
        except Pyro.errors.ProtocolError:
            print "No task manager with label %s" % label
            raise SystemExit

def showInfo(label):
    task_manager = getTaskManager(label)
    active_processes = task_manager.numberOfActiveProcesses()
    waiting, running, finished = task_manager.numberOfTasks()
    
    print "%d active processe(s)" % active_processes
    if options.verbose:
        for i in range(active_processes):
            print "  %d: %s" % (i, task_manager.activeProcessInfo(i))
    print "%d waiting tasks" % sum(waiting.values())
    if options.verbose:
        for tag, count in waiting.items():
            print "  %s: %d" % (tag, count)
    print "%d running tasks" % sum(running.values())
    if options.verbose:
        for tag, count in running.items():
            print "  %s: %d" % (tag, count)
    print "%d results waiting to be retrieved" % sum(finished.values())
    if options.verbose:
        for tag, count in finished.items():
            print "  %s: %d" % (tag, count)
commands["show"] = showInfo
arg_count["show"] = 2

def listTaskManagers():
    if options.master is not None:
        print "Command 'list' requires a name server"
        raise SystemExit
    pyro_ns = Pyro.naming.NameServerLocator().getNS()
    for label, type in pyro_ns.list("TaskManager"):
        if type == 1:
            print label
            if options.verbose:
                task_manager = getTaskManager(label)
                active_processes = task_manager.numberOfActiveProcesses()
                print "  %d active processe(s)" % active_processes
commands["list"] = listTaskManagers
arg_count["list"] = 1


def runSlave(label):
    task_manager = getTaskManager(label)
    try:
        slave_code = task_manager.retrieveData("slave_code")
        directory = task_manager.retrieveData("cwd")
    except KeyError:
        print "No slave code available for %s" % label
        raise SystemExit
    namespace = {}
    sys.modules["__main__"].SLAVE_PROCESS_LABEL = label
    sys.modules["__main__"].SLAVE_NAMESPACE = namespace
    os.chdir(directory)
    exec slave_code in namespace
commands["slave"] = runSlave
arg_count["slave"] = 2


# Parse command line and execute command
parser = OptionParser(usage="Usage: %prog [options] command [label]")
parser.add_option("-m", "--master",
                  help="hostname of the machine running the master process")
parser.add_option("-v", "--verbose",
                  action="store_true",
                  help="verbose output")
options, args = parser.parse_args()
if len(args) < 1:
    parser.error("incorrect number of arguments")
try:
    command = commands[args[0]]
except KeyError:
    parser.error("unknown command %s" % args[0])
if len(args) != arg_count[args[0]]:
    parser.error("incorrect number of arguments")
command(*args[1:])