File: _semaphore.pyx

package info (click to toggle)
python-gevent 1.0.1-2
  • links: PTS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 9,948 kB
  • ctags: 12,954
  • sloc: python: 39,061; ansic: 26,289; sh: 13,582; makefile: 833; awk: 18
file content (131 lines) | stat: -rw-r--r-- 4,235 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import sys
from gevent.hub import get_hub, getcurrent
from gevent.timeout import Timeout


# Public API of this module: only ``Semaphore`` is exported by ``import *``.
__all__ = ['Semaphore']


class Semaphore(object):
    """A semaphore manages a counter representing the number of release() calls minus the number of acquire() calls,
    plus an initial value. The acquire() method blocks if necessary until it can return without making the counter
    negative.

    If not given, value defaults to 1.

    This Semaphore's __exit__ method does not call the trace function.
    """

    def __init__(self, value=1):
        """Create a semaphore whose counter starts at *value*.

        Raises :exc:`ValueError` if *value* is negative.
        """
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        # Callbacks registered through rawlink(), in registration order.
        self._links = []
        self.counter = value
        # Handle returned by loop.run_callback() for a scheduled
        # _notify_links() call; None until the first one is scheduled.
        self._notifier = None
        # Set by rawlink()/unlink() whenever _links is mutated, so that
        # _notify_links() knows it has to restart its iteration.
        self._dirty = False
        # we don't want to do get_hub() here to allow module-level locks
        # without initializing the hub

    def __str__(self):
        params = (self.__class__.__name__, self.counter, len(self._links))
        return '<%s counter=%s _links[%s]>' % params

    def locked(self):
        """Return True if the counter is zero or less, i.e. a blocking acquire() would have to wait."""
        return self.counter <= 0

    def release(self):
        """Increment the counter and, if callbacks are linked, schedule their notification."""
        self.counter += 1
        self._start_notify()

    def _start_notify(self):
        """Schedule :meth:`_notify_links` on the hub's loop.

        Nothing is scheduled when there are no links, when the counter is not
        positive, or when the current notifier handle is still truthy.
        """
        # NOTE(review): this relies on the handle returned by run_callback()
        # becoming falsy once it has run -- confirm against the loop API.
        if self._links and self.counter > 0 and not self._notifier:
            self._notifier = get_hub().loop.run_callback(self._notify_links)

    def _notify_links(self):
        """Call the linked callbacks, in order, while the counter stays positive.

        Each callback receives this instance as its only argument. An
        exception raised by a callback is passed to the current greenlet's
        ``handle_error`` and does not stop the remaining callbacks.
        """
        while True:
            self._dirty = False
            for link in self._links:
                if self.counter <= 0:
                    return
                try:
                    link(self)
                except:
                    # Deliberately broad: every failure of a callback is
                    # reported through handle_error() instead of escaping.
                    getcurrent().handle_error((link, self), *sys.exc_info())
                if self._dirty:
                    # _links was mutated (rawlink()/unlink()) while the
                    # callback ran; restart instead of continuing to iterate
                    # over a list that has changed.
                    break
            if not self._dirty:
                return

    def rawlink(self, callback):
        """Register a callback to call when a counter is more than zero.

        *callback* will be called in the :class:`Hub <gevent.hub.Hub>`, so it must not use blocking gevent API.
        *callback* will be passed one argument: this instance.
        """
        if not callable(callback):
            raise TypeError('Expected callable: %r' % (callback, ))
        self._links.append(callback)
        self._dirty = True

    def unlink(self, callback):
        """Remove the callback set by :meth:`rawlink`"""
        try:
            self._links.remove(callback)
            self._dirty = True
        except ValueError:
            # The callback is not registered: nothing to remove.
            pass

    def _wait_for_link(self, timeout, method_name):
        """Block the current greenlet until it is notified or *timeout* expires.

        Shared implementation of the blocking part of :meth:`wait` and
        :meth:`acquire`. The current greenlet's ``switch`` is linked for the
        duration of the wait and is always unlinked again; the timer is
        always cancelled.

        *method_name* is only used in the assertion message.

        Return True if the greenlet was switched to with this instance as
        the value, False if the timer started here expired. A
        :class:`Timeout` that was not started here is re-raised.
        """
        switch = getcurrent().switch
        self.rawlink(switch)
        try:
            timer = Timeout.start_new(timeout)
            try:
                result = get_hub().switch()
                assert result is self, 'Invalid switch into Semaphore.%s(): %r' % (method_name, result)
            except Timeout:
                ex = sys.exc_info()[1]
                if ex is timer:
                    return False
                # Some other Timeout: not ours to swallow.
                raise
            finally:
                timer.cancel()
        finally:
            self.unlink(switch)
        return True

    def wait(self, timeout=None):
        """Wait until the counter is positive or *timeout* expires.

        The counter is not modified. Return the value of the counter at the
        time of return; after a timeout this may be zero or less.
        """
        if self.counter <= 0:
            self._wait_for_link(timeout, 'wait')
        return self.counter

    def acquire(self, blocking=True, timeout=None):
        """Decrement the counter, waiting for it to become positive if needed.

        Return True once the counter has been decremented. Return False,
        leaving the counter untouched, if the counter is not positive and
        either *blocking* is false or *timeout* expires before a
        notification arrives.
        """
        if self.counter > 0:
            self.counter -= 1
            return True
        if not blocking:
            return False
        if not self._wait_for_link(timeout, 'acquire'):
            return False
        self.counter -= 1
        assert self.counter >= 0
        return True

    def __enter__(self):
        """Acquire the semaphore (blocking, no timeout) on entering a ``with`` block."""
        self.acquire()

    def __exit__(self, *args):
        """Release the semaphore on leaving a ``with`` block; exceptions are not suppressed."""
        self.release()