File: poll.py

package info (click to toggle)
python-eventlet 0.26.1-7%2Bdeb11u1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 2,916 kB
  • sloc: python: 24,898; makefile: 98
file content (119 lines) | stat: -rw-r--r-- 4,021 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import errno
import math
import sys

from eventlet import patcher, support
from eventlet.hubs import hub
# Bind ``select`` and ``time`` through patcher.original() instead of plain
# ``import`` statements.
# NOTE(review): presumably this yields the un-monkey-patched stdlib modules so
# the hub blocks on the real select/time -- confirm in eventlet.patcher.
select = patcher.original('select')
time = patcher.original('time')


def is_available():
    """Report whether this hub can be used on the current platform.

    :returns: ``True`` when the module-level ``select`` object exposes a
        ``poll`` attribute, ``False`` otherwise.
    """
    has_poll = hasattr(select, 'poll')
    return has_poll


class Hub(hub.BaseHub):
    """Hub implementation built on ``select.poll``.

    Listener bookkeeping (``self.listeners``, ``READ``/``WRITE``,
    ``SYSTEM_EXCEPTIONS``, blocking detection, exception squelching) is
    inherited from ``hub.BaseHub``.  This class keeps a ``select.poll``
    object in sync with those listeners and dispatches the events it reports.
    """

    def __init__(self, clock=None):
        """Create the poll object and the event masks used with it.

        :param clock: passed through unchanged to ``BaseHub.__init__``.
        """
        super(Hub, self).__init__(clock)
        # Error / hang-up conditions; OR-ed into every non-empty
        # registration mask built by register().
        self.EXC_MASK = select.POLLERR | select.POLLHUP
        self.READ_MASK = select.POLLIN | select.POLLPRI
        self.WRITE_MASK = select.POLLOUT
        self.poll = select.poll()

    def add(self, evtype, fileno, cb, tb, mac):
        """Add a listener via the base class and register *fileno* for polling.

        All arguments are forwarded unchanged to ``BaseHub.add``.

        :returns: the listener object returned by ``BaseHub.add``.
        """
        listener = super(Hub, self).add(evtype, fileno, cb, tb, mac)
        self.register(fileno, new=True)
        return listener

    def remove(self, listener):
        """Remove *listener* via the base class and refresh its descriptor.

        ``register`` then recomputes the poll mask for ``listener.fileno``;
        if neither the read nor the write listener table still has an entry
        for it, the descriptor is unregistered from the poll object.
        """
        super(Hub, self).remove(listener)
        self.register(listener.fileno)

    def register(self, fileno, new=False):
        """Synchronise the poll object's mask for *fileno* with the listeners.

        :param fileno: file descriptor whose registration is updated.
        :param new: when true, call ``poll.register`` directly; otherwise try
            ``poll.modify`` first and fall back to ``poll.register``.
        :raises ValueError: re-raised from the poll object after the
            descriptor has been dropped via ``remove_descriptor``.
        """
        mask = 0
        if self.listeners[self.READ].get(fileno):
            mask |= self.READ_MASK | self.EXC_MASK
        if self.listeners[self.WRITE].get(fileno):
            mask |= self.WRITE_MASK | self.EXC_MASK
        try:
            if mask:
                if new:
                    self.poll.register(fileno, mask)
                else:
                    try:
                        self.poll.modify(fileno, mask)
                    except (IOError, OSError):
                        # modify() failed (e.g. fileno not currently
                        # registered); register it from scratch instead.
                        self.poll.register(fileno, mask)
            else:
                # No read or write listener left: stop polling this fileno.
                try:
                    self.poll.unregister(fileno)
                except (KeyError, IOError, OSError):
                    # raised if we try to remove a fileno that was
                    # already removed/invalid
                    pass
        except ValueError:
            # fileno is bad, issue 74
            self.remove_descriptor(fileno)
            raise

    def remove_descriptor(self, fileno):
        """Drop *fileno* from the base hub and from the poll object.

        Errors from ``poll.unregister`` are ignored so that removing an
        already-removed or invalid descriptor is harmless.
        """
        super(Hub, self).remove_descriptor(fileno)
        try:
            self.poll.unregister(fileno)
        except (KeyError, ValueError, IOError, OSError):
            # raised if we try to remove a fileno that was
            # already removed/invalid
            pass

    def do_poll(self, seconds):
        """Poll the registered descriptors for up to *seconds* seconds.

        :param seconds: timeout in seconds, or ``None`` to block until an
            event is reported.
        :returns: the result of ``select.poll.poll`` (``wait`` iterates it as
            ``(fileno, event)`` pairs).
        """
        if seconds is None:
            # wait() advertises seconds=None; block without a timeout rather
            # than failing with TypeError on ``None * 1000.0``.
            return self.poll.poll(None)
        # poll.poll expects integral milliseconds.  Round up instead of
        # truncating so that a positive sub-millisecond timeout does not
        # degrade into a non-blocking poll(0), which would make the caller
        # spin until its timer is due.  For negative values ceil() equals the
        # previous truncation toward zero, so that behaviour is unchanged.
        return self.poll.poll(int(math.ceil(seconds * 1000.0)))

    def wait(self, seconds=None):
        """Wait for I/O events and invoke the matching listener callbacks.

        If there are no read or write listeners at all, this merely sleeps
        for *seconds* (when truthy) and returns.  Otherwise it polls once,
        returning quietly if the poll is interrupted with ``EINTR``, and
        calls ``listener.cb(fileno)`` for every listener selected by the
        reported events.

        :param seconds: maximum time to wait, in seconds; ``None`` means no
            timeout when polling.
        :raises: any exception in ``self.SYSTEM_EXCEPTIONS`` raised by a
            callback; poll errors other than ``EINTR``.  Other callback
            exceptions are passed to ``self.squelch_exception``.
        """
        readers = self.listeners[self.READ]
        writers = self.listeners[self.WRITE]

        if not readers and not writers:
            if seconds:
                time.sleep(seconds)
            return
        try:
            presult = self.do_poll(seconds)
        except (IOError, select.error) as e:
            if support.get_errno(e) == errno.EINTR:
                return
            raise
        SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS

        if self.debug_blocking:
            self.block_detect_pre()

        # Accumulate the listeners to call back to prior to
        # triggering any of them. This is to keep the set
        # of callbacks in sync with the events we've just
        # polled for. It prevents one handler from invalidating
        # another.
        callbacks = set()
        noop = hub.noop  # shave getattr
        for fileno, event in presult:
            if event & self.READ_MASK:
                callbacks.add((readers.get(fileno, noop), fileno))
            if event & self.WRITE_MASK:
                callbacks.add((writers.get(fileno, noop), fileno))
            if event & select.POLLNVAL:
                # The poll object reports the descriptor as invalid: forget
                # it and skip the error/hang-up dispatch below.
                self.remove_descriptor(fileno)
                continue
            if event & self.EXC_MASK:
                # Error or hang-up: notify both the reader and the writer.
                callbacks.add((readers.get(fileno, noop), fileno))
                callbacks.add((writers.get(fileno, noop), fileno))

        for listener, fileno in callbacks:
            try:
                listener.cb(fileno)
            except SYSTEM_EXCEPTIONS:
                raise
            except:
                # Everything that is not a system exception is reported via
                # squelch_exception so the remaining callbacks still run.
                self.squelch_exception(fileno, sys.exc_info())
                support.clear_sys_exc_info()

        if self.debug_blocking:
            self.block_detect_post()