
Process Pools

The processing.pool module has one public class:

class Pool(processes=None, initializer=None, initargs=())

A class representing a pool of worker processes.

Tasks can be offloaded to the pool and the results dealt with when they become available.

Note that tasks can only be submitted, and their results retrieved, by the process which created the pool object.

processes is the number of worker processes to use. If processes is None then the number returned by cpuCount() is used. If initializer is not None then each worker process will call initializer(*initargs) when it starts.
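The initializer hook can be illustrated with a small sketch. It assumes Python's standard `multiprocessing` module (the successor to this package, which spells methods in PEP 8 style, e.g. `apply_async`) rather than `processing` itself. It also uses `multiprocessing.pool.ThreadPool`, which shares the Pool interface but runs workers as threads, so the example needs no pickling. The names `record_start` and `started` are hypothetical.

```python
import threading
from multiprocessing.pool import ThreadPool  # assumption: stands in for processing.Pool

started = []  # hypothetical log of worker start-ups

def record_start(tag):
    # Called once by each worker as it starts, as initializer(*initargs).
    started.append((tag, threading.current_thread().name))

pool = ThreadPool(processes=3, initializer=record_start, initargs=("worker",))
pool.close()  # no tasks are needed: the initializer runs when each worker starts
pool.join()   # wait for all workers to exit

print(len(started))  # 3: one call per worker
```

With real processes, the initializer is the usual place to set up per-process state (for example, a connection or a large read-only table) once per worker rather than once per task.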

Pool objects

Pool has the following public methods:

__init__(processes=None, initializer=None, initargs=())
The constructor creates and starts the requested number of worker processes. If processes is None then cpuCount() is used to find a default, or 1 is used if cpuCount() raises NotImplementedError.
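The default-size rule can be sketched as below. `default_pool_size` is a hypothetical helper, not the library's code, and it substitutes the standard library's `multiprocessing.cpu_count()` (which likewise raises `NotImplementedError` when the count cannot be determined) for `cpuCount()`.

```python
import multiprocessing

def default_pool_size(processes=None):
    """Hypothetical helper mirroring the constructor's default-size rule."""
    if processes is None:
        try:
            processes = multiprocessing.cpu_count()
        except NotImplementedError:
            processes = 1  # fall back to a single worker
    return processes

print(default_pool_size(4))  # 4: an explicit value is used as-is
print(default_pool_size())   # the number of CPUs, or 1 if it is unknown
```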
apply(func, args=(), kwds={})
Equivalent of the apply() builtin function: calls func(*args, **kwds) in one of the worker processes. It blocks until the result is ready.
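A minimal sketch of the blocking call follows. It assumes the standard library's `multiprocessing.pool.ThreadPool` (the successor module's thread-backed pool with the same interface) in place of `processing.Pool`, so that it runs without pickling concerns.

```python
from multiprocessing.pool import ThreadPool  # assumption: stands in for processing.Pool

with ThreadPool(processes=2) as pool:
    # Blocks until a worker has computed pow(2, 10).
    power = pool.apply(pow, (2, 10))
    # Keyword arguments travel in kwds: this computes int("ff", base=16).
    parsed = pool.apply(int, ("ff",), {"base": 16})

print(power, parsed)  # 1024 255
```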
applyAsync(func, args=(), kwds={}, callback=None)

A variant of the apply() method which returns a result object immediately instead of blocking (see Asynchronous result objects).

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready, callback is applied to it (unless the call failed). callback should return quickly, since otherwise it blocks the thread which handles the results.
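The callback rule can be seen in the sketch below. It assumes the standard library's `multiprocessing.pool.ThreadPool` (same interface as the successor `multiprocessing.Pool`, with `apply_async` as the PEP 8 spelling of `applyAsync`) instead of `processing.Pool`. `received.append` is used as the callback because it returns at once, as the text recommends.

```python
from multiprocessing.pool import ThreadPool  # assumption: stands in for processing.Pool

received = []  # filled in by the callback on the pool's result-handling thread

with ThreadPool(processes=2) as pool:
    good = pool.apply_async(pow, (3, 2), callback=received.append)
    bad = pool.apply_async(int, ("not a number",), callback=received.append)
    good.wait()
    bad.wait()
    good_value = good.get()       # 9
    bad_ok = bad.successful()     # False: int() raised ValueError

print(received)  # [9]: the callback was skipped for the failed call
```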

map(func, iterable, chunksize=None)

A parallel equivalent of the map() builtin function. It blocks until the result is ready.

This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
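The chunking step can be pictured with the pure-Python model below. It is a hypothetical sketch (`make_tasks` and `run_task` are illustrative names), not the module's implementation: it runs sequentially and only shows how an iterable becomes numbered chunk tasks and how their results are stitched back together in input order.

```python
import itertools

def make_tasks(iterable, chunksize):
    """Hypothetical helper: yield (index, chunk) pairs of at most chunksize items."""
    it = iter(iterable)
    for index in itertools.count():
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield index, chunk

def run_task(func, chunk):
    # What a worker would do with one task: apply func to every item in the chunk.
    return [func(x) for x in chunk]

def square(x):
    return x * x

tasks = list(make_tasks(range(10), chunksize=4))
print(tasks)  # [(0, (0, 1, 2, 3)), (1, (4, 5, 6, 7)), (2, (8, 9))]

# Chunks may finish in any order (simulated here by reversing);
# sorting by task index restores the order map() must return.
partials = {index: run_task(square, chunk) for index, chunk in reversed(tasks)}
result = [y for index in sorted(partials) for y in partials[index]]
print(result)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Larger chunks mean fewer tasks and less per-task overhead, at the cost of coarser load balancing between workers.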

mapAsync(func, iterable, chunksize=None, callback=None)

A variant of the map() method which returns a result object immediately instead of blocking (see Asynchronous result objects).

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready, callback is applied to it (unless the call failed). callback should return quickly, since otherwise it blocks the thread which handles the results.
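For the asynchronous map, the callback receives the whole result list once, not one value per item. The sketch assumes the standard library's `multiprocessing.pool.ThreadPool` and its `map_async` (the successor's spelling of `mapAsync`) in place of `processing.Pool`.

```python
from multiprocessing.pool import ThreadPool  # assumption: stands in for processing.Pool

def square(x):
    return x * x

batches = []  # each callback invocation receives the complete result list

with ThreadPool(processes=4) as pool:
    async_result = pool.map_async(square, range(10), chunksize=3, callback=batches.append)
    squares = async_result.get(timeout=10)  # waits for every chunk

print(squares)       # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(len(batches))  # 1: the callback ran once, with the whole list
```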

imap(func, iterable, chunksize=1)

A parallel equivalent of itertools.imap(): it returns an iterator which yields the results in order.

The chunksize argument is the same as the one used by the map() method. For very long iterables, using a large value for chunksize can make the job complete much faster than using the default value of 1.

Also if chunksize is 1 then the next() method of the iterator returned by the imap() method has an optional timeout parameter: next(timeout) will raise processing.TimeoutError if the result cannot be returned within timeout seconds.
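The sketch below exercises both points: in-order iteration with a per-item timeout when chunksize is 1, and a large chunksize for a long input. It assumes the standard library's `multiprocessing.pool.ThreadPool` in place of `processing.Pool`; there, `multiprocessing.TimeoutError` plays the role of `processing.TimeoutError`, and `next(timeout)` is likewise only available when chunksize is 1.

```python
import time
from multiprocessing import TimeoutError
from multiprocessing.pool import ThreadPool  # assumption: stands in for processing.Pool

def square(x):
    return x * x

with ThreadPool(processes=4) as pool:
    it = pool.imap(square, range(10))  # chunksize defaults to 1
    first = next(it)                   # results come back in input order
    second = it.next(timeout=5)        # per-item timeout
    rest = list(it)

    # Fewer, larger tasks for a long input.
    big = list(pool.imap(square, range(10_000), chunksize=500))

    slow = pool.imap(time.sleep, [0.5])
    try:
        slow.next(timeout=0.05)        # gives up before the sleep finishes
        timed_out = False
    except TimeoutError:
        timed_out = True

print(first, second, rest[:3])  # 0 1 [4, 9, 16]
print(len(big), timed_out)      # 10000 True
```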

imapUnordered(func, iterable, chunksize=1)
The same as imap() except that the ordering of the results from the returned iterator should be considered arbitrary. (Only when there is only one worker process is the order guaranteed to be "correct".)
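The ordering guarantee can be checked as below: with several workers only the set of results is fixed, while a single worker preserves input order. The sketch assumes the standard library's `multiprocessing.pool.ThreadPool` and its `imap_unordered` (the successor's spelling of `imapUnordered`) in place of `processing.Pool`.

```python
from multiprocessing.pool import ThreadPool  # assumption: stands in for processing.Pool

def square(x):
    return x * x

expected = [x * x for x in range(20)]

with ThreadPool(processes=4) as pool:
    unordered = list(pool.imap_unordered(square, range(20)))

with ThreadPool(processes=1) as single:
    in_order = list(single.imap_unordered(square, range(20)))

print(sorted(unordered) == expected)  # True: same values, arbitrary order
print(in_order == expected)           # True: one worker keeps input order
```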
close()
Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.
terminate()
Stops the worker processes immediately without completing outstanding work. When the pool object is garbage collected terminate() will be called immediately.
join()
Wait for the worker processes to exit. One must call close() or terminate() before using join().
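A typical shutdown sequence is sketched below: queued work still finishes after close(), join() follows close() or terminate(), and new submissions are refused once the pool is closed. It assumes the standard library's `multiprocessing.pool.ThreadPool` in place of `processing.Pool`; raising `ValueError` on submission after close() is that module's behaviour and is not stated for `processing`.

```python
from multiprocessing.pool import ThreadPool  # assumption: stands in for processing.Pool

def square(x):
    return x * x

pool = ThreadPool(processes=2)
pending = pool.map_async(square, range(5))
pool.close()   # no new tasks; already-queued work still runs
pool.join()    # only valid after close() or terminate()
print(pending.get())  # [0, 1, 4, 9, 16]

try:
    pool.apply_async(square, (5,))  # submitting after close() is refused
    rejected = False
except ValueError as exc:
    rejected = True
    print("rejected:", exc)

other = ThreadPool(processes=2)
other.terminate()  # stop at once, discarding any outstanding work
other.join()
```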

Asynchronous result objects

The result objects returned by applyAsync() and mapAsync() have the following public methods:

get(timeout=None)
Returns the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then processing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().
wait(timeout=None)
Waits until the result is available or until timeout seconds pass.
ready()
Returns whether the call has completed.
successful()
Returns whether the call completed without raising an exception. Will raise AssertionError if the result is not ready.
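The four methods can be exercised together as follows. The sketch assumes the standard library's `multiprocessing.pool.ThreadPool` in place of `processing.Pool`, with `multiprocessing.TimeoutError` standing in for `processing.TimeoutError`; `divide` is a hypothetical helper. One known difference: since Python 3.7, `multiprocessing` raises `ValueError` rather than `AssertionError` when successful() is called on an unfinished result, so the sketch only calls it after wait().

```python
import time
from multiprocessing import TimeoutError
from multiprocessing.pool import ThreadPool  # assumption: stands in for processing.Pool

def divide(a, b):  # hypothetical task that can fail
    return a / b

with ThreadPool(processes=2) as pool:
    slow = pool.apply_async(time.sleep, (0.5,))
    try:
        slow.get(timeout=0.05)  # gives up before the call finishes
        timed_out = False
    except TimeoutError:
        timed_out = True
    slow.wait()                 # wait() with no timeout blocks until done
    slow_ready = slow.ready()

    bad = pool.apply_async(divide, (1, 0))
    bad.wait()
    bad_ok = bad.successful()   # safe: the result is ready
    try:
        bad.get()               # the worker's ZeroDivisionError is re-raised here
        reraised = False
    except ZeroDivisionError:
        reraised = True

print(timed_out, slow_ready, bad_ok, reraised)  # True True False True
```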

Examples

The following example demonstrates the use of a pool:

from processing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    result = pool.applyAsync(f, (10,))    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow

    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

    it = pool.imap(f, range(10))
    print it.next()                       # prints "0"
    print it.next()                       # prints "1"
    print it.next(timeout=1)              # prints "4" unless your computer is *very* slow

    import time
    result = pool.applyAsync(time.sleep, (10,))
    print result.get(timeout=1)           # raises `TimeoutError`

See also ex_pool.py.