File: reference.html

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
  <meta http-equiv="content-type" content="text/html; charset=UTF-8" />
  <title>pprocess - Reference</title>
  <link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>

<h1>pprocess - Reference</h1>

<p>The <code>pprocess</code> module defines a simple parallel processing API
for Python, inspired somewhat by the <code>thread</code> module, slightly less
by <a href="http://datamining.anu.edu.au/~ole/pypar/">pypar</a>, and slightly
less still by <a href="http://pypvm.sourceforge.net/">pypvm</a>.</p>

<p>This document complements the <a href="tutorial.html">tutorial</a> by
providing an overview of the different styles of parallel programming supported
by the module. For an introduction, and to get a clearer idea of which styles
are most suitable for your own programs, consult the
<a href="tutorial.html">tutorial</a>.</p>

<ul>
<li><a href="#Thread">Thread-style Processing</a></li>
<li><a href="#Fork">Fork-style Processing</a></li>
<li><a href="#Exchanges">Message Exchanges</a></li>
<li><a href="#Convenient">Convenient Message Exchanges</a></li>
<li><a href="#Queues">Exchanges as Queues</a></li>
<li><a href="#Maps">Exchanges as Maps</a></li>
<li><a href="#Managed">Managed Callables</a></li>
<li><a href="#MakeParallel">Making Existing Functions Parallel</a></li>
<li><a href="#Map">Map-style Processing</a></li>
<li><a href="#Reusing">Reusing Processes and Channels</a></li>
<li><a href="#MakeReusable">Making Existing Functions Parallel and Reusable</a></li>
<li><a href="#Implementation">Implementation Notes</a></li>
</ul>

<h2 id="Thread">Thread-style Processing</h2>

<p>To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:</p>

<pre>
channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)
</pre>

<p>This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:</p>

<pre>
def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...
</pre>
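
<p>For example, a result can be passed back to the creating process using the
channel's <code>send</code> method, the counterpart of the <code>receive</code>
method used later in this document (the function below is purely
illustrative):</p>

<pre>
def add(channel, x, y):
    channel.send(x + y)            # pass the result back over the channel

channel = pprocess.start(add, 17, 25)
result = channel.receive()         # blocks until the created process sends a result
</pre>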

<h2 id="Fork">Fork-style Processing</h2>

<p>To create new processes in a similar way to that employed when using <code>os.fork</code>
(i.e. the <code>fork</code> system call on various operating systems), use the following
approach:</p>

<pre>
channel = pprocess.create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
    pprocess.exit(channel)
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...
</pre>
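
<p>As a concrete sketch of this style, the creating process might send a value
to the created process and read back a transformed value, using the channel's
<code>send</code> and <code>receive</code> methods as in the previous
section:</p>

<pre>
channel = pprocess.create()
if channel.pid == 0:
    value = channel.receive()      # created process: wait for a value
    channel.send(value * 2)        # send a result back
    pprocess.exit(channel)         # exit before reaching the creating process's code
else:
    channel.send(21)               # creating process: send a value
    result = channel.receive()     # and read the transformed result
</pre>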

<h2 id="Exchanges">Message Exchanges</h2>

<p>When creating many processes, each providing results for the consumption of the
main process, collecting those results efficiently can be problematic: if some
processes take longer than others, and we choose to read from processes that are
not yet ready instead of from those which are, the whole activity will take much
longer than necessary.</p>

<p>One solution to the problem of knowing when to read from channels is to create
an <code>Exchange</code> object, optionally initialising it with a list of channels
through which data is expected to arrive:</p>

<pre>
exchange = pprocess.Exchange()           # populate the exchange later
exchange = pprocess.Exchange(channels)   # populate the exchange with channels
</pre>

<p>We can add channels to the exchange using the <code>add</code> method:</p>

<pre>
exchange.add(channel)
</pre>

<p>To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the <code>active</code> method, which
returns all channels being monitored by the exchange:</p>

<pre>
channels = exchange.active()
</pre>

<p>We may then check the exchange to see whether any data is ready to be received;
for example:</p>

<pre>
for channel in exchange.ready():
    # Read from and write to the channel.
    ...
</pre>

<p>If we do not wish to wait indefinitely for a list of channels, we can pass a
timeout value to the <code>ready</code> method as a floating-point number of
seconds, where <code>0</code> means a non-blocking poll (as stated in the
documentation for the <code>select</code> module's <code>select</code>
function).</p>
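
<p>Putting these pieces together, one possible collection loop is sketched
below. Here, <code>fn</code> is a channel-aware callable as in the thread-style
section, each created process is assumed to send a single result before
terminating, the exchange is assumed to stop monitoring channels whose processes
have finished (so that <code>active</code> eventually reports no channels), and
the 0.1 second timeout is purely illustrative:</p>

<pre>
channels = [pprocess.start(fn, i) for i in range(10)]
exchange = pprocess.Exchange(channels)

results = []
while exchange.active():                # channels are still being monitored
    for channel in exchange.ready(0.1): # wait at most 0.1s for readable channels
        results.append(channel.receive())
</pre>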

<h2 id="Convenient">Convenient Message Exchanges</h2>

<p>A more convenient form of message exchange can be adopted by subclassing the
<code>Exchange</code> class and defining a particular method:</p>

<pre>
class MyExchange(pprocess.Exchange):
    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.
</pre>

<p>The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:</p>

<pre>
exchange = MyExchange()         # populate the exchange later
exchange = MyExchange(limit=10) # set a limit for later population
</pre>

<p>The exchange can now be used in a simpler fashion than that shown above. We can
add channels as before using the <code>add</code> method, or we can choose to add
channels only if the specified limit would not be exceeded:</p>

<pre>
exchange.add(channel)           # add a channel as normal
exchange.add_wait(channel)      # add a channel, waiting if the limit would be
                                # exceeded
</pre>

<p>Or we can request that the exchange create a channel on our behalf:</p>

<pre>
channel = exchange.create()
</pre>

<p>We can even start processes and monitor channels without ever handling the
channel ourselves:</p>

<pre>
exchange.start(fn, arg1, arg2, named1=value1, named2=value2)
</pre>

<p>We can explicitly wait for "free space" for channels by calling the
<code>wait</code> method, although the <code>start</code> and <code>add_wait</code>
methods make this less interesting:</p>

<pre>
exchange.wait()
</pre>

<p>Finally, when finishing the computation, we can choose to merely call the
<code>finish</code> method and have the remaining data processed automatically:</p>

<pre>
exchange.finish()
</pre>
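
<p>The following sketch pulls these pieces together: a subclass which stores
received data on an instance attribute, populated using the <code>start</code>
method and drained by <code>finish</code>. The <code>fn</code> function and the
<code>results</code> attribute are illustrative choices, not part of the
<code>Exchange</code> API:</p>

<pre>
class ResultExchange(pprocess.Exchange):
    def store_data(self, channel):
        self.results.append(channel.receive())

def fn(channel, value):
    channel.send(value * value)     # a channel-aware callable, as before

exchange = ResultExchange(limit=4)
exchange.results = []

for value in range(10):
    exchange.start(fn, value)       # the limit of 4 causes waiting when too
                                    # many channels are active

exchange.finish()                   # process the remaining incoming data
# exchange.results now holds the results in order of arrival.
</pre>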

<p>Clearly, this approach is less flexible than the raw message exchange API
described above, but it is more convenient and permits much simpler and clearer
code.</p>

<h2 id="Queues">Exchanges as Queues</h2>

<p>Instead of subclassing the <code>pprocess.Exchange</code> class and defining
the <code>store_data</code> method, it might be more desirable to let the
exchange manage the communications between created and creating processes, and
to let the creating process simply consume received data as it arrives, without
particular regard for its order - perhaps the creating process has its own way
of managing such issues.</p>

<p>For such situations, the <code>Queue</code> class may be instantiated and
channels added to the queue using the various methods provided:</p>

<pre>
queue = pprocess.Queue(limit=10)
channel = queue.create()
if channel:
    # This code is run by the created process: perform some computation,
    # sending results over the channel, and then exit explicitly.
    ...
    pprocess.exit(channel)
</pre>

<p>The results can then be consumed by treating the queue like an iterator:</p>

<pre>
for result in queue:
    # Capture each result.
    ...
</pre>

<p>This approach need not involve the direct handling of channels, of course:
one could instead use the <code>start</code> method on the queue to create
processes and to initiate computations (since a queue is merely an enhanced
exchange with a specific implementation of the <code>store_data</code>
method).</p>
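
<p>For example, using the <code>start</code> method with a channel-aware
function, and assuming that the queue yields whatever each created process sends
over its channel (<code>inputs</code> is a hypothetical sequence):</p>

<pre>
def fn(channel, value):
    channel.send(value * value)     # the sent object is what the queue will yield

queue = pprocess.Queue(limit=4)
for value in inputs:
    queue.start(fn, value)

for result in queue:                # results arrive as processes finish,
    ...                             # not necessarily in input order
</pre>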

<h2 id="Maps">Exchanges as Maps</h2>

<p>Where the above <code>Queue</code> class appears to be an attractive solution
for managing the results of computations, but where the order in which the
creating process consumes them remains important, the <code>Map</code>
class may offer a suitable way of collecting and accessing results:</p>

<pre>
results = pprocess.Map(limit=10)
for value in inputs:
    results.start(fn, value)
</pre>

<p>The results can then be consumed in an order corresponding to the order of the
computations which produced them:</p>

<pre>
for result in results:
    # Process each result.
    ...
</pre>

<p>Internally, the <code>Map</code> object records a particular ordering of
channels, ensuring that the received results can be mapped to this ordering,
and that the results can be made available with this ordering preserved.</p>
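
<p>A complete sketch of this style, with <code>fn</code> again being a
channel-aware callable and <code>inputs</code> a hypothetical sequence, might
look like this:</p>

<pre>
def fn(channel, value):
    channel.send(value + 1)         # channel-aware, as in the sections above

results = pprocess.Map(limit=4)
for value in inputs:
    results.start(fn, value)

for result in results:              # yielded in the order the computations
    ...                             # were started, unlike a plain Queue
</pre>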

<h2 id="Managed">Managed Callables</h2>

<p>A further simplification of the above convenient use of message exchanges
involves the creation of callables (e.g. functions) which are automatically
monitored by an exchange. We create such a callable by calling the
<code>manage</code> method on an exchange:</p>

<pre>
myfn = exchange.manage(fn)
</pre>

<p>This callable can then be invoked instead of using the exchange's
<code>start</code> method:</p>

<pre>
myfn(arg1, arg2, named1=value1, named2=value2)
</pre>

<p>The exchange's <code>finish</code> method can be used as usual to process
incoming data.</p>
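
<p>For example, using the <code>MyExchange</code> subclass defined earlier and a
channel-aware function <code>fn</code>, a managed callable might be used as
follows (<code>inputs</code> is a hypothetical sequence):</p>

<pre>
exchange = MyExchange(limit=4)
myfn = exchange.manage(fn)          # fn is channel-aware here, as with start

for value in inputs:
    myfn(value)                     # starts a process for each invocation

exchange.finish()                   # process any remaining incoming data
</pre>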

<h2 id="MakeParallel">Making Existing Functions Parallel</h2>

<p>When making a program parallel, existing functions which merely return results
can be manually modified to accept and use channels to communicate those results
back to the main process. A simpler alternative, however, is to use the
<code>MakeParallel</code> class to wrap unmodified functions, sending their
return values over the channels provided. For example:</p>

<pre>
fn = pprocess.MakeParallel(originalfn)
</pre>
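
<p>One way of combining such a wrapper with the facilities described above is
sketched here, using a <code>Map</code> and a managed callable (the function and
names are illustrative, and this is only one possible combination):</p>

<pre>
def originalfn(value):
    return value * value            # an unmodified, channel-unaware function

results = pprocess.Map(limit=4)
calc = results.manage(pprocess.MakeParallel(originalfn))

for value in range(10):
    calc(value)                     # runs originalfn in a created process

for result in results:              # the return values, in submission order
    ...
</pre>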

<h2 id="Map">Map-style Processing</h2>

<p>In situations where a callable would normally be used in conjunction with the
Python built-in <code>map</code> function, an alternative solution can be adopted by using
the <code>pmap</code> function:</p>

<pre>
pprocess.pmap(fn, sequence)
</pre>

<p>Here, each element of the sequence must supply the required parameters of the
specified callable, <code>fn</code>. Note that the callable does not need to be
a parallel-aware function with a <code>channel</code> argument: the
<code>pmap</code> function automatically wraps the given callable
internally.</p>
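
<p>For example, assuming that <code>pmap</code> makes the results available in
an order corresponding to the input sequence, much like the built-in
<code>map</code>:</p>

<pre>
def square(x):
    return x * x                    # an ordinary function; no channel argument

for result in pprocess.pmap(square, range(10)):
    ...                             # each result corresponds to the input at
                                    # the same position
</pre>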

<h2 id="Reusing">Reusing Processes and Channels</h2>

<p>So far, all parallel computations have been performed with newly-created
processes. This can be somewhat inefficient, however, especially if processes
are continually being created and destroyed (although if this happens too often,
the amount of work done by each process may be too little anyway). One solution
is to retain processes after they have done their work and to have them perform
more work for each new parallel task or invocation. To enable the reuse of
processes in this way, a special keyword argument may be specified when creating
<code>Exchange</code> instances (and instances of subclasses such as
<code>Map</code> and <code>Queue</code>). For example:</p>

<pre>
exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes
</pre>

<p>Code invoked through such exchanges must be aware of channels and be
constructed so that it does not terminate after sending a result back to the
creating process. Instead, it should repeatedly wait for subsequent sets of
parameters (compatible with those either in the signature of a callable or with
the original values read from the channel), terminating only when the creating
process sends the special value <code>None</code> to indicate that no more
parameters will be sent.</p>
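
<p>The shape such code might take is sketched below. The exact form in which
subsequent parameters arrive is not specified here; this sketch assumes a single
object per invocation and uses a hypothetical <code>compute</code> function for
the real work:</p>

<pre>
def fn(channel, data):
    while 1:
        channel.send(compute(data))  # send a result for the current parameters
        data = channel.receive()     # wait for the next set of parameters
        if data is None:             # None indicates that no more work will arrive
            break
</pre>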

<h2 id="MakeReusable">Making Existing Functions Parallel and Reusable</h2>

<p>An easier way of making reusable code sections for parallel use is to employ the
<code>MakeReusable</code> class to wrap an existing callable:</p>

<pre>
fn = pprocess.MakeReusable(originalfn)
</pre>

<p>This wraps the callable in a similar fashion to <code>MakeParallel</code>, but
provides the necessary mechanisms described above for reusable code.</p>
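
<p>Such a wrapper would typically be combined with an exchange created with the
<code>reuse</code> argument described above; the following pairing is an
illustrative sketch, not the only possibility (<code>inputs</code> is a
hypothetical sequence):</p>

<pre>
exchange = MyExchange(limit=10, reuse=1)           # reuse up to 10 processes
calc = exchange.manage(pprocess.MakeReusable(originalfn))

for value in inputs:
    calc(value)                     # reuses an existing process where possible

exchange.finish()                   # process the remaining incoming data, as before
</pre>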

<h2 id="Implementation">Implementation Notes</h2>

<h3>Signals and Waiting</h3>

<p>When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:</p>

<ul>
<li><a href="http://mail.python.org/pipermail/python-dev/2002-September/028572.html">http://mail.python.org/pipermail/python-dev/2002-September/028572.html</a></li>
<li><a href="http://twistedmatrix.com/bugs/issue733">http://twistedmatrix.com/bugs/issue733</a></li>
</ul>

<h3>Select and Poll</h3>

<p>The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):</p>

<ul>
<li><a href="http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html">http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html</a></li>
</ul>

<p>It would seem, from using sockets and from studying the <code>asyncore</code>
module, that sockets are more predictable than pipes.</p>

<p>Notes about <code>poll</code> implementations can be found here:</p>

<ul>
<li><a href="http://www.greenend.org.uk/rjk/2001/06/poll.html">http://www.greenend.org.uk/rjk/2001/06/poll.html</a></li>
</ul>

</body>
</html>