File: _fair_semaphore.py

package info (click to toggle)
python-keystoneauth1 4.2.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,996 kB
  • sloc: python: 17,552; xml: 285; makefile: 91
file content (104 lines) | stat: -rw-r--r-- 3,855 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import threading
import time


from six.moves import queue


class FairSemaphore(object):
    """Semaphore class that notifies in order of request.

    We cannot use a normal Semaphore because it doesn't give any ordering,
    which could lead to a request starving. Instead, handle them in the
    order we receive them.

    :param int concurrency:
        How many concurrent threads can have the semaphore at once.
    :param float rate_delay:
        How long to wait between the start of each thread receiving the
        semaphore.
    """

    def __init__(self, concurrency, rate_delay):
        self._lock = threading.Lock()
        self._concurrency = concurrency
        if concurrency:
            self._count = 0
            self._queue = queue.Queue()

        self._rate_delay = rate_delay
        self._rate_last_ts = time.time()

    def __enter__(self):
        """Aquire a semaphore."""
        # If concurrency is None, everyone is free to immediately execute.
        if not self._concurrency:
            # NOTE: Rate limiting still applies.This will ultimately impact
            # concurrency a bit due to the mutex.
            with self._lock:
                execution_time = self._advance_timer()
        else:
            execution_time = self._get_ticket()
        return self._wait_for_execution(execution_time)

    def _wait_for_execution(self, execution_time):
        """Wait until the pre-calculated time to run."""
        wait_time = execution_time - time.time()
        if wait_time > 0:
            time.sleep(wait_time)

    def _get_ticket(self):
        ticket = threading.Event()
        with self._lock:
            if self._count <= self._concurrency:
                # We can execute, no need to wait. Take a ticket and
                # move on.
                self._count += 1
                return self._advance_timer()
            else:
                # We need to wait for a ticket before we can execute.
                # Put ourselves in the ticket queue to be woken up
                # when available.
                self._queue.put(ticket)
        ticket.wait()
        with self._lock:
            return self._advance_timer()

    def _advance_timer(self):
        """Calculate the time when it's ok to run a command again.

        This runs inside of the mutex, serializing the calculation
        of when it's ok to run again and setting _rate_last_ts to that
        new time so that the next thread to calculate when it's safe to
        run starts from the time that the current thread calculated.
        """
        self._rate_last_ts = self._rate_last_ts + self._rate_delay
        return self._rate_last_ts

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the semaphore."""
        # If concurrency is None, everyone is free to immediately execute
        if not self._concurrency:
            return
        with self._lock:
            # If waiters, wake up the next item in the queue (note
            # we're under the queue lock so the queue won't change
            # under us).
            if self._queue.qsize() > 0:
                ticket = self._queue.get()
                ticket.set()
            else:
                # Nothing else to do, give our ticket back
                self._count -= 1