File: bounded_queue_executor.py

package info (click to toggle)
python-b2sdk 2.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,020 kB
  • sloc: python: 30,902; sh: 13; makefile: 8
file content (78 lines) | stat: -rw-r--r-- 2,316 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
######################################################################
#
# File: b2sdk/_internal/bounded_queue_executor.py
#
# Copyright 2019 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import threading


class BoundedQueueExecutor:
    """
    Wrap a concurrent.futures.Executor and limit the number of requests that
    can be queued at once.  Calls to submit() block until there is room
    in the queue.

    The number of available slots in the queue is tracked with a
    semaphore that is acquired before queueing an action, and
    released exactly once per action: when the action finishes, when its
    future is cancelled before the action starts, or when the wrapped
    executor refuses the action.

    Counts the number of exceptions thrown by tasks, and makes them
    available from get_num_exceptions() after shutting down.
    """

    def __init__(self, executor, queue_limit: int):
        """
        :param executor: an executor to be wrapped
        :type executor: concurrent.futures.Executor
        :param queue_limit: maximum number of submitted actions that may be
                            queued or running at the same time
        :type queue_limit: int
        """
        self.executor = executor
        self.semaphore = threading.Semaphore(queue_limit)
        self.num_exceptions = 0
        # Tasks fail on worker threads, and "+= 1" is a read-modify-write,
        # so concurrent failures need a lock to avoid losing counts.
        self._num_exceptions_lock = threading.Lock()

    def submit(self, fcn, *args, **kwargs):
        """
        Schedule a callable to run with the given positional and keyword
        arguments, blocking first until a queue slot is available.

        :param fcn: a callable object
        :type fcn: callable
        :return: a future object
        :rtype: concurrent.futures.Future
        :raises Exception: whatever the wrapped executor's submit() raises;
                           the queue slot is given back before re-raising
        """
        # Wait until there is room in the queue.
        self.semaphore.acquire()

        release_lock = threading.Lock()
        released = False

        def release_slot():
            # Several paths may try to give the slot back (task finished,
            # future cancelled, submit failed); only the first one may
            # actually release, otherwise the queue limit would grow.
            nonlocal released
            with release_lock:
                if released:
                    return
                released = True
            self.semaphore.release()

        # Wrap the action in a function that will release
        # the semaphore after it runs.
        def run_it():
            try:
                return fcn(*args, **kwargs)
            except Exception:
                with self._num_exceptions_lock:
                    self.num_exceptions += 1
                raise
            finally:
                release_slot()

        # Submit the wrapped action.
        try:
            future = self.executor.submit(run_it)
        except BaseException:
            # The action was not accepted (e.g. the executor is already
            # shut down), so run_it() will not give the slot back for us.
            release_slot()
            raise

        # A future cancelled while still pending never executes run_it();
        # without this callback its slot would be lost forever.  For a
        # future that did run, the slot is already released and this is
        # a no-op.
        future.add_done_callback(lambda _done: release_slot())
        return future

    def shutdown(self) -> None:
        """
        Shut the wrapped executor down.
        """
        self.executor.shutdown()

    def get_num_exceptions(self) -> int:
        """
        Return the number of submitted actions that raised an exception.

        :rtype: int
        """
        return self.num_exceptions