File: examples-multiprocessing_multiple_runs_with_gui.txt

package info (click to toggle)
brian 1.4.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, stretch
  • size: 23,436 kB
  • sloc: python: 68,707; cpp: 29,040; ansic: 5,182; sh: 111; makefile: 61
file content (196 lines) | stat: -rw-r--r-- 8,898 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
.. currentmodule:: brian

.. index::
   pair: example usage; ion

.. _example-multiprocessing_multiple_runs_with_gui:

Example: multiple_runs_with_gui (multiprocessing)
=================================================

A complicated example of using multiprocessing for multiple runs of a simulation
with different parameters, using a GUI to monitor and control the runs.

This example features:

* An indefinite number of runs, with the set of parameters for each run
  generated at random.
* A plot of the output of all the runs updated as soon as each run is completed.
* A GUI showing how long each process has been running for and how long until
  it completes, and with a button allowing you to terminate the runs.

A simpler example is in ``examples/multiprocessing/multiple_runs_simple.py``.

::

    
    # We use Tk as the backend for the GUI and matplotlib so as to avoid any
    # threading conflicts
    import matplotlib
    matplotlib.use('TkAgg')
    
    from brian import *
    import Tkinter, time, multiprocessing, os
    from brian.utils.progressreporting import make_text_report
    from Queue import Empty as QueueEmpty
    
    
    class SimulationController(Tkinter.Tk):
        '''
        GUI window used to monitor and control the simulation processes.

        Built with Tkinter, it shows a 'Terminate simulation' button, one
        progress bar per process, and a label summarising the number of results
        computed and the time taken so far.

        processes is the number of progress bars to create, terminator is the
        callback function for when the terminate button is clicked, and width
        is the width of each progress bar canvas.
        '''
        # Height of each progress bar canvas
        pb_height = 30

        def __init__(self, processes, terminator, width=600):
            Tkinter.Tk.__init__(self, None)
            self.parent = None
            self.grid()
            button = Tkinter.Button(self, text='Terminate simulation',
                                    command=terminator)
            button.grid(column=0, row=0)
            self.pb_width = width
            # One (canvas, bar rectangle id, text id) tuple per process
            self.progressbars = []
            height = self.pb_height
            for i in range(processes):
                can = Tkinter.Canvas(self, width=width, height=height)
                # Row 0 holds the button, so the bars occupy rows 1..processes
                can.grid(column=0, row=1 + i)
                # Grey background spanning the full width of the bar
                can.create_rectangle(0, 0, width, height, fill='#aaaaaa')
                # The bar itself starts with zero width; update_process resizes it
                r = can.create_rectangle(0, 0, 0, height, fill='#ffaaaa', width=0)
                # Text item placed at the middle of the bar, filled in later
                t = can.create_text(width / 2, height / 2, text='')
                self.progressbars.append((can, r, t))
            self.results_text = Tkinter.Label(self, text=self._results_message(0, 0))
            self.results_text.grid(column=0, row=processes + 1)
            self.title('Simulation control')

        def _results_message(self, elapsed, complete):
            '''
            Text of the summary label; used for both the initial and the updated
            label so that the wording stays the same.
            '''
            return 'Computed %s results, time taken: %ds' % (complete, int(elapsed))

        def update_results(self, elapsed, complete):
            '''
            Method to update the total number of results computed (complete) and
            the amount of time taken (elapsed, truncated to an integer for display).
            '''
            self.results_text.config(text=self._results_message(elapsed, complete))
            self.update()

        def update_process(self, i, elapsed, complete, msg):
            '''
            Method to update the status of a given process.

            i is the index of the progress bar, elapsed and complete are passed to
            make_text_report to build the status text, complete is also used as
            the fraction of the bar width to fill, and msg is appended to the text.
            '''
            can, r, t = self.progressbars[i]
            can.itemconfigure(t, text='Process ' + str(i) + ': ' + make_text_report(elapsed, complete) + ': ' + msg)
            # Stretch the bar rectangle to the completed fraction of the full width
            can.coords(r, 0, 0, int(self.pb_width * complete), self.pb_height)
            self.update()
    
    def sim_mainloop(pool, results, message_queue):
        '''
        Monitors results of a simulation as they arrive
        
        pool is the multiprocessing.Pool that the processes are running in,
        results is the AsyncResult object returned by Pool.imap_unordered which
        returns simulation results asynchronously as and when they are ready,
        and message_queue is a multiprocessing.Queue used to communicate between
        child processes and the server process. In this case, we use this Queue to
        send messages about the percent complete and time elapsed for each run.
        '''
        # We use this to enumerate the processes, mapping their process IDs to an int
        # in the range 0:num_processes.
        pid_to_id = dict((pid, i) for i, pid in enumerate([p.pid for p in pool._pool]))
        num_processes = len(pid_to_id)
        start = time.time()
        stoprunningsim = [False]
        # This function terminates all the pool's child processes, it is used as
        # the callback function called when the terminate button on the GUI is clicked.
        def terminate_sim():
            pool.terminate()
            stoprunningsim[0] = True
        controller = SimulationController(num_processes, terminate_sim)
        for i in range(num_processes):
            controller.update_process(i, 0, 0, 'no info yet')
        i = 0
        while True:
            try:
                # If there is a new result (the 0.1 means wait 0.1 seconds for a
                # result before giving up) then this try clause will execute, otherwise
                # a TimeoutError will occur and the except clause afterwards will
                # execute.
                weight, numspikes = results.next(0.1)
                # if we reach here, we have a result to plot, so we plot it and
                # update the GUI
                plot_result(weight, numspikes)
                i = i + 1
                controller.update_results(time.time() - start, i)
            except multiprocessing.TimeoutError:
                # if we're still waiting for a new result, we can process events in
                # the message_queue and update the GUI if there are any.
                while not message_queue.empty():
                    try:
                        # messages here are of the form: (pid, elapsed, complete)
                        # where pid is the process ID of the child process, elapsed
                        # is the amount of time elapsed, and complete is the
                        # fraction of the run completed. See function how_many_spikes
                        # to see where these messages come from.
                        pid, elapsed, complete = message_queue.get_nowait()
                        controller.update_process(pid_to_id[pid], elapsed, complete, '')
                    except QueueEmpty:
                        break
                controller.update()
                if stoprunningsim[0]:
                    print 'Terminated simulation processes'
                    break
        controller.destroy()
    
    def plot_result(weight, numspikes):
        '''
        Adds the point (weight, numspikes) to the current plot and redraws it.
        '''
        point_colour = (0, 0, 0.5)
        plot([weight], [numspikes], '.', color=point_colour)
        axis('tight')
        # Without an explicit draw the new point would not be shown straight away
        draw()
    
    # Note that how_many_spikes only takes one argument, which is a tuple of
    # its actual arguments. The reason for this is that Pool.imap_unordered can only
    # pass a single argument to the function it's applied to, but that argument can
    # be a tuple...
    def how_many_spikes(params):
        '''
        Runs one simulation and returns how many spikes it produced.

        params is the tuple (excitatory_weight, message_queue), where
        excitatory_weight is the weight used for the connections from the Pe
        subgroup and message_queue is the queue on which progress messages of
        the form (pid, elapsed, complete) are put while the simulation runs.

        Returns the tuple (excitatory_weight, M.nspikes), so that the result
        can be matched to its parameter whatever order the results arrive in.
        '''
        # The tuple is unpacked here rather than in the signature because tuple
        # parameters are a deprecated construct (PEP 3113, removed in Python 3);
        # callers still pass exactly one positional tuple.
        excitatory_weight, message_queue = params

        # A worker process executes many runs one after the other, so reset
        # the default clock and clear the objects left over from the last run.
        reinit_default_clock()
        clear(True)

        eqs = '''
        dv/dt = (ge+gi-(v+49*mV))/(20*ms) : volt
        dge/dt = -ge/(5*ms) : volt
        dgi/dt = -gi/(10*ms) : volt
        '''
        P = NeuronGroup(4000, eqs, threshold= -50 * mV, reset= -60 * mV)
        P.v = -60 * mV + 10 * mV * rand(len(P))
        # Two subgroups: Pe drives the variable ge and Pi the variable gi
        Pe = P.subgroup(3200)
        Pi = P.subgroup(800)
        Ce = Connection(Pe, P, 'ge')
        Ci = Connection(Pi, P, 'gi')
        # Only the weight of the Pe connections varies between runs
        Ce.connect_random(Pe, P, 0.02, weight=excitatory_weight)
        Ci.connect_random(Pi, P, 0.02, weight= -9 * mV)
        M = SpikeMonitor(P)

        # This reporter function is called every second, and it sends a message to
        # the server process updating the status of the current run.
        def reporter(elapsed, complete):
            message_queue.put((os.getpid(), elapsed, complete))

        run(4000 * ms, report=reporter, report_period=1 * second)

        return (excitatory_weight, M.nspikes)
    
    
    if __name__ == '__main__':
        # Number of processes to use, None gives one process per CPU
        numprocesses = None
        # Client processes send their messages to the server process through a
        # Queue, and that Queue has to be created by a Manager
        manager = multiprocessing.Manager()
        message_queue = manager.Queue()
        pool = multiprocessing.Pool(numprocesses)

        def args():
            '''
            Generator producing an endless sequence of randomly chosen sets of
            parameters, each one being the argument tuple for how_many_spikes.
            '''
            while True:
                yield (rand() * 3.5 * mV, message_queue)

        # imap_unordered returns immediately with an iterator that gives the
        # results as and when they are ready. That iterator is handed to
        # sim_mainloop, which monitors it, updates the GUI and plots the results
        # as they come in.
        parameter_stream = args()
        results = pool.imap_unordered(how_many_spikes, parameter_stream)
        # Interactive mode lets matplotlib plot as we go
        ion()
        sim_mainloop(pool, results, message_queue)