File: tutorial.html

pprocess 0.3.1-1
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
  <meta http-equiv="content-type" content="text/html; charset=UTF-8" />
  <title>pprocess - Tutorial</title>
  <link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Tutorial</h1>

<p>The <code>pprocess</code> module provides several mechanisms for running
Python code concurrently in several processes. The most straightforward way of
making a program parallel-aware (that is, able to take advantage of more than
one processor to process data simultaneously) is to use the
<code>pmap</code> function.</p>

<h2>Converting Map-Style Code</h2>

<p>Consider a program using the built-in <code>map</code> function and a sequence of inputs:</p>

<pre>
    t = time.time()

    # Initialise an array.

    sequence = []
    for i in range(0, N):
        for j in range(0, N):
            sequence.append((i, j))

    # Perform the work.

    results = map(calculate, sequence)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_map.py</code> file.)</p>

<p>The principal features of this program involve the preparation of an array
for input purposes, and the use of the <code>map</code> function to iterate
over the combinations of <code>i</code> and <code>j</code> in the array. Even
if the <code>calculate</code> function could be invoked independently for each
input value, we have to wait for each computation to complete before
initiating a new one. The <code>calculate</code> function may be defined as
follows:</p>

<pre>
def calculate(t):

    "A supposedly time-consuming calculation on 't'."

    i, j = t
    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    sequence = []
    for i in range(0, N):
        for j in range(0, N):
            sequence.append((i, j))

    # Perform the work.

    results = <strong>pprocess.pmap</strong>(calculate, sequence<strong>, limit=limit</strong>)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_pmap.py</code> file.)</p>

<p>By replacing usage of the <code>map</code> function with the
<code>pprocess.pmap</code> function, and specifying the limit on the number of
processes to be active at any given time (the value of the <code>limit</code>
variable is defined elsewhere), several calculations can now be performed in
parallel.</p>
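
<p>To give a feel for what a function of this kind has to arrange, the
following is a minimal sketch built only on the Python 3 standard library for
POSIX systems. It is not the <code>pprocess</code> implementation:
<code>fork_map</code> is a hypothetical helper, and its strategy of waiting
for the oldest active process whenever the limit is reached is an assumption
made for brevity. Error handling is omitted, so a failing computation surfaces
only as an <code>EOFError</code> in the parent.</p>

```python
import os
import pickle
import time


def fork_map(func, sequence, limit):
    """Apply func to every item, each in its own forked process, with at
    most 'limit' processes active, returning results in input order."""
    results = [None] * len(sequence)
    active = {}  # read descriptor: (position in sequence, child process ID)

    def collect_oldest():
        # Block until the earliest-started active child has sent its result.
        read_fd = next(iter(active))
        index, pid = active.pop(read_fd)
        with os.fdopen(read_fd, "rb") as reader:
            results[index] = pickle.load(reader)
        os.waitpid(pid, 0)

    for index, item in enumerate(sequence):
        if len(active) == limit:
            collect_oldest()
        read_fd, write_fd = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child process: compute, send the result, and exit at once.
            status = 1
            try:
                os.close(read_fd)
                with os.fdopen(write_fd, "wb") as writer:
                    pickle.dump(func(item), writer)
                status = 0
            finally:
                os._exit(status)
        os.close(write_fd)
        active[read_fd] = (index, pid)

    while active:
        collect_oldest()
    return results


def calculate(t):
    "A stand-in for a time-consuming calculation on the pair 't'."
    i, j = t
    time.sleep(0.05)
    return i * 3 + j


pairs = [(i, j) for i in range(3) for j in range(3)]
print(fork_map(calculate, pairs, limit=3))
```

<p>Each child leaves through <code>os._exit</code> so that it never carries on
running the parent's loop. Results come back in input order because each one
is stored at the position recorded when its process was started.</p>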

<h2>Converting Invocations to Parallel Operations</h2>

<p>Although some programs make natural use of the <code>map</code> function,
others may employ an invocation in a nested loop. This may also be converted
to a parallel program. Consider the following Python code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    results = []

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            results.append(calculate(i, j))

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple1.py</code> file.)</p>

<p>Here, a computation in the <code>calculate</code> function is performed for
each combination of <code>i</code> and <code>j</code> in the nested loop,
returning a result value. However, we must wait for the completion of this
function for each element before moving on to the next element, and this means
that the computations are performed sequentially. Consequently, on a system
with more than one processor, even if we could call <code>calculate</code> for
more than one combination of <code>i</code> and <code>j</code>
and have the computations executing at the same time, the above program will
not take advantage of such capabilities.</p>

<p>We use a slightly modified version of <code>calculate</code> which employs
two parameters instead of one:</p>

<pre>
def calculate(i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j'.
    """

    time.sleep(delay)
    return i * N + j
</pre>

<p>In order to reduce the processing time - to speed the code up, in other
words - we can make this code use several processes instead of just one. Here
is the modified code:</p>

<pre id="simple_managed_map">
    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    <strong>results = pprocess.Map(limit=limit)</strong>

    # Wrap the calculate function and manage it.

    <strong>calc = results.manage(pprocess.MakeParallel(calculate))</strong>

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>calc</strong>(i, j)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_map.py</code> file.)</p>

<p>The principal changes in the above code involve the use of a
<code>pprocess.Map</code> object to collect the results, and a version of the
<code>calculate</code> function which is managed by the <code>Map</code>
object. The <code>Map</code> object arranges the results of computations so
that iterating over the object, or accessing it using list operations,
provides the results in the same order as their corresponding inputs.</p>
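
<p>The ordering guarantee can be illustrated in isolation with a small
stand-in. The sketch below assumes Python 3 and uses threads from
<code>concurrent.futures</code> purely as a substitute for processes;
<code>OrderedResults</code> and its <code>manage</code> method are
hypothetical names chosen to echo the listing above, not the
<code>pprocess</code> classes. The delays are arranged so that later calls
tend to finish first, yet the results are still read back in call order.</p>

```python
import time
from concurrent.futures import ThreadPoolExecutor


class OrderedResults:
    """Collects results in the order in which the managed callable was
    invoked, regardless of the order in which the work finishes."""

    def __init__(self, limit):
        self.executor = ThreadPoolExecutor(max_workers=limit)
        self.futures = []

    def manage(self, func):
        # Return a wrapper that schedules the call and returns immediately.
        def wrapper(*args):
            self.futures.append(self.executor.submit(func, *args))
        return wrapper

    def __len__(self):
        return len(self.futures)

    def __getitem__(self, index):
        # Indexing or slicing waits for the requested results to be ready.
        if isinstance(index, slice):
            return [future.result() for future in self.futures[index]]
        return self.futures[index].result()


def calculate(i, j):
    # Later inputs sleep for less time, so they tend to complete first.
    time.sleep(0.01 * (4 - i - j))
    return i * 3 + j


results = OrderedResults(limit=4)
calc = results.manage(calculate)
for i in range(3):
    for j in range(3):
        calc(i, j)

first_row = results[0:3]
everything = list(results)
results.executor.shutdown()
print(first_row, everything)
```

<p>The wrapper records one pending result per call, and reading position
<code>k</code> simply waits for the <code>k</code>-th call to complete, which
is what makes the slicing in the display loop safe.</p>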

<h2>Converting Arbitrarily-Ordered Invocations</h2>

<p>In some programs, it is not important to receive the results of
computations in any particular order, usually because either the order of
these results is irrelevant, or because the results provide "positional"
information which lets them be handled in an appropriate way. Consider the
following Python code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    results = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            i2, j2, result = calculate(i, j)
            results[i2*N+j2] = result

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple2.py</code> file.)</p>

<p>Here, a result array is initialised first and each computation is performed
sequentially. A significant difference from the previous examples is the return
value of the <code>calculate</code> function: the position details
corresponding to <code>i</code> and <code>j</code> are returned alongside the
result. This is of limited value in the above code because the order of the
computations and the reception of results is fixed, and the sequential
program gets no benefit from parallelisation.</p>

<p>We can bring the benefits of parallel processing to the above program with
the following code:</p>

<pre>
    t = time.time()

    # Initialise the communications queue with a limit on the number of
    # channels/processes.

    <strong>queue = pprocess.Queue(limit=limit)</strong>

    # Initialise an array.

    results = [0] * N * N

    # Wrap the calculate function and manage it.

    <strong>calc = queue.manage(pprocess.MakeParallel(calculate))</strong>

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>calc(i, j)</strong>

    # Store the results as they arrive.

    print "Finishing..."
    <strong>for i, j, result in queue:</strong>
        <strong>results[i*N+j] = result</strong>

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_queue.py</code> file.)</p>

<p>This revised code employs a <code>pprocess.Queue</code> object whose
purpose is to collect the results of computations and to make them available
in the order in which they were received. The code collecting results has been
moved into a separate loop, independent of the original computation loop,
which relies on the "positional" information emerging from the queue to put
each result in its place, whatever the order of arrival.</p>
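
<p>As a rough illustration of results being consumed in arrival order, the
sketch below lets every child process write its result to one shared pipe,
from which the parent reads records as they turn up. It assumes Python 3 on a
POSIX system and is not how <code>pprocess.Queue</code> is implemented;
<code>results_as_they_arrive</code> is a hypothetical helper that, unlike the
queue above, places no limit on the number of processes.</p>

```python
import os
import pickle
import time

N = 3


def results_as_they_arrive(func, argument_lists):
    "Run func(*args) in one forked process per call, yielding results as sent."
    read_fd, write_fd = os.pipe()
    pids = []
    for args in argument_lists:
        pid = os.fork()
        if pid == 0:
            status = 1
            try:
                os.close(read_fd)
                # One small write per result keeps records from interleaving.
                os.write(write_fd, pickle.dumps(func(*args)))
                status = 0
            finally:
                os._exit(status)
        pids.append(pid)
    os.close(write_fd)
    with os.fdopen(read_fd, "rb") as reader:
        while True:
            try:
                yield pickle.load(reader)
            except EOFError:
                break  # every child has closed its end of the pipe
    for pid in pids:
        os.waitpid(pid, 0)


def calculate(i, j):
    "Return the position alongside the result, after a varying delay."
    time.sleep(0.01 * ((i + j) % 3))
    return i, j, i * N + j


calls = [(i, j) for i in range(N) for j in range(N)]
results = [0] * N * N
arrivals = 0
for i, j, result in results_as_they_arrive(calculate, calls):
    results[i * N + j] = result
    arrivals += 1
print(results)
```

<p>Because each record carries its own <code>i</code> and <code>j</code>, the
final array is the same whatever order the children happen to finish in.</p>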

<p>We can take the <code>pprocess.Queue</code> example further, illustrating
some of the mechanisms employed by <code>pprocess</code>. Instead of
collecting results in a queue, we can define a class containing a method which
is called when new results arrive:</p>

<pre>
class MyExchange(pprocess.Exchange):

    "Parallel convenience class containing the array assignment operation."

    def store_data(self, ch):
        i, j, result = ch.receive()
        self.D[i*N+j] = result
</pre>

<p>This code exposes the channel paradigm which is used throughout
<code>pprocess</code> and is available to applications, if desired. The effect
of the method is the storage of a result received through the channel in an
attribute of the object. The following code shows how this class can be used,
with differences from the previous program highlighted:</p>

<pre>
    t = time.time()

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    <strong>exchange = MyExchange(limit=limit)</strong>

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    <strong>results = exchange.D = [0] * N * N</strong>

    # Wrap the calculate function and manage it.

    calc = <strong>exchange</strong>.manage(pprocess.MakeParallel(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Wait for the results.

    print "Finishing..."
    <strong>exchange.finish()</strong>

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed.py</code> file.)</p>

<p>The main visible differences between this and the previous program are the
storage of the result array in the exchange, the removal of the queue
consumption code from the main program, placing the act of storing values in
the exchange's <code>store_data</code> method, and the need to call the
<code>finish</code> method on the <code>MyExchange</code> object so that we do
not try to access the results too soon. One underlying benefit not visible in
the above code is that we no longer need to accumulate results in a queue or
other structure so that they may be processed and assigned to the correct
positions in the result array.</p>
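
<p>The callback arrangement can be approximated with the Python 3 standard
library on a POSIX system, as in the sketch below. It is an assumption-laden
illustration rather than the <code>pprocess.Exchange</code> implementation:
<code>SketchExchange</code> and <code>ArrayExchange</code> are hypothetical
classes, no process limit is enforced, and the channels are
<code>multiprocessing</code> connections whose receiving method is
<code>recv</code>.</p>

```python
import os
import time
from multiprocessing import Pipe
from multiprocessing.connection import wait

N = 3


class SketchExchange:
    "Runs calls in forked processes and passes ready channels to store_data."

    def __init__(self):
        self.active = {}  # receiving end of each channel: child process ID

    def manage(self, func):
        "Return a wrapper that runs func in a new process on each call."
        def wrapper(*args):
            receiver, sender = Pipe(duplex=False)
            pid = os.fork()
            if pid == 0:
                status = 1
                try:
                    receiver.close()
                    sender.send(func(*args))
                    sender.close()
                    status = 0
                finally:
                    os._exit(status)
            sender.close()
            self.active[receiver] = pid
        return wrapper

    def finish(self):
        "Dispatch incoming data until every started process has reported."
        while self.active:
            for channel in wait(list(self.active)):
                self.store_data(channel)
                channel.close()
                os.waitpid(self.active.pop(channel), 0)

    def store_data(self, channel):
        raise NotImplementedError("subclasses decide what to do with data")


class ArrayExchange(SketchExchange):
    "Stores each received result at the position named in the message."

    def store_data(self, channel):
        i, j, result = channel.recv()
        self.D[i * N + j] = result


def calculate(i, j):
    time.sleep(0.02)
    return i, j, i * N + j


exchange = ArrayExchange()
results = exchange.D = [0] * N * N
calc = exchange.manage(calculate)
for i in range(N):
    for j in range(N):
        calc(i, j)
exchange.finish()
print(results)
```

<p>Nothing is accumulated between arrival and storage: each message is handled
by <code>store_data</code> as soon as its channel becomes readable, and
<code>finish</code> returns only once no channels remain.</p>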

<p>For the curious, we may remove some of the remaining conveniences of the
<code>MyExchange</code> program to expose other features of
<code>pprocess</code>. First, we define a slightly modified version of the
<code>calculate</code> function:</p>

<pre>
def calculate(ch, i, j):

    """
    A supposedly time-consuming calculation on 'i' and 'j', using 'ch' to
    communicate with the parent process.
    """

    time.sleep(delay)
    ch.send((i, j, i * N + j))
</pre>

<p>This function accepts a channel, <code>ch</code>, through which results
will be sent, and through which other values could potentially be received,
although we choose not to do so here. The program using this function is as
follows, with differences from the previous program highlighted:</p>

<pre>
    t = time.time()

    # Initialise the communications exchange with a limit on the number of
    # channels/processes.

    exchange = MyExchange(limit=limit)

    # Initialise an array - it is stored in the exchange to permit automatic
    # assignment of values as the data arrives.

    results = exchange.D = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>exchange.start(calculate, i, j)</strong>

    # Wait for the results.

    print "Finishing..."
    exchange.finish()

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_start.py</code> file.)</p>

<p>Here, we have discarded two conveniences: the wrapping of callables using
<code>MakeParallel</code>, which lets us use functions without providing any
channel parameters, and the management of callables using the
<code>manage</code> method on queues, exchanges, and so on. The
<code>start</code> method still calls the provided callable, but using a
different notation from that employed previously.</p>
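
<p>To make the role of the channel parameter more concrete, here is a small
sketch in which the callable both sends and receives through its channel. It
assumes Python 3 on a POSIX system and uses a two-way
<code>multiprocessing</code> pipe; the <code>start</code> function and the
<code>scale</code> callable are hypothetical and only loosely modelled on the
notation above.</p>

```python
import os
from multiprocessing import Pipe


def start(func, *args):
    """Fork a process running func(channel, *args) and return the parent's
    end of a two-way channel together with the child's process ID."""
    parent_end, child_end = Pipe()  # two-way by default
    pid = os.fork()
    if pid == 0:
        status = 1
        try:
            parent_end.close()
            func(child_end, *args)
            child_end.close()
            status = 0
        finally:
            os._exit(status)
    child_end.close()
    return parent_end, pid


def scale(ch, i, j):
    "Send a first result, then wait for a factor and send a scaled result."
    value = i * 10 + j
    ch.send((i, j, value))
    factor = ch.recv()
    ch.send((i, j, value * factor))


channel, pid = start(scale, 1, 2)
first = channel.recv()   # (1, 2, 12)
channel.send(3)
second = channel.recv()  # (1, 2, 36)
channel.close()
os.waitpid(pid, 0)
print(first, second)
```

<p>The callable never returns a value to its caller; everything it has to say
travels through the channel it was given, which is why the wrapping step can
be dropped once the function is written in this style.</p>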

<h2>Converting Inline Computations</h2>

<p>Although many programs employ functions and other useful abstractions which
can be treated as parallelisable units, some programs perform computations
"inline", meaning that the code responsible appears directly within a loop or
related control-flow construct. Consider the following code:</p>

<pre>
    t = time.time()

    # Initialise an array.

    results = [0] * N * N

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            time.sleep(delay)
            results[i*N+j] = i * N + j

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple.py</code> file.)</p>

<p>To simulate "work", as in the different versions of the
<code>calculate</code> function, we use the <code>time.sleep</code> function
(which does not actually do work, and which will cause a process to be
descheduled in most cases, but which simulates the delay associated with work
being done). This inline work, which must be performed sequentially in the
above program, can be performed in parallel in a somewhat modified version of
the program:</p>

<pre>
    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    <strong>results = pprocess.Map(limit=limit)</strong>

    # Perform the work.
    # NOTE: Could use the with statement in the loop to package the
    # NOTE: try...finally functionality.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            <strong>ch = results.create()</strong>
            <strong>if ch:</strong>
                <strong>try: # Calculation work.</strong>

                    time.sleep(delay)
                    <strong>ch.send(i * N + j)</strong>

                <strong>finally: # Important finalisation.</strong>

                    <strong>pprocess.exit(ch)</strong>

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_create_map.py</code> file.)</p>

<p>Although seemingly more complicated, the bulk of the changes in this
modified program are focused on obtaining a channel object, <code>ch</code>,
at the point where the computations are performed, and the wrapping of the
computation code in a <code>try</code>...<code>finally</code> statement which
ensures that the process associated with the channel exits when the
computation is complete. In order for the results of these computations to be
collected, a <code>pprocess.Map</code> object is used, since it will maintain
the results in the same order as the initiation of the computations which
produced them.</p>
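
<p>The shape of this pattern follows from the way forking works: one call
returns in two processes, and only the child should run the inline work. The
sketch below shows that idea with the Python 3 standard library on a POSIX
system. The <code>create</code> and <code>exit_child</code> functions are
hypothetical stand-ins, no process limit is applied, and the way results are
gathered is an assumption rather than a description of
<code>pprocess.Map</code>.</p>

```python
import os
import pickle

children = []  # (reader, child process ID) pairs, in creation order


def create():
    """Fork, returning a writable channel in the child and None in the
    parent, which instead remembers where the child's result will arrive."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        os.close(read_fd)
        return os.fdopen(write_fd, "wb")
    os.close(write_fd)
    children.append((os.fdopen(read_fd, "rb"), pid))
    return None


def exit_child(ch):
    "Flush the channel and end the child without running the parent's code."
    try:
        ch.close()
    finally:
        os._exit(0)


N = 3
for i in range(N):
    for j in range(N):
        ch = create()
        if ch:
            try:  # Inline calculation, executed only in the child.
                pickle.dump(i * N + j, ch)
            finally:  # Without this, the child would carry on looping.
                exit_child(ch)

# Only the parent reaches this point; read the results in creation order.
results = []
for reader, pid in children:
    results.append(pickle.load(reader))
    reader.close()
    os.waitpid(pid, 0)
print(results)
```

<p>The <code>finally</code> clause matters more than it may appear: a child
that failed to exit would continue around the nested loops and start creating
processes of its own.</p>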

<h2>Reusing Processes in Parallel Programs</h2>

<p>One notable aspect of the above programs when parallelised is that each
invocation of a computation in parallel creates a new process in which the
computation is to be performed, regardless of whether existing processes had
just finished producing results and could theoretically have been asked to
perform new computations. In other words, processes were created and destroyed
instead of being reused.</p>

<p>However, we can request that processes be reused for computations by
enabling the <code>reuse</code> feature of exchange-like objects and employing
suitable reusable callables. Consider this modified version of the <a
href="#simple_managed_map">simple_managed_map</a> program:</p>

<pre>
    t = time.time()

    # Initialise the results using a map with a limit on the number of
    # channels/processes.

    results = pprocess.Map(limit=limit<strong>, reuse=1</strong>)

    # Wrap the calculate function and manage it.

    calc = results.manage(pprocess.Make<strong>Reusable</strong>(calculate))

    # Perform the work.

    print "Calculating..."
    for i in range(0, N):
        for j in range(0, N):
            calc(i, j)

    # Show the results.

    for i in range(0, N):
        for result in results[i*N:i*N+N]:
            print result,
        print

    print "Time taken:", time.time() - t
</pre>

<p>(This code in context with <code>import</code> statements and functions is
found in the <code>examples/simple_managed_map_reusable.py</code> file.)</p>

<p>By indicating that processes and channels shall be reused, and by wrapping
the <code>calculate</code> function with the necessary support, the
computations may be performed in parallel using a pool of processes instead of
creating a new process for each computation and then discarding it, only to
create a new process for the next computation.</p>
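
<p>The difference between creating a process per computation and reusing a
small pool can be seen in the following sketch, which assumes Python 3 on a
POSIX system. It is not the mechanism behind <code>reuse</code> or
<code>MakeReusable</code>: <code>reusing_map</code> and
<code>worker_loop</code> are hypothetical, and work is handed out in simple
rounds, which a real pool would be unlikely to do. Each result is tagged with
the process ID of the worker that produced it so that reuse can be
observed.</p>

```python
import os
from multiprocessing import Pipe


def worker_loop(channel, func):
    "Serve calls until the parent sends None, reusing this one process."
    while True:
        args = channel.recv()
        if args is None:
            break
        channel.send((os.getpid(), func(*args)))


def reusing_map(func, argument_lists, limit):
    """Apply func to each argument tuple using 'limit' long-lived processes,
    returning the results in input order and the set of worker IDs used."""
    workers = []
    for _ in range(limit):
        parent_end, child_end = Pipe()
        pid = os.fork()
        if pid == 0:
            status = 1
            try:
                parent_end.close()
                worker_loop(child_end, func)
                status = 0
            finally:
                os._exit(status)
        child_end.close()
        workers.append((parent_end, pid))

    results, pids_used = [], set()
    # Hand out work in rounds of 'limit' calls, then collect that round.
    for start in range(0, len(argument_lists), limit):
        batch = argument_lists[start:start + limit]
        active = workers[:len(batch)]
        for (channel, _), args in zip(active, batch):
            channel.send(args)
        for channel, _ in active:
            worker_pid, value = channel.recv()
            pids_used.add(worker_pid)
            results.append(value)

    for channel, pid in workers:
        channel.send(None)  # ask the worker to stop
        channel.close()
        os.waitpid(pid, 0)
    return results, pids_used


def calculate(i, j):
    return i * 3 + j


calls = [(i, j) for i in range(3) for j in range(3)]
results, pids_used = reusing_map(calculate, calls, limit=2)
print(results, len(pids_used))
```

<p>Nine computations are served by two processes here, whereas the earlier
programs would have started nine.</p>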

<h2>Summary</h2>

<p>The following table indicates the features used in converting each
sequential example program into its parallel counterparts:</p>

<table border="1" cellspacing="0" cellpadding="5">
  <thead>
    <tr>
      <th>Sequential Example</th>
      <th>Parallel Example</th>
      <th>Features Used</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>simple_map</td>
      <td>simple_pmap</td>
      <td>pmap</td>
    </tr>
    <tr>
      <td>simple1</td>
      <td>simple_managed_map</td>
      <td>MakeParallel, Map, manage</td>
    </tr>
    <tr>
      <td rowspan="3">simple2</td>
      <td>simple_managed_queue</td>
      <td>MakeParallel, Queue, manage</td>
    </tr>
    <tr>
      <td>simple_managed</td>
      <td>MakeParallel, Exchange (subclass), manage, finish</td>
    </tr>
    <tr>
      <td>simple_start</td>
      <td>Channel, Exchange (subclass), start, finish</td>
    </tr>
    <tr>
      <td>simple</td>
      <td>simple_create_map</td>
      <td>Channel, Map, create, exit</td>
    </tr>
  </tbody>
</table>

</body>
</html>