File: cow.py

""" Cow --- Cluster Of Workstations.
    Description

        cow is a package that includes tools for interacting with a cluster
        of workstations as a single computational engine.  It is well suited
        for "embarassingly parallel" problems that fit comfortably into a
        master-slave paradigm.  For such problems, it is often possible to
        convert them from parallel to serial execution by changing a few lines
        of code.  Parallel Genetic Algorithms or Monte Carlo simulations are
        two classes of problems that can benefit significantly from using
        cow.  If your problem requires closely coupled parallelism, take a
        look at pyMPI.

        cow also includes a reasonably complete set of tools administering a
        cluster of machines from Python such as process control, gathering
        system information and executing shell commands on remote processes
        simultaneously.

        cow should graze happily in heterogeneous cluster environments, though
        it hasn't tested extensively there.

        See scipy.cow.machine_cluster for more information and examples.
"""

import sync_cluster, socket
import time
import os # for os.nice() and os.system()

class ClusterError(Exception): pass
class TimeoutError(Exception): pass

class machine_cluster:
    """ Treats a cluster of workstations as a single computational engine.

        Description

            machine_cluster simplifies interacting with a cluster of
            workstations.  machine_cluster starts a group of slave
            interpreters.  Setting and retrieving global variables on these
            machines is accomplished through its dictionary interface.
            You can also call functions or execute code blocks on the remote
            machines.  A couple of routines provide automatic parallelization
            of looping constructs.  It also provides routines for cluster
            wide process control similar to ps, and for gathering cluster
            wide load, CPU, and memory information.


        Caveats

            Some functionality is only available on Linux. Unix is better
            supported in general than MSWindows.  See user manual for details.

            Cow assumes you have ssh access to all slave machines.  If not,
            the start() method will fail and you'll need to start all slaves
            by hand using::

                python sync_cluster.py server <port>

        Example Usage::

            # start two slave interpreters on the local machine at the
            # specified ports
            >>> slave_list = [ ('127.0.0.1',10000), ('127.0.0.1',10001)]
            >>> cluster = scipy.cow.machine_cluster(slave_list)
            >>> cluster.start()
            # set and retrieve a global variable on the slave interpreters
            >>> cluster['a'] = 1
            >>> cluster['a']
            (1, 1)
            >>> import string
            # process a list of strings in parallel converting them to
            # upper case (illustrative example only -- dismal performance)
            >>> string_list = ['aaa','bbb','ccc','ddd']
            >>> cluster.loop_apply(string.upper,0,(string_list,))
            ('AAA','BBB','CCC','DDD')
            >>> cluster.info()
            MACHINE   CPU        GHZ   MB TOTAL  MB FREE   LOAD
            bull      2xP3       0.5     960.0     930.0   0.00
            bull      2xP3       0.5     960.0     930.0   0.00
    """
    def __init__(self,server_list):
        """ machine_cluster(slave_list) --> cluster_object

            Description

                Create a cluster object from a list of slave machines.
                slave_list is a list of 2-tuples of the form (address, port)
                that specify the network address and port where the slave
                interpreters will live and listen.  The address is always
                a string and can be either the machine name or its IP address.
                The port should be an unused port on the slave machine.
                Always use a port number higher than 1024.

            Example::

                # example 1 using IP addresses
                >>> slave_list = [ ('127.0.0.1',10000), ('127.0.0.1',10001)]
                >>> cluster = scipy.cow.machine_cluster(slave_list)
                # example 2 using machine names
                >>> slave_list = [ ('node0',11500), ('node1',11500)]
                >>> cluster = scipy.cow.machine_cluster(slave_list)
        """
        self.workers=[]
        self.worker_by_name={}
        worker_id = 1
        for host,port in server_list:
            # Adding the uid here can help with port conflicts, but only
            # works on Unix clusters.  We really need to work out a daemon
            # service model that makes the port mess transparent.
            port = port #+ os.getuid()
            new_worker = sync_cluster.standard_sync_client(host,port,worker_id)
            self.workers.append(new_worker)
            self.worker_by_name[host] = new_worker
            worker_id = worker_id + 1

    def start(self,force_restart=0,timeout=60):
        """ start(force_restart=0, timeout=60) -- Start remote slave processes.

            Description

                Start the remote slave interpreters in the cluster.  The timeout
                value is specified in seconds and defaults to 60.  The timeout
                starts counting down only after ssh/rsh has tried to start
                all the remote processes.  This means the actual time in the
                function could be much longer than 60 seconds - depending
                on how long rsh/ssh takes.

                It's possible the 60 second timeout will be too short for
                large clusters - but I hope not!

            Caveats

                start() is not supported on MSWindows because of the lack of
                standard/robust support for remote startup and background
                processing in the CMD shell.
        """
        if not force_restart and self.is_running():
            return
        # start the worker processes.
        for worker in self.workers:
            worker.start_server()

        if not self.is_running():
            print '                      Starting Servers'
            print ' |----|----|----15---|----|----30---|----|----45---' \
                   '|----|----60'
            print '0.',
            stop_watch = timer()
            stop_watch.start()
            minute = 0
        import sys
        while not self.is_running():
            if stop_watch.current_lap() > 1:
                sys.stdout.write('.')
                stop_watch.mark_lap()
            elapsed = stop_watch.elapsed()
            if (elapsed - minute * 60) > 60:
                minute = minute + 1
                print
                print minute,
            if elapsed > timeout:
                raise TimeoutError
        print 'servers running!'

    def stop(self):
        """ stop() -- Tell all remote slaves to terminate.

            Description

                stop calls sys.exit(0) on all the slave processes so that
                they will terminate gracefully.  Note that if, for some
                reason, you are unable to connect to a remote processes due
                to some socket error, you'll have to kill the slave process
                by hand.
        """
        for worker in self.workers:
            import sys; sys.stdout.flush()
            try:
                worker.exec_code('import sys;sys.exit(0)')
            except:
                # Should really do something here to
                # trap non-SystemExit errors.
                pass

    def restart(self):
        """ restart() -- terminate all remote slaves and restart them.

            restart is useful when you would like to reset all slave
            interpreters to a known state.
        """
        self.stop()
        self.start()

    def is_running(self,timeout=0):
        """ is_running(timeout=0) --> 0 or 1

            Check that all the slave processes in the cluster are up and
            running.  If timeout is specified, is_running will poll the
            cluster until it either gets a positive result or gives up and
            returns 0 after the specified number of seconds.
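
            Example::

                >>> cluster.is_running()
                1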
        """

        # wait for them to start
        import time
        st = time.time()
        still_waiting = 1
        while still_waiting:
            try:
                # Send a simple command to all workers
                # and wait till they handle it successfully
                self.exec_code("1==1")
            except ClusterError:
                still_waiting = 1
                elapsed = time.time() - st
                if elapsed > timeout:
                    # We've run out of time.
                    return 0
            else:
                still_waiting = 0
        # should we somehow disseminate worker topology (ids)
        # to all machines here?
        return 1

    def _send(self,package,addendum=None):
        """ _send(package, addendum=None) -- send a package to all slaves.

            Description

                _send takes a package packed up by a packer object (see
                sync_cluster) and sends it to each of the slave processes.
                addendum is either None or a list with at most one entry
                per slave process.  Each entry is a small package of
                additional information that is to be sent to a specific
                slave process.  It contains data that is only needed by that
                process.

            Implementation Notes

                The send is done synchronously to each worker in turn.  The
                entire package is sent to slave0 before moving on and sending
                the message to slave1.

                If a socket error occurs while trying to send data to a
                given slave, the offending worker is pushed into the
                self.had_send_error list.  Also, self.send_exc is a
                dictionary that stores the (err_type,err_msg) as the key
                and a list of the offending worker ids as the value.  This
                information is used in recv to skip receiving from slaves
                who failed on send and also for error reporting.
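
            Example

                Addendum entries are small per-worker dictionaries; for
                instance, loop_send_recv builds and sends them as::

                    addendums = []
                    for grp in job_groups:
                        addendums.append({loop_var:grp})
                    self._send_recv(package,addendums)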
        """
        if addendum:
            N = len(addendum)
            assert(N <= len(self.workers))
        else:
            N = len(self.workers)
        self.send_exc = {}
        self.had_send_error = []
        for i in range(N):
            try:
                if not addendum:
                    self.workers[i].send(package)
                else:
                    self.workers[i].send(package,addendum[i])
            except socket.error, msg:
                # Key errors by (errno, message) so identical failures
                # hash to the same entry and handle_error() can group them
                # (errno 111 is "connection refused").
                err_type, err_msg = msg.args[0], str(msg)
                self.had_send_error.append(self.workers[i])
                key = (err_type,err_msg)
                try:
                    self.send_exc[key].append(self.workers[i].id)
                except KeyError:
                    self.send_exc[key] = [self.workers[i].id]
            # else - handle other errors?
        self.Nsent = N

    def _recv(self):
        """ _recv() -- retreive results from slave processes.

            Description

                Retreive results from all slave processes that were
                successfully sent a package.  If an error occurs while
                receiving from one of the slaves, the error is noted and
                the results from the other slaves are retreived.  A tuple
                the results from all workers is returned by _recv.  An
                entry of None is placed in the tuple for any worker that
                had an error.

                The recv is done synchronously, waiting for slave0 to return
                its results before moving on to slave1 to recv its results.
        """

        self.had_recv_error = []
        self.recv_exc = {}
        results = []
        import sys;
        #only listen on workers involved in calculation.
        for worker in self.workers[:self.Nsent]:
            if worker in self.had_send_error:
                results.append(None)
            else:
                try:
                    sys.stdout.flush()
                    results.append(worker.recv())
                except sync_cluster.RemoteError:
                    err = sys.exc_info()[1]
                    # Force the err msg (err[1]) to be a string.
                    # This diminishes info content, but makes sure
                    # that the same errors are hashed correctly
                    # in the dictionary.
                    err_type, err_msg, err_traceback = err
                    err = err_type, str(err_msg), err_traceback
                    self.had_recv_error.append(worker)
                    try:
                        self.recv_exc[err].append(worker.id)
                    except KeyError:
                        self.recv_exc[err] = [worker.id]
                    results.append(None)
                except sync_cluster.RemoteCrashError:
                    # Gotta be more intelligent here...
                    msg =  'Error! Remote worker %d appears to have crashed.' \
                            % worker.id
                    raise sync_cluster.RemoteCrashError,msg
                # else handle other errors
        return tuple(results)

    def _send_recv(self,package,addendum=None):
        """ _send_recv(package,addendum=None) --> results

            Description

                Send a message to each worker in turn and then immediately
                begin listening for the results.  All sends are done before
                listening for results from any of the slave processes.  See
                _send and _recv for more information.

                If an error occurs during either the send or recv phases,
                the handle_error() method is called.  If no errors are found,
                a tuple containing the results from each slave is returned.

                If an error does occur and an exception is raised, it is still
                possible to retrieve the set of results that executed correctly
                from the last_results attribute.
        """
        self._send(package,addendum)
        self.last_results = self._recv()
        if(len(self.send_exc) or len(self.recv_exc)):
            self.handle_error()
        return self.last_results

    def handle_error(self):
        """ handle_error() -- make sense of send and recv errors

            Description

                Error handling attempts to examine the errors that occur
                during remote execution and report them in the least verbose
                manner.  If the same error occurs on all slaves, it tries to
                report it only once.  Otherwise it reports all the errors
                that occur on slaves and prints each slave's traceback.

                Currently error handling is pretty simplistic.  It'd be nice
                if socket errors were viewed as severe and the slave either
                restarted or marked as dead and its work distributed among
                the other workers.

        """
        # perhaps do some nifty stuff here to
        # mark bad workers, try to restart, etc.
        msg = ''
        Nworkers = len(self.workers)
        Nsend_errors = len(self.had_send_error)
        Nsend_error_types = len(self.send_exc.keys())
        Nrecv_errors = len(self.had_recv_error)
        Nrecv_error_types = len(self.recv_exc.keys())
        if (Nsend_errors == Nworkers and
            Nsend_error_types == 1):
            sock_err_type,err_msg = self.send_exc.keys()[0]
            if sock_err_type == 111:
                # An attempt at helpful info for a common problem.
                msg =       '\n\nConnection refused on all workers.\n'
                msg = msg + '    Perhaps restarting the cluster would help.\n'
                msg = msg + '    Use Your_Clusters_Name_Here.restart()'
            else:
                msg =       'A socket error occurred sending to all workers.\n\t'
                msg = msg + str(sock_err_type) + ': ' + str(err_msg)
        elif Nsend_errors:
            msg = '\n\nThe following errors occurred when sending data:\n\t'
            for err,guilty_workers in self.send_exc.items():
                msg = msg + str(err) + '\n\t'
                msg = msg + 'Guilty workers: ' + str(guilty_workers) + '\n'

        if (Nrecv_errors == Nworkers and
              Nrecv_error_types == 1):
            err,dummy = self.recv_exc.items()[0]
            err_type, err_msg, err_traceback = err
            msg =       '\n\nThe same error occurred on all workers:\n\t'
            msg = msg + str(err_type) + ': ' + str(err_msg)
            msg = msg + err_traceback
        elif Nrecv_errors:
            msg = '\n\nThe following errors occurred on workers:\n\t'
            for err,guilty_workers in self.recv_exc.items():
                err_type, err_msg, err_traceback = err
                msg = msg + str(err_type) + ': ' + str(err_msg) + '\n'
                msg = msg + 'Guilty workers: ' + str(guilty_workers) + '\n'
                msg = msg + err_traceback


        raise ClusterError, msg

    ##############################################################
    # slave processor info
    ##############################################################
    def load(self):
        """ load() -- print human readable load information for slave hosts

            Description

                The load value printed is the 1 minute load average that
                is commonly printed by uptime on Unix machines.

                load depends on the implementation of numpy_proc on each
                slave's host OS. It will not work for Windows slave processes.
                However, if you are using a Windows master to control a Linux
                cluster of slaves, it should work fine.

            Example::

                >>> slave_list = [('n0',10000), ('n1', 10001)]
                >>> cluster = scipy.cow.machine_cluster(slave_list)
                >>> cluster.start()
                >>> cluster.load()
                    n0: 0.00, n1: 0.00
        """
        import string
        results = self.load_list()
        for i in range(len(self.workers)):
            name = string.split(self.workers[i].host,'.')[0]
            res = results[i]
            s = "%6s: %1.2f," % (name[-6:], res['load_1'])
            print s,
            if not ((i+1) % 5):
                print

    def info(self):
        """ info() -- print human readable info about the slave hosts

            Description

                Print out each slave interpreter's host name, number
                and type of processors, memory usage, and current load
                information in human readable form.

                info depends on the implementation of numpy_proc on each
                slave's host OS. It will not work for Windows slave processes.
                However, if you are using a Windows master to control a Linux
                cluster of slaves, it should work fine.

            Example::

                >>> slave_list = [('n0',10000), ('n1', 10001)]
                >>> cluster = scipy.cow.machine_cluster(slave_list)
                >>> cluster.start()
                >>> cluster.info()
                MACHINE   CPU        GHZ   MB TOTAL  MB FREE   LOAD
                n0        2xP3       0.4     192.0      13.0   0.00
                n1        2xP3       0.4     192.0      22.0   0.00
        """
        import string
        results = self.info_list()
        labels = "%-8s  %-9s  %-4s  %-8s  %-8s  %-4s" % \
                 ('MACHINE','CPU','GHZ','MB TOTAL',
                  'MB FREE','LOAD')
        print labels
        for i in range(len(self.workers)):
            name = string.split(self.workers[i].host,'.')[0]
            res = results[i]
            s = "%-8s %2dx%-6s  %4.1f  %8.1f  %8.1f   %4.2f" %  \
                (name[-8:], res['cpu_count'],res['cpu_type'][-6:], \
                 res['cpu_speed'],res['mem_total'],res['mem_free'],\
                 res['load_1'])
            print s

    def load_list(self):
        """ load_list() -- Return a list of slave load information dictionaries

            Description

                Retrieve a dictionary with information about the load on each
                host processor.  The dictionaries have three keys, load_1,
                load_5, and load_15, indicating the 1, 5, and 15 minute load
                averages for the processor.  These could be useful for (as
                yet unimplemented) load balancing schemes.

                load_list depends on the implementation of numpy_proc on each
                slave's host OS. It will not work for Windows slave processes.
                However, if you are using a Windows master to control a Linux
                cluster of slaves, it should work fine.

            Example::

                >>> slave_list = [('n0',10000), ('n1', 10001)]
                >>> cluster = scipy.cow.machine_cluster(slave_list)
                >>> cluster.start()
                >>> cluster.load_list()
                ({'load_5': 0.0, 'load_1': 0.0, 'load_15': 0.0},
                 {'load_5': 0.0, 'load_1': 0.0, 'load_15': 0.0})
        """
        import numpy.distutils.proc as numpy_proc
        res = self.apply(numpy_proc.load_avg,())
        return res

    def info_list(self):
        """ info() -- print human readable info about the slave hosts

            Description

                Print out the each slave interpreters host name, number
                and type of processors, memory usage, and current load
                information in human readable form.

                info depends on the implementation of numpy_proc on each
                slave's host OS. It will not work for Windows slave processes.
                However, if you are using a Windows master to control a Linux
                cluster of slaves, it should work fine.

            Example::

                >>> slave_list = [('n0',10000), ('n1', 10001)]
                >>> cluster = scipy.cow.machine_cluster(slave_list)
                >>> cluster.start()
                >>> cluster.info()
                MACHINE   CPU        GHZ   MB TOTAL  MB FREE   LOAD
                n0        2xP3       0.4     192.0      13.0   0.00
                n1        2xP3       0.4     192.0      22.0   0.00

        """
        import numpy.distutils.proc as numpy_proc
        res = self.apply(numpy_proc.machine_info,())
        return res

    ##############################################################
    # slave process information and control
    ##############################################################
    def ps(self,sort_by='cpu',**filters):
        """ ps(sort_by='cpu',**filters) -- list processes on slave machines.

            Description

                List all the processes on all remote slave machines.  This
                is like a cluster-wide Unix ps command and is output in a
                similar human readable form.  The sort_by argument allows
                you to sort the process list by various fields including
                pid, cpu, user, machine, memory, state and command.  Keyword
                arguments are used as filters to limit the number of processes
                displayed.  For example, the keyword user='ej' will only list
                processes for user ej, and cpu='>10' will only list processes
                using more than 10% of the cpu cycles.

                ps depends on the implementation of numpy_proc on each
                slave's host OS. It will not work for Windows slave processes.
                However, if you are using a Windows master to control a Linux
                cluster of slaves, it should work fine.

            Example::

                >>> slave_list = [('n0',10000), ('n1', 10001)]
                >>> cluster = scipy.cow.machine_cluster(slave_list)
                >>> cluster.start()
                >>> cluster.ps(user='ej')
                MACHINE USER       PID  %CPU  %MEM TOTAL MB   ...
                n0     ej        22915  99.9   2.1    4.027   ...
                n0     ej        22916  99.9   2.1    4.055   ...
                n1     ej        22915  99.9   2.1    4.027   ...
                n1     ej        22916  99.9   2.1    4.055   ...
                ...

        """
        psl = self.ps_list(sort_by,**filters)
        if len(psl):
            print psl[0].labels_with_name()
        for i in psl: print i.str_with_name()

    def ps_list(self,sort_by='cpu',**filters):
        """ ps_list(self,sort_by='cpu',**filters) -- get cluster processes

            Description

                Return a list containing one numpy_proc.process object for
                each process running on the cluster host machines.  process
                objects contain detailed information about the cpu, memory,
                etc. used by the process.

                See ps for more information.

            Example:

                >>> slave_list = [('n0',10000), ('n1', 10001)]
                >>> cluster = scipy.cow.machine_cluster(slave_list)
                >>> cluster.start()
                >>> p = cluster.ps_list()
                >>> for i in p: print i.pid
                ...
                22890 22889 22889 22890 1071 1071 ...
        """
        import operator
        import numpy.distutils.proc as numpy_proc
        res = self.apply(numpy_proc.ps_list,())
        psl = reduce(operator.add,res)
        psl = numpy_proc.ps_sort(psl,sort_by,**filters)
        return psl

    def nice(self,increment=10):
        """ nice(increment=10) --> success_list

            Increment each slave interpreter's nice value by increment.
            Note: this doesn't currently seem to work -- see os.nice().
        """
        res = self.apply(os.nice,(increment,))
        return res

    def renice(self,process_list,level):
        """ renice(process_list,level) -- set nice value multiple processes

            Description

                Change the nice level of multiple remote processes.
                process_list is a list of numpy_proc.process objects.
                level is the new nice value for the listed processes.

            Caveats

                Once niced down, a process cannot be reniced back up
                unless you are root.  This is a Linux issue.
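
            Example (illustrative)::

                >>> p = cluster.ps_list(user='ej')
                >>> cluster.renice(p,19)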
        """
        res = []
        pids = {}
        for process in process_list:
            if hasattr(process,'machine'):
                try:
                    worker = self.worker_by_name[process.machine]
                except KeyError:
                    worker = self.worker_by_name[process.long_machine]
                pid = process.pid
            else:
                worker = self.workers[process[0]]
                pid = process[1]
            try:
                pids[worker] = pids[worker] + ' ' + str(pid)
            except KeyError:
                pids[worker] = str(pid)
        for worker,value in pids.items():
            arg = 'renice %d -p %s' % (level,value)
            res.append(worker.apply(os.system,(arg,)))
        return res

    def kill(self,process_list,signal = 'TERM'):
        """ kill(self,process_list,signal = 'TERM') -- Signal process list.

            Description

                Send a signal to all of the numpy_proc.process objects in
                the process_list.  This is usually used to kill the processes.
                The signal may be given as a signal name or number.
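
            Example (illustrative)::

                >>> p = cluster.ps_list(user='ej')
                >>> cluster.kill(p)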
        """
        res = []
        pids = {}
        for process in process_list:
            if hasattr(process,'machine'):
                try:
                    worker = self.worker_by_name[process.machine]
                except KeyError:
                    worker = self.worker_by_name[process.long_machine]
                pid = process.pid
            else:
                worker = self.workers[process[0]]
                pid = process[1]
            try:
                pids[worker] = pids[worker] + ' ' + str(pid)
            except KeyError:
                pids[worker] = str(pid)
        for worker,value in pids.items():
            arg = 'kill -s %s %s' % (signal,value)
            res.append(worker.apply(os.system,(arg,)))
        return res

    def system(self,cmd):
        """ system(cmd) -- execute cmd on all remote machines

            A list of all the remote responses is returned.  Unlike
            os.system which returns the exit value of the cmd string,
            this function returns the text output by the command.
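
            Example (illustrative output)::

                >>> cluster.system('hostname')
                ('n0\n', 'n1\n')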
        """
        code = 'import os;f=os.popen("%s");res = f.read(-1);f.close();' % cmd
        return self.exec_code(code,returns=['res'])

    def reload(self,module):
        """ reload(module) -- reload module on all remote interpreters

            module can either be the name of a module or the actual
            module object.
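
            Example::

                >>> import string
                >>> cluster.reload(string)    # or equivalently
                >>> cluster.reload('string')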
        """
        try:
            code = 'import %s; reload(%s)' % ((module.__name__,)*2)
        except AttributeError:
            code = 'import %s; reload(%s)' % ((module,)*2)
        self.exec_code(code)

    ##############################################################
    # remote code and function execution
    ##############################################################

    # Mirror all of the sync_client functions.
    # These assume all clients use the same packing procedures.
    def exec_code(self,code,inputs=None,returns=None):
        """ exec_code(code,inputs=None,returns=None)

            Similar to Python's exec statement. Execute the same code fragment
            on all remote interpreters. inputs is a dictionary of variable
            values to use when executing the code. returns is a list of
            variable names that should be returned after executing the code.
            If one value is specified, the value for that variable is returned.
            If multiple values are specified, a tuple is returned.

            exec_code returns the values of the requested variables,
            one entry for each slave.
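
            Example (illustrative, assuming the two-slave cluster above)::

                >>> cluster['a'] = 1
                >>> cluster.exec_code('b = a + 1', returns=['b'])
                (2, 2)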
        """
        #use the first worker to package up the cmd.
        package = self.workers[0].exec_code_pack(code,inputs,returns)
        return self._send_recv(package)

    def apply(self,function,args=(),keywords=None):
        """ apply(function,args=(),keywords=None)

            Similar to Python's builtin apply function.  Execute the given
            function with the argument list, args, and keyword arguments,
            keywords, on each of the slave processes.

            apply returns the results from calling function,
            one result for each slave.
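
            Example (illustrative; pids will vary)::

                >>> import os
                >>> cluster.apply(os.getpid,())
                (22915, 22916)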
        """
        package = self.workers[0].apply_pack(function,args,keywords)
        return self._send_recv(package)

    def loop_apply(self,function,loop_var,args=(),keywords=None):
        """ loop_apply(function,loop_var, args=(),keywords=None)

            Description

                Call function with the given args and keywords.  One of the
                arguments or keywords is actually a sequence of arguments.
                This sequence is looped over, calling function once for each
                value in the sequence. loop_var indicates which variable to
                loop over.  If an integer, loop_var indexes the args list.
                If a string, it specifies a keyword variable.  The loop sequence
                is divided as evenly as possible between the worker nodes and
                executed in parallel.

            Example::

                >>> slave_list = [('n0',10000), ('n1', 10001)]
                >>> cluster = scipy.cow.machine_cluster(slave_list)
                >>> cluster.start()
                >>> import string
                >>> string_list = ['aaa','bbb','ccc','ddd']
                >>> cluster.loop_apply(string.upper,0,(string_list,))
                ('AAA','BBB','CCC','DDD')

        """
        #----------------------------------------------------
        # Prepare main package for sending
        # almost verbatim from loop_apply_pack in sync_cluster
        #----------------------------------------------------
        #if type(loop_var) == type(1):
        #    loop_var = function.func_code.co_varnames[loop_var]
        all_keywords = {}
        if keywords: all_keywords.update(keywords)
        #more_keywords = sync_cluster.args_to_keywords(function,args)
        #sync_cluster.catch_keyword_conflicts(more_keywords,all_keywords)
        #all_keywords.update(more_keywords)

        # pull out the loop variable.
        if type(loop_var) != type(''):
            loop_var = int(loop_var)
            loop_data = args[loop_var]
            # no need to pack and send since it'll be in the "addendum"
            args = list(args)
            args[loop_var] = None
            args = tuple(args)
        else:
            loop_data = all_keywords[loop_var]
            # no need to pack and send since it'll be in the "addendum"
            del all_keywords[loop_var]
        contents={'_command':sync_cluster.loop_func,'function':function,
                  'args':args,'keywords':all_keywords,'loop_var':loop_var}
        package = self.workers[0].packer.pack(contents)
        return self.loop_send_recv(package,loop_data,loop_var)

    def loop_code(self,code,loop_var,inputs=None,returns=None):
        """ loop_code(code,loop_var,inputs=None,returns=None)

            Description

                Similar to exec_code and loop_apply.  Here loop_var indicates
                the variable name in the inputs dictionary that is looped over.

            Example::

                >>> slave_list = [('n0',10000), ('n1', 10001)]
                >>> cluster = scipy.cow.machine_cluster(slave_list)
                >>> cluster.start()
                >>> import string
                >>> a = [1, 2, 3, 4]
                >>> cluster.loop_code("b=a*2",'a',{'a':a},('b',))
                (2, 4, 6, 8)
        """
        the_inputs = {}
        the_inputs.update(inputs)
        loop_data = the_inputs[loop_var]
        the_inputs[loop_var] = None #make it small for packing
        package = self.workers[0].loop_code_pack(code,loop_var,
                                                 the_inputs,returns)
        return self.loop_send_recv(package,loop_data,loop_var)

    # array specific routines
    def row_split(self,name,sequence):
        """ experimental -- scatter the rows of sequence across the slaves,
            storing each slave's chunk in the global variable name.
        """
        import numpy
        # array_split allows unequal chunks, matching equal_balance.
        q = numpy.array_split(sequence,len(self.workers))
        self.loop_code(name+'=_q_','_q_',inputs={'_q_':q},returns=())

    def row_gather(self,name):
        """ experimental -- gather the slaves' chunks of the global
            variable name and concatenate them back into one array.
        """
        from numpy import concatenate
        return concatenate(self[name])

    def loop_send_recv(self,package,loop_data,loop_var):
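        """ loop_send_recv(package,loop_data,loop_var) --> results tuple

            Split loop_data into nearly equal groups (see equal_balance),
            attach each group to the package as a per-worker addendum
            keyed by loop_var, send it all out, and concatenate the
            per-worker result tuples into one flat tuple.
        """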
        #----------------------------------------------------
        # Now split the loop data evenly among the workers,
        # pack them up as addendums to the original package,
        # and send them off for processing.
        #----------------------------------------------------
        job_groups = equal_balance(loop_data,len(self.workers))
        addendums = []
        for grp in job_groups:
            addendums.append({loop_var:grp})
        results = self._send_recv(package,addendums)
        # Nothing done here to figure out the output format.
        # It is always returned as a tuple
        # Probably will be handier to have it as a
        # Numeric array sometimes.
        out = ()
        for result_group in results:
            out = out + result_group
        return out

    ##############################################################
    # dictionary interface
    ##############################################################

    def __getitem__(self, key):
        # currently allowing tuples also!
        #assert(type(key) is type(''))
        package = self.workers[0].get_pack(key)
        return self._send_recv(package)

    def __setitem__(self, key, item):
        assert(type(key) is type(''))
        package = self.workers[0].update_pack({key:item})
        return self._send_recv(package)

    def __delitem__(self, key):
        # currently allowing tuples also!
        # assert(type(key) is type(''))
        package = self.workers[0].del_pack(key)
        return self._send_recv(package)

    def update(self, dict):
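        """ update(dict) -- set multiple global variables on all slaves.

            Like a sequence of __setitem__ calls, but the whole dictionary
            is packed and sent to every slave in a single package.
        """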
        package = self.workers[0].update_pack(dict)
        return self._send_recv(package)

def equal_balance(jobs,Nworkers):
    """ Split jobs into Nworkers equal groups.
        When an equal split is not possible,
        the larger groups occur at the front
        of the list.
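
        Example::

            >>> equal_balance(range(5),2)
            ([0, 1, 2], [3, 4])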
    """

    #no jobs to do - return empty group list.
    if not len(jobs): return ()
    Ntotal_jobs = len(jobs)

    # find the number of jobs each worker must do
    # for everyone to have equal work loads
    group_size = Ntotal_jobs / Nworkers

    # if there are jobs left over, some of the workers
    # will need to do 1 extra job.
    if Ntotal_jobs % Nworkers:
        group_size = group_size + 1

    # after some algebra, we can solve for the
    # number, a, of workers that need to do extra work
    a = Ntotal_jobs + Nworkers - Nworkers*group_size
    if a*group_size < Ntotal_jobs:
        b = Nworkers - a
    else:
        b = 0

    # a workers do an extra job, b workers do standard
    # number of jobs.
    group_sizes = a*[group_size] + b*[group_size-1]

    # now split the jobs up into groups for each of
    # the workers.
    last = 0
    job_groups = []
    for size in group_sizes:
        next = last+size
        job_groups.append(jobs[last:next])
        last = next
#    sum = 0
#    for grp in job_groups:
#        sum = sum + len(grp)
#    assert(sum,Ntotal_jobs)
    return tuple(job_groups)

class timer:
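    """ A simple stopwatch with lap support.  machine_cluster.start()
        uses it to pace its progress display.

        Example::

            >>> t = timer()
            >>> t.start()
            >>> t.elapsed()            # seconds since start()
            >>> t.mark_lap(); t.current_lap()
    """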

    def __init__(self):
        self.reset()
    def reset(self):
        self.total_t = 0
        self.lap_start = 0
        self.running = 0
        self.lap_list = [0]
    def start(self):
        if not self.running:
            self.running = 1
            self.start_t = time.time()
        else:
            print 'already running: use reset() to start the timer back at 0.'

    def stop(self):
        self.running = 0
        elapsed_t = time.time() - self.start_t
        self.total_t = self.total_t + elapsed_t
        self.lap_list[-1] = self.lap_list[-1] + elapsed_t
        return self.total_t
    def elapsed(self):
        if self.running:
            return self.total_t + (time.time() - self.start_t)
        else:
            return self.total_t
    def mark_lap(self):
        self.lap_start = self.elapsed()
        self.lap_list.append(self.lap_start-self.lap_list[-1])
    def get_laps(self):
        return self.lap_list[1:] +[self.elapsed()-self.lap_list[-1]]
    def current_lap(self):
        return self.elapsed()-self.lap_start



if __name__ == '__main__':
    # simple smoke test: start two slaves on the local machine
    server_list = [('127.0.0.1',10000), ('127.0.0.1',10001)]
    borg = machine_cluster(server_list)
    borg.start()