File: ex_workers.py

package info (click to toggle)
multiprocess 0.70.17-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 9,900 kB
  • sloc: python: 70,095; ansic: 7,509; makefile: 16
file content (87 lines) | stat: -rw-r--r-- 2,196 bytes parent folder | download | duplicates (30)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same order as the corresponding tasks were
# put on the input queue.  If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#

import time
import random

from multiprocess import current_process as currentProcess, Process, freeze_support as freezeSupport
from multiprocess import Queue

#
# Function run by worker processes
#

def worker(input, output):
    """Process tasks from *input* until the ``'STOP'`` sentinel is received.

    Every task is a ``(func, args)`` pair.  It is passed to ``calculate``
    and the string that comes back is put on *output*.
    """
    while True:
        task = input.get()
        if task == 'STOP':
            # Sentinel seen: this worker is done.
            break
        func, args = task
        output.put(calculate(func, args))

#
# Function used to calculate result
#

def calculate(func, args):
    """Call ``func(*args)`` and return a one-line description of the result.

    The returned string has the form
    ``'<process name> says that <func name><args tuple> = <result>'``,
    where the process name is that of the process executing this call.
    """
    result = func(*args)
    # Read the process name through the public ``name`` attribute instead
    # of reaching into the private ``_name`` field.
    return '%s says that %s%s = %s' % (
        currentProcess().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    """Return ``a * b`` after sleeping for a random time below half a second."""
    delay = random.random() * 0.5
    time.sleep(delay)
    product = a * b
    return product

def plus(a, b):
    """Return ``a + b`` after sleeping for a random time below half a second."""
    delay = random.random() * 0.5
    time.sleep(delay)
    total = a + b
    return total

#
# Driver: runs the example tasks through a pool of worker processes
#

def test():
    """Run two batches of tasks through a pool of worker processes.

    The first batch (``mul`` tasks) is queued before the workers start; the
    second batch (``plus`` tasks) is queued while they are running.  Each
    result string is printed as it arrives on the done queue.  Finally one
    ``'STOP'`` sentinel per worker is queued and the workers are joined.
    """
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit the first batch of tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes, keeping the handles so they can be joined
    workers = [
        Process(target=worker, args=(task_queue, done_queue))
        for _ in range(NUMBER_OF_PROCESSES)
    ]
    for proc in workers:
        proc.start()

    # Get and print results: exactly one result per submitted task
    print('Unordered results:')
    for _ in TASKS1:
        print('\t', done_queue.get())

    # Add more tasks while the workers are already running
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for _ in TASKS2:
        print('\t', done_queue.get())

    # Tell child processes to stop: each worker consumes one sentinel
    for _ in workers:
        task_queue.put('STOP')

    # Every result has been read from done_queue above, so waiting here
    # cannot block on unflushed queue data in the children.
    for proc in workers:
        proc.join()


# Script entry point: run the example only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    # freezeSupport is multiprocess's freeze_support (aliased in the import
    # above).  NOTE(review): presumably only relevant for frozen Windows
    # executables, as with the stdlib multiprocessing function -- confirm.
    freezeSupport()
    test()