<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-gb">
<head>
<meta http-equiv="content-type" content="text/html; charset=UTF-8" />
<title>pprocess - Reference</title>
<link href="styles.css" rel="stylesheet" type="text/css" />
</head>
<body>
<h1>pprocess - Reference</h1>
<p>The <code>pprocess</code> module defines a simple parallel processing API
for Python, inspired somewhat by the <code>thread</code> module, slightly less
by <a href="http://datamining.anu.edu.au/~ole/pypar/">pypar</a>, and slightly
less still by <a href="http://pypvm.sourceforge.net/">pypvm</a>.</p>
<p>This document complements the <a href="tutorial.html">tutorial</a> by
providing an overview of the different styles of parallel programming supported
by the module. For an introduction and in order to get a clearer idea of the
most suitable styles for your own programs, consult the
<a href="tutorial.html">tutorial</a>.</p>
<ul>
<li><a href="#Thread">Thread-style Processing</a></li>
<li><a href="#Fork">Fork-style Processing</a></li>
<li><a href="#Exchanges">Message Exchanges</a></li>
<li><a href="#Convenient">Convenient Message Exchanges</a></li>
<li><a href="#Queues">Exchanges as Queues</a></li>
<li><a href="#Maps">Exchanges as Maps</a></li>
<li><a href="#Managed">Managed Callables</a></li>
<li><a href="#MakeParallel">Making Existing Functions Parallel</a></li>
<li><a href="#Map">Map-style Processing</a></li>
<li><a href="#Reusing">Reusing Processes and Channels</a></li>
<li><a href="#MakeReusable">Making Existing Functions Parallel and Reusable</a></li>
<li><a href="#Implementation">Implementation Notes</a></li>
</ul>
<h2 id="Thread">Thread-style Processing</h2>
<p>To create new processes to run a function or any callable object, specify the
"callable" and any arguments as follows:</p>
<pre>
channel = pprocess.start(fn, arg1, arg2, named1=value1, named2=value2)
</pre>
<p>This returns a channel which can then be used to communicate with the created
process. Meanwhile, in the created process, the given callable will be invoked
with another channel as its first argument followed by the specified arguments:</p>
<pre>
def fn(channel, arg1, arg2, named1, named2):
    # Read from and write to the channel.
    # Return value is ignored.
    ...
</pre>
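<p>As a minimal sketch of this pattern (the <code>add</code> function and its
arguments are purely illustrative), the created process sends its result back
through the channel, and the creating process reads it:</p>
<pre>
def add(channel, x, y):
    # Send the computed sum back to the creating process.
    channel.send(x + y)

channel = pprocess.start(add, 17, 25)
result = channel.receive() # result is 42
</pre>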
<h2 id="Fork">Fork-style Processing</h2>
<p>To create new processes in a similar way to that employed when using <code>os.fork</code>
(ie. the <code>fork</code> system call on various operating systems), use the following
method:</p>
<pre>
channel = pprocess.create()
if channel.pid == 0:
    # This code is run by the created process.
    # Read from and write to the channel to communicate with the
    # creating/calling process.
    # An explicit exit of the process may be desirable to prevent the process
    # from running code which is intended for the creating/calling process.
    ...
    pprocess.exit(channel)
else:
    # This code is run by the creating/calling process.
    # Read from and write to the channel to communicate with the created
    # process.
    ...
</pre>
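<p>A minimal concrete instance of the above skeleton (the value sent is just an
illustration) might look like this:</p>
<pre>
channel = pprocess.create()
if channel.pid == 0:
    # Created process: perform some work and send the result back.
    channel.send(42)
    pprocess.exit(channel)
else:
    # Creating process: wait for and read the result.
    result = channel.receive()
</pre>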
<h2 id="Exchanges">Message Exchanges</h2>
<p>When creating many processes, each providing results for the consumption of the
main process, collecting those results efficiently can be problematic: if some
processes take longer than others, and we read from processes that are not yet
ready instead of from those that are, the whole activity will take much longer
than necessary.</p>
<p>One solution to the problem of knowing when to read from channels is to create
an <code>Exchange</code> object, optionally initialising it with a list of channels
through which data is expected to arrive:</p>
<pre>
exchange = pprocess.Exchange()         # populate the exchange later
exchange = pprocess.Exchange(channels) # populate the exchange with channels
</pre>
<p>We can add channels to the exchange using the <code>add</code> method:</p>
<pre>
exchange.add(channel)
</pre>
<p>To test whether an exchange is active - that is, whether it is actually
monitoring any channels - we can use the <code>active</code> method which
returns all channels being monitored by the exchange:</p>
<pre>
channels = exchange.active()
</pre>
<p>We may then check the exchange to see whether any data is ready to be received;
for example:</p>
<pre>
for channel in exchange.ready():
    # Read from and write to the channel.
    ...
</pre>
<p>If we do not wish to wait indefinitely for a list of channels, we can set a
timeout value as an argument to the <code>ready</code> method (as a floating
point number specifying the timeout in seconds, where <code>0</code> means a
non-blocking poll as stated in the <code>select</code> module's <code>select</code>
function documentation).</p>
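<p>For example, the following sketch (with the timeout passed as the first
argument to <code>ready</code>) polls the monitored channels without blocking
and processes whichever are ready:</p>
<pre>
for channel in exchange.ready(0): # 0 means a non-blocking poll
    data = channel.receive()
    # Do something with data here.
</pre>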
<h2 id="Convenient">Convenient Message Exchanges</h2>
<p>A convenient form of message exchanges can be adopted by defining a subclass of
the <code>Exchange</code> class and defining a particular method:</p>
<pre>
class MyExchange(pprocess.Exchange):

    def store_data(self, channel):
        data = channel.receive()
        # Do something with data here.
</pre>
<p>The exact operations performed on the received data might be as simple as
storing it on an instance attribute. To make use of the exchange, we would
instantiate it as usual:</p>
<pre>
exchange = MyExchange()         # populate the exchange later
exchange = MyExchange(limit=10) # set a limit for later population
</pre>
<p>The exchange can now be used in a simpler fashion than that shown above. We can
add channels as before using the <code>add</code> method, or we can choose to only
add channels if the specified limit of channels is not exceeded:</p>
<pre>
exchange.add(channel)      # add a channel as normal
exchange.add_wait(channel) # add a channel, waiting if the limit would be
                           # exceeded
</pre>
<p>Or we can request that the exchange create a channel on our behalf:</p>
<pre>
channel = exchange.create()
</pre>
<p>We can even start processes and monitor channels without ever handling the
channel ourselves:</p>
<pre>
exchange.start(fn, arg1, arg2, named1=value1, named2=value2)
</pre>
<p>We can explicitly wait for "free space" for channels by calling the
<code>wait</code> method, although the <code>start</code> and <code>add_wait</code>
methods make this less interesting:</p>
<pre>
exchange.wait()
</pre>
<p>Finally, when finishing the computation, we can choose to merely call the
<code>finish</code> method and have the remaining data processed automatically:</p>
<pre>
exchange.finish()
</pre>
<p>Clearly, this approach is less flexible than the raw message exchange API
described above, but it permits much simpler and clearer code.</p>
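<p>Putting these pieces together, a complete (if illustrative) use of such an
exchange might look like the following sketch, where the results are simply
collected on a list attached to the exchange:</p>
<pre>
class MyExchange(pprocess.Exchange):

    def store_data(self, channel):
        data = channel.receive()
        self.results.append(data)

def fn(channel, value):
    # A parallel-aware function sending its result over the channel.
    channel.send(value * value)

exchange = MyExchange(limit=4)
exchange.results = [] # a list for store_data to populate (purely illustrative)

for value in range(10):
    exchange.start(fn, value)

exchange.finish()
# exchange.results now holds the squares, in order of arrival.
</pre>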
<h2 id="Queues">Exchanges as Queues</h2>
<p>Instead of having to subclass the <code>pprocess.Exchange</code> class and
to define the <code>store_data</code> method, it might be more desirable to let
the exchange manage the communications between created and creating processes
and to let the creating process just consume received data as it arrives,
without particular regard for the order of the received data - perhaps the
creating process has its own way of managing such issues.</p>
<p>For such situations, the <code>Queue</code> class may be instantiated and
channels added to the queue using the various methods provided:</p>
<pre>
queue = pprocess.Queue(limit=10)
channel = queue.create()
if channel:
    # Do some computation.
    pprocess.exit(channel)
</pre>
<p>The results can then be consumed by treating the queue like an iterator:</p>
<pre>
for result in queue:
    # Capture each result.
    ...
</pre>
<p>This approach does not, of course, require the direct handling of channels.
One could instead use the <code>start</code> method on the queue to create
processes and to initiate computations (since a queue is merely an enhanced
exchange with a specific implementation of the <code>store_data</code>
method).</p>
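<p>For instance, the following sketch (with an illustrative squaring function)
uses the queue's <code>start</code> method and consumes the results as they
arrive:</p>
<pre>
def fn(channel, i):
    # Send a single result back; the queue will store it.
    channel.send(i * i)

queue = pprocess.Queue(limit=4)

for i in range(10):
    queue.start(fn, i)

for result in queue:
    # Results arrive in completion order, not necessarily input order.
    print(result)
</pre>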
<h2 id="Maps">Exchanges as Maps</h2>
<p>Where the above <code>Queue</code> class appears like an attractive solution
for the management of the results of computations, but where the order of their
consumption by the creating process remains important, the <code>Map</code>
class may offer a suitable way of collecting and accessing results:</p>
<pre>
results = pprocess.Map(limit=10)
for value in inputs:
    results.start(fn, value)
</pre>
<p>The results can then be consumed in an order corresponding to the order of the
computations which produced them:</p>
<pre>
for result in results:
    # Process each result.
    ...
</pre>
<p>Internally, the <code>Map</code> object records a particular ordering of
channels, ensuring that the received results can be mapped to this ordering,
and that the results can be made available with this ordering preserved.</p>
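<p>As a small sketch of the ordering guarantee (the doubling function is purely
illustrative):</p>
<pre>
def fn(channel, value):
    channel.send(value * 2)

results = pprocess.Map(limit=4)
for value in [10, 20, 30]:
    results.start(fn, value)

for result in results:
    print(result) # 20, 40, 60 - the same order as the inputs
</pre>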
<h2 id="Managed">Managed Callables</h2>
<p>A further simplification of the above convenient use of message exchanges
involves the creation of callables (eg. functions) which are automatically
monitored by an exchange. We create such a callable by calling the
<code>manage</code> method on an exchange:</p>
<pre>
myfn = exchange.manage(fn)
</pre>
<p>This callable can then be invoked instead of using the exchange's
<code>start</code> method:</p>
<pre>
myfn(arg1, arg2, named1=value1, named2=value2)
</pre>
<p>The exchange's <code>finish</code> method can be used as usual to process
incoming data.</p>
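<p>For example, using a <code>Queue</code> as the exchange (with a queue, the
results are consumed by iteration rather than via <code>finish</code>; the
function shown is illustrative and must still be parallel-aware), the managed
callable starts a new process on each invocation:</p>
<pre>
def fn(channel, value):
    channel.send(value + 1)

queue = pprocess.Queue(limit=4)
myfn = queue.manage(fn)

for value in range(5):
    myfn(value) # starts a process, respecting the limit

for result in queue:
    # Consume the results as they arrive.
    ...
</pre>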
<h2 id="MakeParallel">Making Existing Functions Parallel</h2>
<p>In making a program parallel, existing functions which only return results can
be manually modified to accept and use channels to communicate results back to
the main process. However, a simple alternative is to use the <code>MakeParallel</code>
class to provide a wrapper around unmodified functions which will return the results
from those functions in the channels provided. For example:</p>
<pre>
fn = pprocess.MakeParallel(originalfn)
</pre>
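<p>A typical combination of <code>MakeParallel</code> with a managed exchange
might then look like this sketch (the <code>originalfn</code> shown is
illustrative):</p>
<pre>
def originalfn(x, y):
    # An ordinary function: no channel argument, just a return value.
    return x + y

results = pprocess.Map(limit=4)
add = results.manage(pprocess.MakeParallel(originalfn))

add(1, 2)
add(3, 4)

for result in results:
    print(result) # 3, then 7
</pre>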
<h2 id="Map">Map-style Processing</h2>
<p>In situations where a callable would normally be used in conjunction with the
Python built-in <code>map</code> function, an alternative solution can be adopted by using
the <code>pmap</code> function:</p>
<pre>
pprocess.pmap(fn, sequence)
</pre>
<p>Here, each element of the sequence must supply the parameters required by
the specified callable, <code>fn</code>. Note that the callable does not need
to be a parallel-aware function with a <code>channel</code> argument: the
<code>pmap</code> function automatically wraps the given callable
internally.</p>
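<p>For example, assuming a simple one-argument callable (each sequence element
supplying everything that callable needs):</p>
<pre>
def double(value):
    return value * 2

# Serial version using the built-in map...
results = map(double, range(10))

# ...and the parallel equivalent.
results = pprocess.pmap(double, range(10))
</pre>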
<h2 id="Reusing">Reusing Processes and Channels</h2>
<p>So far, all parallel computations have been done with newly-created
processes. However, this can seem somewhat inefficient, especially if processes
are being continually created and destroyed (although if this happens too
often, the amount of work done by each process may be too little, anyway). One
solution is to retain processes after they have done their work and request
that they perform more work for each new parallel task or invocation. To enable
the reuse of processes in this way, a special keyword argument may be specified
when creating <code>Exchange</code> instances (and instances of subclasses such
as <code>Map</code> and <code>Queue</code>). For example:</p>
<pre>
exchange = MyExchange(limit=10, reuse=1) # reuse up to 10 processes
</pre>
<p>Code invoked through such exchanges must be aware of channels and be
constructed in such a way that it does not terminate after sending a result
back to the creating process. Instead, it should repeatedly wait for subsequent
sets of parameters (compatible with those either in the signature of a callable
or with the original values read from the channel). Such code terminates when
the special value <code>None</code> is sent from the creating process to the
created process, indicating that no more parameters will be sent.</p>
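<p>As a rough illustration of the shape such code takes (the exact form in
which subsequent parameters arrive is assumed here to be a pair of positional
and keyword arguments; in practice the <code>MakeReusable</code> wrapper
described below handles this for you):</p>
<pre>
def fn(channel, value):
    # Handle the initial parameters...
    channel.send(value * value)

    # ...then keep waiting for further parameters until None arrives.
    parameters = channel.receive()
    while parameters is not None:
        args, kw = parameters # assumed format: (positional, keyword) arguments
        channel.send(args[0] * args[0])
        parameters = channel.receive()
</pre>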
<h2 id="MakeReusable">Making Existing Functions Parallel and Reusable</h2>
<p>An easier way of making reusable code sections for parallel use is to employ the
<code>MakeReusable</code> class to wrap an existing callable:</p>
<pre>
fn = pprocess.MakeReusable(originalfn)
</pre>
<p>This wraps the callable in a similar fashion to <code>MakeParallel</code>, but
provides the necessary mechanisms described above for reusable code.</p>
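<p>A sketch of the combined usage (the function and limit shown are
illustrative) would then be:</p>
<pre>
def originalfn(value):
    return value * value

results = pprocess.Queue(limit=4, reuse=1) # the exchange must permit reuse
fn = results.manage(pprocess.MakeReusable(originalfn))

for value in range(20):
    fn(value)

for result in results:
    ...
</pre>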
<h2 id="Implementation">Implementation Notes</h2>
<h3>Signals and Waiting</h3>
<p>When created/child processes terminate, one would typically want to be informed
of such conditions using a signal handler. Unfortunately, Python seems to have
issues with restartable reads from file descriptors when interrupted by signals:</p>
<ul>
<li><a href="http://mail.python.org/pipermail/python-dev/2002-September/028572.html">http://mail.python.org/pipermail/python-dev/2002-September/028572.html</a></li>
<li><a href="http://twistedmatrix.com/bugs/issue733">http://twistedmatrix.com/bugs/issue733</a></li>
</ul>
<h3>Select and Poll</h3>
<p>The exact combination of conditions indicating closed pipes remains relatively
obscure. Here is a message/thread describing them (in the context of another
topic):</p>
<ul>
<li><a href="http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html">http://twistedmatrix.com/pipermail/twisted-python/2005-February/009666.html</a></li>
</ul>
<p>It would seem, from using sockets and from studying the <code>asyncore</code>
module, that sockets are more predictable than pipes.</p>
<p>Notes about <code>poll</code> implementations can be found here:</p>
<ul>
<li><a href="http://www.greenend.org.uk/rjk/2001/06/poll.html">http://www.greenend.org.uk/rjk/2001/06/poll.html</a></li>
</ul>
</body>
</html>