""" Cow --- Cluster Of Workstations.
Description
cow is a package that includes tools for interacting with a cluster
of workstations as a single computational engine. It is well suited
for "embarrassingly parallel" problems that fit comfortably into a
master-slave paradigm. For such problems, it is often possible to
convert them from serial to parallel execution by changing a few
lines of code. Parallel Genetic Algorithms and Monte Carlo
simulations are two classes of problems that can benefit
significantly from using cow. If your problem requires closely
coupled parallelism, take a look at pyMPI.
cow also includes a reasonably complete set of tools for
administering a cluster of machines from Python, such as process
control, gathering system information, and executing shell commands
on remote processes simultaneously.
cow should graze happily in heterogeneous cluster environments,
though it hasn't been tested extensively there.
See scipy.cow.machine_cluster for more information and examples.
"""
import sync_cluster, socket
import time
import os # for getuid()
class ClusterError(Exception): pass
class TimeoutError(Exception): pass
class machine_cluster:
""" Treats a cluster of workstations as a single computational engine.
Description
machine_cluster simplifies interacting with a cluster of
workstations. machine_cluster starts a group of slave
interpreters. Setting and retrieving global variables on these
machines is accomplished through its dictionary interface.
You can also call functions or execute code blocks on the remote
machines. A couple of routines provide automatic parallelization
of looping constructs. It also provides routines for cluster
wide process control similar to ps, and for reporting cluster
wide load, CPU, and memory information.
Caveats
Some functionality is only available on Linux. Unix is better
supported in general than MSWindows. See user manual for details.
Cow assumes you have ssh access to all slave machines. If not,
the start() method will fail and you'll need to start all slaves
by hand using::
python sync_cluster.py server <port>
Example Usage::
# start two slave interpreters on the local machine at the
# specified ports
>>> slave_list = [ ('127.0.0.1',10000), ('127.0.0.1',10001)]
>>> cluster = scipy.cow.machine_cluster(slave_list)
>>> cluster.start()
# set and retrieve a global variable on the slave interpreters
>>> cluster['a'] = 1
>>> cluster['a']
(1, 1)
>>> import string
# process a list of strings in parallel converting them to
# upper case (illustrative example only -- dismal performance)
>>> string_list = ['aaa','bbb','ccc','ddd']
>>> cluster.loop_apply(string.upper,0,(string_list,))
['AAA','BBB','CCC','DDD']
>>> cluster.info()
MACHINE CPU GHZ MB TOTAL MB FREE LOAD
bull 2xP3 0.5 960.0 930.0 0.00
bull 2xP3 0.5 960.0 930.0 0.00
"""
def __init__(self,server_list):
""" machine_cluster(slave_list) --> cluster_object
Description
Create a cluster object from a list of slave machines.
slave_list is a list of 2-tuples of the form (address, port)
that specify the network address and port where the slave
interpreters will live and listen. The address is always
a string and can be either the machine name or its IP address.
The port should be an unused port on the slave machine.
Always use a port number higher than 1024.
Example::
# example 1 using IP addresses
>>> slave_list = [ ('127.0.0.1',10000), ('127.0.0.1',10001)]
>>> cluster = scipy.cow.machine_cluster(slave_list)
# example 2 using machine names
>>> slave_list = [ ('node0',11500), ('node1',11500)]
>>> cluster = scipy.cow.machine_cluster(slave_list)
"""
self.workers=[]
self.worker_by_name={}
worker_id = 1
for host,port in server_list:
# Adding the uid here can help with port conflicts, but it only
# works on Unix clusters. We really need to work out a daemon
# service model that makes the port mess transparent.
port = port #+ os.getuid()
new_worker = sync_cluster.standard_sync_client(host,port,worker_id)
self.workers.append(new_worker)
self.worker_by_name[host] = new_worker
worker_id = worker_id + 1
def start(self,force_restart=0,timeout=60):
""" start(force_restart=0, timeout=60) -- Start remote slave processes.
Description
Start the remote slave interpreters in the cluster. The timeout
value is specified in seconds and defaults to 60. The timeout
starts counting down only after ssh/rsh has tried to start
all the remote processes. This means the actual time in the
function could be much longer than 60 seconds - depending
on how long rsh/ssh takes.
It's possible the 60 second timeout will be too short for large
clusters - but I hope not!
Caveats
start() is not supported on MSWindows because of the lack of
standard/robust support for remote startup and background
processing in the CMD shell.
"""
if not force_restart and self.is_running():
return
# start the worker processes.
for worker in self.workers:
worker.start_server()
if not self.is_running():
print ' Starting Servers'
print ' |----|----|----15---|----|----30---|----|----45---' \
'|----|----60'
print '0.',
stop_watch = timer()
stop_watch.start()
minute = 0
import sys
while not self.is_running():
if stop_watch.current_lap() > 1:
sys.stdout.write('.')
stop_watch.mark_lap()
elapsed = stop_watch.elapsed()
if (elapsed - minute * 60) > 60:
minute = minute + 1
print
print minute,
if elapsed > timeout:
raise TimeoutError
print 'servers running!'
def stop(self):
""" stop() -- Tell all remote slaves to terminate.
Description
stop calls sys.exit(0) on all the slave processes so that
they will terminate gracefully. Note that if, for some
reason, you are unable to connect to a remote process due
to a socket error, you'll have to kill the slave process
by hand.
"""
for worker in self.workers:
import sys; sys.stdout.flush()
try: worker.exec_code('import sys;sys.exit(0)')
except:
#should really do something here to
# trap non-SystemExit errors.
pass
def restart(self):
""" restart() -- terminate all remote slaves and restart them.
restart is useful when you would like to reset all slave
interpreters to a known state.
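Example (an illustrative sketch; assumes the running cluster from
the class example)::
>>> cluster.restart()   # slaves come back with empty namespaces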
"""
self.stop()
self.start()
def is_running(self,timeout=0):
""" is_running(timeout=0) --> 0 or 1
Check that all the slave processes in the cluster are up and running.
If a timeout is specified, is_running polls the cluster repeatedly
until it either gets a positive result or gives up and returns 0
after the specified number of seconds.
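Example (a sketch; gives slow slaves up to 30 seconds to come up)::
>>> cluster.is_running(timeout=30)
1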
"""
# wait for them to start
import time
st = time.time()
still_waiting = 1
while still_waiting:
try:
# Send a simple command to all workers
# and wait till they handle it successfully
self.exec_code("1==1")
except ClusterError:
still_waiting = 1
elapsed = time.time() - st
if elapsed > timeout:
# We've run out of time.
return 0
else:
still_waiting = 0
wait_time = time.time() - st
# should we somehow disseminate worker topology (ids)
# to all machines here?
return 1
def _send(self,package,addendum=None):
""" _send(package, addendum=None) -- send a package to all slaves.
Description
_send takes a package packed up by a packer object (see
sync_cluster) and sends it to each of the slave processes.
addendum is either None or a list with one entry per slave
process. Each entry is a small package of additional
information to be sent to a specific slave process; it
contains data that is only needed by that process.
Implementation Notes
The send is done synchronously to each worker in turn. The
entire package is sent to slave0 before moving on and sending
the message to slave1.
If a socket error occurs while trying to send data to a
given slave, the offending worker is pushed into the
self.had_send_error list. Also, self.send_exc is a dictionary
that stores the (err_type,err_msg) as the key and the offending
worker as the value. This information is used in recv to
skip receiving from slaves who failed on send and also for
error reporting.
"""
if addendum:
N = len(addendum)
assert(N <= len(self.workers))
else:
N = len(self.workers)
self.send_exc = {}
self.had_send_error = []
for i in range(N):
try:
if not addendum:
self.workers[i].send(package)
else:
self.workers[i].send(package,addendum[i])
except socket.error, msg:
import sys
err_type, err_msg = sys.exc_info()[:2]
self.had_send_error.append(self.workers[i])
key = (err_type,err_msg)
try:
self.send_exc[key].append(self.workers[i].id)
except KeyError:
self.send_exc[key] = [self.workers[i].id]
# else - handle other errors?
self.Nsent = N
def _recv(self):
""" _recv() -- retreive results from slave processes.
Description
Retreive results from all slave processes that were
successfully sent a package. If an error occurs while
receiving from one of the slaves, the error is noted and
the results from the other slaves are retreived. A tuple
the results from all workers is returned by _recv. An
entry of None is placed in the tuple for any worker that
had an error.
The recv is done synchronously, waiting for slave0 to return
its results before moving on to slave1 to recv its results.
"""
self.had_recv_error = []
self.recv_exc = {}
results = []
import sys;
#only listen on workers involved in calculation.
for worker in self.workers[:self.Nsent]:
if worker in self.had_send_error:
results.append(None)
else:
try:
sys.stdout.flush()
results.append(worker.recv())
except sync_cluster.RemoteError:
import sys
err = sys.exc_info()[1]
# Force the err msg (err[1]) to be a string.
# This diminishes info content, but makes sure
# that the same errors are hashed correctly
# in the dictionary. (does it?)
err_type,err_msg, err_traceback = err
err = err_type,str(err_msg), err_traceback
self.had_recv_error.append(worker)
try: self.recv_exc[err].append(worker.id)
except KeyError: self.recv_exc[err] = [worker.id]
results.append(None)
except sync_cluster.RemoteCrashError:
# Gotta be more intelligent here...
msg = 'Error! Remote worker %d appears to have crashed.' \
% worker.id
raise sync_cluster.RemoteCrashError,msg
# else handle other errors
#print
return tuple(results)
def _send_recv(self,package,addendum=None):
""" _send_recv(package,addendum=None) --> results
Description
Send a message to each worker in turn and then immediately
begin listening for the results. All sends are done before
listening for results from any of the slave processes. See
_send and _recv for more information.
If an error occurs during either the send or recv phases,
the handle_error() method is called. If no errors are found,
a tuple containing the results from each slave is returned.
If an error does occur and an exception is raised, it is still
possible to retrieve the set of results that executed correctly
from the last_results attribute.
"""
self._send(package,addendum)
self.last_results = self._recv()
if len(self.send_exc) or len(self.recv_exc):
self.handle_error()
return self.last_results
def handle_error(self):
""" handle_error() -- make sense of send and recv errors
Description
Error handling attempts to examine the errors that occur
during remote execution and report them in the least verbose
manner. If the same error occurs on all slaves, it tries to
report it only once. Otherwise it reports all the errors
that occur on slaves and prints each slave's traceback.
Currently error handling is pretty simplistic. It'd be nice
if socket errors were viewed as severe and the slave either
restarted or marked as dead and its work distributed among
the other workers.
"""
# perhaps do some nifty stuff here to
# mark bad workers, try to restart, etc.
msg = ''
Nworkers = len(self.workers)
Nsend_errors = len(self.had_send_error)
Nsend_error_types = len(self.send_exc.keys())
Nrecv_errors = len(self.had_recv_error)
Nrecv_error_types = len(self.recv_exc.keys())
if (Nsend_errors == Nworkers and
Nsend_error_types == 1):
sock_err_type,err_msg = self.send_exc.keys()[0]
if err_msg[0] == 111:  # ECONNREFUSED
# An attempt at helpful info for a common problem.
msg = '\n\nConnection refused on all workers.\n'
msg = msg + ' Perhaps restarting the cluster would help.\n'
msg = msg + ' Use Your_Clusters_Name_Here.restart()'
else:
msg = 'A socket error occurred sending to all workers.\n\t'
msg = msg + str(sock_err_type) + ': ' + str(err_msg)
elif Nsend_errors:
msg = '\n\nThe following errors occurred when sending data:\n\t'
for err,guilty_workers in self.send_exc.items():
msg = msg + str(err) + '\n\t'
msg = msg + 'Guilty workers: ' + str(guilty_workers) + '\n'
if (Nrecv_errors == Nworkers and
Nrecv_error_types == 1):
err,dummy = self.recv_exc.items()[0]
err_type, err_msg, err_traceback = err
msg = '\n\nThe same error occurred on all workers:\n\t'
msg = msg + str(err_type) + ': ' + str(err_msg)
msg = msg + err_traceback
elif Nrecv_errors:
msg = '\n\nThe following errors occurred on workers:\n\t'
for err,guilty_workers in self.recv_exc.items():
err_type, err_msg, err_traceback = err
msg = msg + str(err_type) + ': ' + str(err_msg) + '\n'
msg = msg + 'Guilty workers: ' + str(guilty_workers) + '\n'
msg = msg + err_traceback
raise ClusterError, msg
##############################################################
# slave processor info
##############################################################
def load(self):
""" load() -- print human readable load information for slave hosts
Description
The load value printed is the 1 minute load average that
is commonly printed by uptime on Unix machines.
load depends on the implementation of numpy_proc on each
slave's host OS. It will not work for Windows slave processes.
However, if you are using a Windows master to control a Linux
cluster of slaves, it should work fine.
Example::
>>> slave_list = [('n0',10000), ('n1', 10001)]
>>> cluster = scipy.cow.machine_cluster(slave_list)
>>> cluster.start()
>>> cluster.load()
n0: 0.00, n1: 0.00
"""
import string
results = self.load_list()
for i in range(len(self.workers)):
name = string.split(self.workers[i].host,'.')[0]
res = results[i]
s = "%6s: %1.2f," % (name[-6:], res['load_1'])
print s,
if not ((i+1) % 5):
print
def info(self):
""" info() -- print human readable info about the slave hosts
Description
Print out each slave interpreter's host name, number
and type of processors, memory usage, and current load
information in human readable form.
info depends on the implementation of numpy_proc on each
slave's host OS. It will not work for Windows slave processes.
However, if you are using a Windows master to control a Linux
cluster of slaves, it should work fine.
Example::
>>> slave_list = [('n0',10000), ('n1', 10001)]
>>> cluster = scipy.cow.machine_cluster(slave_list)
>>> cluster.start()
>>> cluster.info()
MACHINE CPU GHZ MB TOTAL MB FREE LOAD
n0 2xP3 0.4 192.0 13.0 0.00
n1 2xP3 0.4 192.0 22.0 0.00
"""
import string
results = self.info_list()
labels = "%-8s %-9s %-4s %-8s %-8s %-4s" % \
('MACHINE','CPU','GHZ','MB TOTAL',
'MB FREE','LOAD')
print labels
for i in range(len(self.workers)):
name = string.split(self.workers[i].host,'.')[0]
res = results[i]
s = "%-8s %2dx%-6s %4.1f %8.1f %8.1f %4.2f" % \
(name[-8:], res['cpu_count'],res['cpu_type'][-6:], \
res['cpu_speed'],res['mem_total'],res['mem_free'],\
res['load_1'])
print s
def load_list(self):
""" load_list() -- Return a list of slave load information dictionaries
Description
Retrieve a dictionary with information about the load on each
host processor. The dictionaries have three keys, load_1,
load_5, and load_15, indicating the 1, 5, and 15 minute load
averages for the processor. These could be useful for (as
yet unimplemented) load balancing schemes.
load_list depends on the implementation of numpy_proc on each
slave's host OS. It will not work for Windows slave processes.
However, if you are using a Windows master to control a Linux
cluster of slaves, it should work fine.
Example::
>>> slave_list = [('n0',10000), ('n1', 10001)]
>>> cluster = scipy.cow.machine_cluster(slave_list)
>>> cluster.start()
>>> cluster.load_list()
({'load_5': 0.0, 'load_1': 0.0, 'load_15': 0.0},
{'load_5': 0.0, 'load_1': 0.0, 'load_15': 0.0})
"""
import numpy.distutils.proc as numpy_proc
res = self.apply(numpy_proc.load_avg,())
return res
def info_list(self):
""" info() -- print human readable info about the slave hosts
Description
Print out the each slave interpreters host name, number
and type of processors, memory usage, and current load
information in human readable form.
info depends on the implementation of numpy_proc on each
slave's host OS. It will not work for Windows slave processes.
However, if you are using a Windows master to control a Linux
cluster of slaves, it should work fine.
Example::
>>> slave_list = [('n0',10000), ('n1', 10001)]
>>> cluster = scipy.cow.machine_cluster(slave_list)
>>> cluster.start()
>>> cluster.info()
MACHINE CPU GHZ MB TOTAL MB FREE LOAD
n0 2xP3 0.4 192.0 13.0 0.00
n1 2xP3 0.4 192.0 22.0 0.00
"""
import numpy.distutils.proc as numpy_proc
res = self.apply(numpy_proc.machine_info,())
return res
##############################################################
# slave process information and control
##############################################################
def ps(self,sort_by='cpu',**filters):
""" ps(sort_by='cpu',**filters) -- list processes on slave machines.
Description
List all the processes on all remote slave machines. This
is like a cluster-wide Unix ps command and is output in a
similar human readable form. The sort_by argument allows
you to sort the process list by various fields including
pid, cpu, user, machine, memory, state and command. Keyword
arguments are used as filters to limit the number of processes
displayed. For example, the keyword user='ej' will only list
processes for user ej, and cpu='>10' will only list processes
using more than 10% of the cpu cycles.
ps depends on the implementation of numpy_proc on each
slave's host OS. It will not work for Windows slave processes.
However, if you are using a Windows master to control a Linux
cluster of slaves, it should work fine.
Example::
>>> slave_list = [('n0',10000), ('n1', 10001)]
>>> cluster = scipy.cow.machine_cluster(slave_list)
>>> cluster.start()
>>> cluster.ps(user='ej')
MACHINE USER PID %CPU %MEM TOTAL MB ...
n0 ej 22915 99.9 2.1 4.027 ...
n0 ej 22916 99.9 2.1 4.055 ...
n1 ej 22915 99.9 2.1 4.027 ...
n1 ej 22916 99.9 2.1 4.055 ...
...
"""
psl = self.ps_list(sort_by,**filters)
if len(psl):
print psl[0].labels_with_name()
for i in psl: print i.str_with_name()
def ps_list(self,sort_by='cpu',**filters):
""" ps_list(self,sort_by='cpu',**filters) -- get cluster processes
Description
Return a list containing one numpy_proc.process object for
each process running on the cluster host machines. process
objects contain a ton of information about cpu, memory, etc.
used by the process.
See ps for more information.
Example::
>>> slave_list = [('n0',10000), ('n1', 10001)]
>>> cluster = scipy.cow.machine_cluster(slave_list)
>>> cluster.start()
>>> p = cluster.ps_list()
>>> for i in p: print i.pid
...
22890 22889 22889 22890 1071 1071 ...
"""
import operator
import numpy.distutils.proc as numpy_proc
res = self.apply(numpy_proc.ps_list,())
psl = reduce(operator.add,res)
psl = numpy_proc.ps_sort(psl,sort_by,**filters)
return psl
def nice(self,increment=10):
""" nice(increment=10) --> success_list
Increment each slave interpreter's nice value by increment.
Note: this doesn't appear to work as expected; see os.nice().
"""
res = self.apply(os.nice,(increment,))
return res
def renice(self,process_list,level):
""" renice(process_list,level) -- set nice value multiple processes
Description
Change the nice level of multiple remote processes.
process_list is a list of numpy_proc.process objects.
level is the new nice value for the listed processes.
Caveats
Once niced down, a process cannot be reniced back up.
This is a Linux issue.
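Example (an illustrative sketch; ps_list supplies the process
objects, and 19 is the lowest scheduling priority)::
>>> targets = cluster.ps_list(user='ej')
>>> cluster.renice(targets,19)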
"""
res = []
pids = {}
for process in process_list:
if hasattr(process,'machine'):
try:
worker = self.worker_by_name[process.machine]
except KeyError:
worker = self.worker_by_name[process.long_machine]
pid = process.pid
else:
worker = self.workers[process[0]]
pid = process[1]
try:
pids[worker] = pids[worker] + ' ' + str(pid)
except KeyError:
pids[worker] = str(pid)
for worker,value in pids.items():
arg = 'renice %d -p %s' % (level,value)
res.append(worker.apply(os.system,(arg,)))
return res
def kill(self,process_list,signal = 'TERM'):
""" kill(self,process_list,signal = 'TERM') -- Signal process list.
Description
Send a signal to all of the numpy_proc.process objects in
the process_list. This is usually used to kill the processes.
The signal may be given as a signal name or number.
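Example (an illustrative sketch; forcibly kills all of user ej's
processes found by ps_list)::
>>> targets = cluster.ps_list(user='ej')
>>> cluster.kill(targets,'KILL')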
"""
res = []
pids = {}
for process in process_list:
if hasattr(process,'machine'):
try:
worker = self.worker_by_name[process.machine]
except KeyError:
worker = self.worker_by_name[process.long_machine]
pid = process.pid
else:
worker = self.workers[process[0]]
pid = process[1]
try:
pids[worker] = pids[worker] + ' ' + str(pid)
except KeyError:
pids[worker] = str(pid)
for worker,value in pids.items():
arg = 'kill -s %s %s' % (signal,value)
res.append(worker.apply(os.system,(arg,)))
return res
def system(self,cmd):
""" system(cmd) -- execute cmd on all remote machines
A list of all the remote responses is returned. Unlike
os.system which returns the exit value of the cmd string,
this function returns the text output by the command.
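Example (a sketch; the output text depends on each slave's host)::
>>> cluster.system('hostname')
('n0\n', 'n1\n')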
"""
code = 'import os;f=os.popen("%s");res = f.read(-1);f.close();' % cmd
return self.exec_code(code,returns=['res'])
def reload(self,module):
""" reload(module) -- reload module on all remote interpreters
module can either be the name of a module or the actual
module object.
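Example (an illustrative sketch; re-imports a freshly edited module
on every slave)::
>>> import string
>>> cluster.reload(string)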
"""
try:
code = 'import %s; reload(%s)' % ((module.__name__,)*2)
except AttributeError:
code = 'import %s; reload(%s)' % ((module,)*2)
self.exec_code(code)
##############################################################
# remote code and function execution
##############################################################
# mirror all of the sync_client functions.
# They assume all clients have the same packing procedures.
def exec_code(self,code,inputs=None,returns=None):
""" exec_code(code,inputs=None,returns=None)
Similar to Python's exec statement. Execute the same code fragment
on all remote interpreters. inputs is a dictionary of variable
values to use when executing the code. returns is a list of
variable names that should be returned after executing the code.
If one value is specified, the value for that variable is returned.
If multiple values are specified, a tuple is returned.
exec_code returns a list of the values of the requested variables,
one entry for each slave.
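Example (an illustrative sketch; assumes the two-slave cluster from
the class example)::
>>> cluster.exec_code('b = 2*a',inputs={'a':3},returns=['b'])
(6, 6)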
"""
#use the first worker to package up the cmd.
package = self.workers[0].exec_code_pack(code,inputs,returns)
return self._send_recv(package)
def apply(self,function,args=(),keywords=None):
""" apply(function,args=(),keywords=None)
Similar to Python's builtin apply function. Execute the given
function with the argument list, args, and keyword arguments,
keywords, on each of the slave processes.
apply returns a list of the results from calling function,
one result for each slave.
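Example (an illustrative sketch; every slave receives the same
arguments, so all entries of the result are identical)::
>>> cluster.apply(pow,(2,10))
(1024, 1024)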
"""
package = self.workers[0].apply_pack(function,args,keywords)
return self._send_recv(package)
def loop_apply(self,function,loop_var,args=(),keywords=None):
""" loop_apply(function,loop_var, args=(),keywords=None)
Description
Call function with the given args and keywords. One of the
arguments or keywords is actually a sequence of arguments.
This sequence is looped over, calling function once for each
value in the sequence. loop_var indicates which variable to
loop over. If an integer, loop_var indexes the args list.
If a string, it specifies a keyword variable. The loop sequence
is divided as evenly as possible between the worker nodes and
executed in parallel.
Example::
>>> slave_list = [('n0',10000), ('n1', 10001)]
>>> cluster = scipy.cow.machine_cluster(slave_list)
>>> cluster.start()
>>> import string
>>> string_list = ['aaa','bbb','ccc','ddd']
>>> cluster.loop_apply(string.upper,0,(string_list,))
['AAA','BBB','CCC','DDD']
"""
#----------------------------------------------------
# Prepare main package for sending
# almost verbatim from loop_apply_pack in sync_cluster
#----------------------------------------------------
#if type(loop_var) == type(1):
# loop_var = function.func_code.co_varnames[loop_var]
all_keywords = {}
if keywords: all_keywords.update(keywords)
#more_keywords = sync_cluster.args_to_keywords(function,args)
#sync_cluster.catch_keyword_conflicts(more_keywords,all_keywords)
#all_keywords.update(more_keywords)
# pull out the loop variable.
if type(loop_var) != type(''):
loop_var = int(loop_var)
loop_data = args[loop_var]
# no need to pack and send since it'll be in the "addendum"
args = list(args)
args[loop_var] = None
args = tuple(args)
else:
loop_data = all_keywords[loop_var]
# no need to pack and send since it'll be in the "addendum"
del all_keywords[loop_var]
contents={'_command':sync_cluster.loop_func,'function':function,
'args':args,'keywords':all_keywords,'loop_var':loop_var}
package = self.workers[0].packer.pack(contents)
return self.loop_send_recv(package,loop_data,loop_var)
def loop_code(self,code,loop_var,inputs=None,returns=None):
""" loop_code(code,loop_var,inputs=None,returns=None)
Description
Similar to exec_code and loop_apply. Here loop_var indicates
the variable name in the inputs dictionary that is looped over.
Example::
>>> slave_list = [('n0',10000), ('n1', 10001)]
>>> cluster = scipy.cow.machine_cluster(slave_list)
>>> cluster.start()
>>> import string
>>> a = [1, 2, 3, 4]
>>> cluster.loop_code("b=a*2",'a',{'a':a},('b',))
(2, 4, 6, 8)
"""
the_inputs = {}
if inputs: the_inputs.update(inputs)
loop_data = the_inputs[loop_var]
the_inputs[loop_var] = None #make it small for packing
package = self.workers[0].loop_code_pack(code,loop_var,
the_inputs,returns)
return self.loop_send_recv(package,loop_data,loop_var)
# array specific routines
def row_split(self,name,sequence):
"""experimental"""
import numpy
# array_split tolerates a sequence length that isn't a multiple
# of the worker count.
q = numpy.array_split(sequence,len(self.workers))
self.loop_code(name+'=_q_','_q_',inputs={'_q_':q},returns=())
def row_gather(self,name):
"""experimental"""
from numpy import concatenate
return concatenate(self[name])
def loop_send_recv(self,package,loop_data,loop_var):
#----------------------------------------------------
# Now split the loop data evenly among the workers,
# pack them up as addendums to the original package,
# and send them off for processing.
#----------------------------------------------------
job_groups = equal_balance(loop_data,len(self.workers))
addendums = []
for grp in job_groups:
addendums.append({loop_var:grp})
results = self._send_recv(package,addendums)
# Nothing done here to figure out the output format.
# It is always returned as a tuple
# Probably will be handier to have it as a
# Numeric array sometimes.
out = ()
for result_group in results:
out = out + result_group
return out
##############################################################
# dictionary interface
##############################################################
def __getitem__(self, key):
# currently allowing tuples also!
#assert(type(key) is type(''))
package = self.workers[0].get_pack(key)
return self._send_recv(package)
def __setitem__(self, key, item):
assert(type(key) is type(''))
package = self.workers[0].update_pack({key:item})
return self._send_recv(package)
def __delitem__(self, key):
# currently allowing tuples also!
# assert(type(key) is type(''))
package = self.workers[0].del_pack(key)
return self._send_recv(package)
def update(self, dict):
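""" update(dict) -- Set multiple global variables on all slaves at once.
Example (an illustrative sketch; mirrors dictionary update)::
>>> cluster.update({'a':1,'b':2})
>>> cluster['b']
(2, 2)
"""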
package = self.workers[0].update_pack(dict)
return self._send_recv(package)
def equal_balance(jobs,Nworkers):
""" Split jobs into Nworkers equal groups.
When an equal split is not possible,
the larger groups occur at the front
of the list.
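Example (a doctest-style sketch; ten jobs over three workers give
groups of sizes 4, 3, and 3, larger groups first)::
>>> equal_balance(range(10),3)
([0, 1, 2, 3], [4, 5, 6], [7, 8, 9])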
"""
#no jobs to do - return empty group list.
if not len(jobs): return ()
Ntotal_jobs = len(jobs)
# find the number of jobs each worker must do
# for everyone to have equal work loads
group_size = Ntotal_jobs / Nworkers
# if there are jobs left over, some of the workers
# will need to do 1 extra job.
if Ntotal_jobs % Nworkers:
group_size = group_size + 1
# after some algebra, we can solve for the
# number, a, of workers that need to do extra work
a = Ntotal_jobs + Nworkers - Nworkers*group_size
if a*group_size < Ntotal_jobs:
b = Nworkers - a
else:
b = 0
# a workers do an extra job, b workers do standard
# number of jobs.
group_sizes = a*[group_size] + b*[group_size-1]
# now split the jobs up into groups for each of
# the workers.
last = 0
job_groups = []
for size in group_sizes:
next = last+size
job_groups.append(jobs[last:next])
last = next
# sum = 0
# for grp in job_groups:
# sum = sum + len(grp)
# assert(sum,Ntotal_jobs)
return tuple(job_groups)
import operator
class timer:
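""" A simple stopwatch with lap support, used by machine_cluster.start()
to print startup progress.
Example (an illustrative sketch; timings are approximate)::
>>> t = timer()
>>> t.start()
>>> time.sleep(1)
>>> round(t.elapsed())
1.0
"""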
def __init__(self):
self.reset()
def reset(self):
self.total_t = 0
self.lap_start = 0
self.running = 0
self.lap_list = [0]
def start(self):
if not self.running:
self.running = 1
self.start_t = time.time()
else:
print 'already running: use reset() to start the timer back at 0.'
def stop(self):
self.running = 0
elapsed_t = time.time() - self.start_t
self.total_t = self.total_t + elapsed_t
self.lap_list[-1] = self.lap_list[-1] + elapsed_t
return self.total_t
def elapsed(self):
if self.running:
return self.total_t + (time.time() - self.start_t)
else:
return self.total_t
def mark_lap(self):
self.lap_start = self.elapsed()
self.lap_list.append(self.lap_start-self.lap_list[-1])
def get_laps(self):
return self.lap_list[1:] +[self.elapsed()-self.lap_list[-1]]
def current_lap(self):
return self.elapsed()-self.lap_start
if __name__ == '__main__':
# a local two-slave smoke test; the addresses/ports are illustrative
server_list = [('127.0.0.1',10000), ('127.0.0.1',10001)]
borg = machine_cluster(server_list)
borg.start()