File: async_gevent_uwsgi.py

package info (click to toggle)
python-engineio 3.0.0%2Bdfsg-1
  • links: PTS
  • area: main
  • in suites: buster
  • size: 468 kB
  • sloc: python: 4,688; makefile: 15
file content (155 lines) | stat: -rw-r--r-- 5,469 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import importlib
import sys
import six

import gevent
import uwsgi
# True when the imported uwsgi module exposes ``websocket_handshake``; the
# ``_async`` mapping at the bottom of this module uses this flag to decide
# whether to advertise a WebSocket implementation.
_websocket_available = hasattr(uwsgi, 'websocket_handshake')


class Thread(gevent.Greenlet):  # pragma: no cover
    """
    This wrapper class provides gevent Greenlet interface that is compatible
    with the standard library's Thread class.

    :param target: the callable handed to the underlying Greenlet.
    :param args: optional iterable of positional arguments forwarded to the
                 Greenlet constructor after ``target``. Defaults to none.
    :param kwargs: optional mapping of keyword arguments forwarded to the
                   Greenlet constructor. Defaults to none.
    """
    def __init__(self, target, args=None, kwargs=None):
        # ``None`` sentinels instead of ``[]`` / ``{}`` defaults: default
        # values are created once at definition time and would otherwise be
        # shared by every call.
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}
        super(Thread, self).__init__(target, *args, **kwargs)

    def _run(self):
        """Delegate to ``self.run()`` and return its result."""
        return self.run()


class uWSGIWebSocket(object):  # pragma: no cover
    """
    This wrapper class provides a uWSGI WebSocket interface that is
    compatible with eventlet's implementation.

    The instance is called like a WSGI application. It performs the
    WebSocket handshake and then invokes ``app`` with the wrapper itself,
    which exposes ``send()``, ``wait()`` and ``close()``.

    Two operating modes exist, selected in ``__call__``:

    * if ``uwsgi.request_context`` is available, messages are sent and
      received directly, passing the stored request context to uWSGI;
    * otherwise outgoing messages are queued by ``send()`` and actually
      transmitted from inside ``wait()``, and a helper greenlet wakes
      ``wait()`` up whenever the connection's file descriptor is readable.

    :param app: callable invoked as ``app(websocket)`` after the handshake.
    """
    def __init__(self, app):
        self.app = app
        self._sock = None

    def __call__(self, environ, start_response):
        """Perform the handshake, set up the I/O mode and run ``app``.

        :param environ: the WSGI environment; stored as ``self.environ``.
        :param start_response: the WSGI ``start_response`` callable. It is
                               not used by this method.
        """
        self._sock = uwsgi.connection_fd()
        self.environ = environ

        uwsgi.websocket_handshake()

        self._req_ctx = None
        if hasattr(uwsgi, 'request_context'):
            # uWSGI >= 2.1.x with support for api access across-greenlets
            self._req_ctx = uwsgi.request_context()
        else:
            # use event and queue for sending messages
            from gevent.event import Event
            from gevent.queue import Queue
            from gevent.select import select
            self._event = Event()
            self._send_queue = Queue()

            # spawn a select greenlet
            def select_greenlet_runner(fd, event):
                """Sets event when data becomes available to read on fd."""
                while True:
                    event.set()
                    try:
                        # only the wake-up matters; the result is not needed
                        select([fd], [], [])
                    except ValueError:
                        # stop watching once select() rejects the descriptor
                        break
            self._select_greenlet = gevent.spawn(
                select_greenlet_runner,
                self._sock,
                self._event)

        self.app(self)

    def close(self):
        """Disconnects uWSGI from the client."""
        uwsgi.disconnect()
        if self._req_ctx is None:
            # better kill it here in case wait() is not called again
            self._select_greenlet.kill()
            # wake up a wait() call that may be blocked on the event
            self._event.set()

    def _send(self, msg):
        """Transmits message either in binary or UTF-8 text mode,
        depending on its type.

        :param msg: the payload; byte strings are sent with
                    ``uwsgi.websocket_send_binary``, anything else with
                    ``uwsgi.websocket_send``.
        """
        if isinstance(msg, six.binary_type):
            method = uwsgi.websocket_send_binary
        else:
            method = uwsgi.websocket_send
        if self._req_ctx is not None:
            method(msg, request_context=self._req_ctx)
        else:
            method(msg)

    def _decode_received(self, msg):
        """Returns either bytes or str, depending on message type.

        :param msg: the message as returned by uWSGI.
        :returns: ``msg`` unchanged when it is not a byte string, when it is
                  empty, or when its first byte is below 48; otherwise the
                  byte string decoded as UTF-8.
        """
        if not isinstance(msg, six.binary_type):
            # already decoded - do nothing
            return msg
        if not msg:
            # an empty payload has no first byte to inspect; hand it back
            # as is instead of failing inside six.byte2int()
            return msg
        # only decode from utf-8 if message is not binary data
        # 48 is ASCII '0'; presumably text packets start with a digit type
        # code while binary ones start with a small raw byte - TODO confirm
        first_byte = six.byte2int(msg[0:1])
        if first_byte >= 48:  # no binary
            return msg.decode('utf-8')
        # binary message, don't try to decode
        return msg

    def send(self, msg):
        """Queues a message for sending. Real transmission is done in
        wait method.
        Sends directly if uWSGI version is new enough.

        :param msg: the payload to transmit; see ``_send()`` for how its
                    type selects text or binary mode.
        """
        if self._req_ctx is not None:
            self._send(msg)
        else:
            self._send_queue.put(msg)
            # wake up wait() so that it flushes the queue
            self._event.set()

    def wait(self):
        """Waits and returns received messages.
        If running in compatibility mode for older uWSGI versions,
        it also sends messages that have been queued by send().
        A return value of None means that connection was closed.
        This must be called repeatedly. For uWSGI < 2.1.x it must
        be called from the main greenlet.

        :returns: the received message, passed through
                  ``_decode_received()``, or ``None`` when uWSGI raised
                  ``IOError`` while receiving.
        """
        while True:
            if self._req_ctx is not None:
                try:
                    msg = uwsgi.websocket_recv(request_context=self._req_ctx)
                except IOError:  # connection closed
                    return None
                return self._decode_received(msg)
            else:
                # we wake up at least every 3 seconds to let uWSGI
                # do its ping/ponging
                event_set = self._event.wait(timeout=3)
                if event_set:
                    self._event.clear()
                    # maybe there is something to send
                    # drain the queue first, then transmit, so that the
                    # batch is fixed before any send is attempted
                    msgs = []
                    while True:
                        try:
                            msgs.append(self._send_queue.get(block=False))
                        except gevent.queue.Empty:
                            break
                    for msg in msgs:
                        self._send(msg)
                # maybe there is something to receive, if not, at least
                # ensure uWSGI does its ping/ponging
                try:
                    msg = uwsgi.websocket_recv_nb()
                except IOError:  # connection closed
                    self._select_greenlet.kill()
                    return None
                if msg:  # message available
                    return self._decode_received(msg)
                # nothing received yet: loop and wait for the next wake-up


# Description of the asynchronous primitives offered by this module: which
# module/class names stand in for threading and queues, the sleep function,
# and the WebSocket wrapper (``None`` for both WebSocket entries when the
# uwsgi module lacks WebSocket support).
_async = dict(
    threading=sys.modules[__name__],
    thread_class='Thread',
    queue=importlib.import_module('gevent.queue'),
    queue_class='JoinableQueue',
    websocket=sys.modules[__name__] if _websocket_available else None,
    websocket_class='uWSGIWebSocket' if _websocket_available else None,
    sleep=gevent.sleep,
)