File: multiprocessing_queues.py

package info (click to toggle)
enlighten 1.11.2-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 1,480 kB
  • sloc: python: 3,419; makefile: 20
file content (101 lines) | stat: -rw-r--r-- 3,077 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Copyright 2019 - 2020 Avram Lubkin, All Rights Reserved

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Example of using Enlighten with multiprocessing

This example uses queues for inter-process communication (IPC)
"""

from multiprocessing import Process, Queue
import random
import time

import enlighten


WORKERS = 4
SYSTEMS = (10, 20)
FILES = (100, 200)
FILE_TIME = (0.01, 0.05)


def process_files(queue, count):
    """
    Worker run in a child process

    Simulates processing *count* files: pauses for a random interval per file
    (bounds taken from FILE_TIME) and pushes the running file number onto *queue*
    so the parent can track progress.
    """

    for file_num in range(count):
        delay = random.uniform(*FILE_TIME)  # Simulated per-file work
        time.sleep(delay)
        queue.put(file_num + 1)


def multiprocess_systems(manager, systems):
    """
    Process a random number of virtual files in subprocesses for the given number of systems

    Args:
        manager: Enlighten manager used to create the progress bars
        systems: Total number of systems (subprocess tasks) to run

    At most WORKERS subprocesses run concurrently. Each child reports progress
    through its own Queue; the parent polls the queues and mirrors the counts
    into per-system progress bars.
    """

    started = 0
    active = {}  # system number -> (process, queue, counter)
    pb_started = manager.counter(total=systems, desc='Systems:', unit='systems', color='yellow')
    pb_finished = pb_started.add_subcounter('green', all_fields=True)

    # Loop until all systems have been started and every worker has finished
    while systems > started or active:

        # If there are free workers and tasks left to run, start one
        if systems > started and len(active) < WORKERS:
            queue = Queue()
            files = random.randint(*FILES)
            process = Process(target=process_files, args=(queue, files))
            started += 1
            counter = manager.counter(total=files, desc='  System %d:' % started,
                                      unit='files', leave=False)
            process.start()
            pb_started.update()
            active[started] = (process, queue, counter)

        # Iterate over a snapshot of the keys so entries can be deleted mid-loop
        for system in tuple(active.keys()):
            process, queue, counter = active[system]
            # Check liveness BEFORE draining the queue so that items pushed just
            # before the child exited are still consumed on this pass
            alive = process.is_alive()

            # Latest count is the last one on the queue
            count = None
            while not queue.empty():
                count = queue.get()

            # Update counter. We do it manually because we have the number not the increment
            if count is not None:
                counter.count = count
                # If no sleep is used in loop use counter.update(0) instead
                counter.refresh()

            # Remove any finished subprocesses and update progress bar
            # If this was real code you could check for failures
            if not alive:
                process.join()  # Reap the child so it doesn't linger as a zombie
                counter.close()
                print('Processed %d files on System %d' % (counter.total, system))
                del active[system]
                pb_finished.update_from(pb_started)

        # Sleep for 1/10 of a second to reduce load
        time.sleep(0.1)


def main():
    """
    Entry point

    Picks a random number of systems and runs the demo under an
    Enlighten manager, which is cleaned up automatically on exit.
    """

    num_systems = random.randint(*SYSTEMS)
    with enlighten.get_manager() as manager:
        multiprocess_systems(manager, num_systems)


if __name__ == '__main__':
    main()