File: transports.py

package info (click to toggle)
python-socketio-client 0.7.2-2
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 212 kB
  • sloc: python: 1,250; javascript: 142; makefile: 8
file content (208 lines) | stat: -rw-r--r-- 7,956 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import requests
import six
import ssl
import threading
import time
from six.moves.urllib.parse import urlencode as format_query
from six.moves.urllib.parse import urlparse as parse_url
from socket import error as SocketError
try:
    from websocket import (
        WebSocketConnectionClosedException, WebSocketTimeoutException,
        create_connection)
except ImportError:
    exit("""\
An incompatible websocket library is conflicting with the one we need.
You can remove the incompatible library and install the correct one
by running the following commands:

yes | pip uninstall websocket websocket-client
pip install -U websocket-client""")

from .exceptions import ConnectionError, TimeoutError
from .parsers import (
    encode_engineIO_content, decode_engineIO_content,
    format_packet_text, parse_packet_text)
from .symmetries import SSLError, memoryview


# Value sent to the server as the 'EIO' query parameter by the transports
# defined in this module.
ENGINEIO_PROTOCOL = 3
# Names of the available transports. Not referenced elsewhere in this module;
# presumably used by callers to select a transport class -- verify against
# the caller.
TRANSPORTS = 'xhr-polling', 'websocket'


class AbstractTransport(object):
    """Base class holding the settings shared by every transport.

    The packet and timeout hooks defined here do nothing and return None;
    concrete transports override the ones they support.
    """

    def __init__(self, http_session, is_secure, url, engineIO_session=None):
        """Store the connection settings on the instance unchanged."""
        self.engineIO_session = engineIO_session
        self.url = url
        self.is_secure = is_secure
        self.http_session = http_session

    def recv_packet(self):
        """No-op placeholder for receiving packets."""
        return None

    def send_packet(self, engineIO_packet_type, engineIO_packet_data=''):
        """No-op placeholder for sending one packet."""
        return None

    def set_timeout(self, seconds=None):
        """No-op placeholder for adjusting the transport timeout."""
        return None


class XHR_PollingTransport(AbstractTransport):
    """Transport that exchanges packets through HTTP GET and POST requests."""

    def __init__(self, http_session, is_secure, url, engineIO_session=None):
        """Prepare the request URL, query parameters and request options.

        When an engineIO_session is supplied, its id is added to the query
        parameters and its ping_timeout becomes the timeout of every request.
        """
        super(XHR_PollingTransport, self).__init__(
            http_session, is_secure, url, engineIO_session)
        self._params = {
            'EIO': ENGINEIO_PROTOCOL, 'transport': 'polling'}
        has_session = bool(engineIO_session)
        # The request counter starts at 1 with a session and at 0 without.
        self._request_index = 1 if has_session else 0
        if has_session:
            self._kw_get = {'timeout': engineIO_session.ping_timeout}
            self._kw_post = {
                'timeout': engineIO_session.ping_timeout,
                'headers': {'content-type': 'application/octet-stream'},
            }
            self._params['sid'] = engineIO_session.id
        else:
            self._kw_get = {}
            self._kw_post = {}
        self._http_url = '%s://%s/' % (
            'https' if is_secure else 'http', url)
        self._request_index_lock = threading.Lock()
        self._send_packet_lock = threading.Lock()

    def recv_packet(self):
        """Issue one GET request and yield each (type, data) packet in it."""
        query = dict(self._params, t=self._get_timestamp())
        response = get_response(
            self.http_session.get, self._http_url,
            params=query, **self._kw_get)
        for packet_type, packet_data in decode_engineIO_content(
                response.content):
            yield packet_type, packet_data

    def send_packet(self, engineIO_packet_type, engineIO_packet_data=''):
        """POST a single packet; the send lock is held for the whole call."""
        with self._send_packet_lock:
            query = dict(self._params, t=self._get_timestamp())
            payload = encode_engineIO_content(
                [(engineIO_packet_type, engineIO_packet_data)])
            get_response(
                self.http_session.post, self._http_url,
                params=query, data=memoryview(payload), **self._kw_post)

    def _get_timestamp(self):
        """Return '<milliseconds>-<request index>' and bump the index."""
        with self._request_index_lock:
            index = self._request_index
            millis = int(time.time() * 1000)
            self._request_index = index + 1
        return '%s-%s' % (millis, index)


class WebsocketTransport(AbstractTransport):
    """Transport that exchanges packets over a single websocket connection."""

    def __init__(self, http_session, is_secure, url, engineIO_session=None):
        """Open the websocket connection using settings from http_session.

        Headers, query parameters, proxy and certificate settings are taken
        from ``http_session``. When ``engineIO_session`` is given, its id is
        sent as ``sid`` and its ``ping_timeout`` becomes the socket timeout.

        Raises:
            ConnectionError: if the connection cannot be established for
                any reason (the original exception is the argument).
        """
        super(WebsocketTransport, self).__init__(
            http_session, is_secure, url, engineIO_session)
        # Default used by set_timeout() when no session supplied a timeout;
        # without it set_timeout() would fail with AttributeError.
        self._timeout = None
        params = dict(http_session.params, **{
            'EIO': ENGINEIO_PROTOCOL, 'transport': 'websocket'})
        # Let the session compute the headers it would send with a request.
        request = http_session.prepare_request(requests.Request('GET', url))
        kw = {'header': ['%s: %s' % x for x in request.headers.items()]}
        if engineIO_session:
            params['sid'] = engineIO_session.id
            kw['timeout'] = self._timeout = engineIO_session.ping_timeout
        ws_url = '%s://%s/?%s' % (
            'wss' if is_secure else 'ws', url, format_query(params))
        http_scheme = 'https' if is_secure else 'http'
        if http_scheme in http_session.proxies:  # Use the correct proxy
            proxy_url_pack = parse_url(http_session.proxies[http_scheme])
            kw['http_proxy_host'] = proxy_url_pack.hostname
            kw['http_proxy_port'] = proxy_url_pack.port
            if proxy_url_pack.username:
                kw['http_proxy_auth'] = (
                    proxy_url_pack.username, proxy_url_pack.password)
        if http_session.verify:
            # NOTE(review): the session's `cert` value is forwarded under the
            # name `ca_certs`; confirm that this is what the websocket
            # library expects.
            if http_session.cert:  # Specify certificate path on disk
                if isinstance(http_session.cert, six.string_types):
                    kw['ca_certs'] = http_session.cert
                else:
                    kw['ca_certs'] = http_session.cert[0]
        else:  # Do not verify the SSL certificate
            kw['sslopt'] = {'cert_reqs': ssl.CERT_NONE}
        try:
            self._connection = create_connection(ws_url, **kw)
        except Exception as e:
            raise ConnectionError(e)

    def recv_packet(self):
        """Receive one websocket message and yield its (type, data) pair.

        Raises:
            TimeoutError: if the receive timed out.
            ConnectionError: if the connection was closed or failed.
        """
        try:
            packet_text = self._connection.recv()
        except WebSocketTimeoutException as e:
            raise TimeoutError('recv timed out (%s)' % e)
        except SSLError as e:
            # Must be tested before SocketError so SSL failures keep their
            # dedicated message.
            raise ConnectionError('recv disconnected by SSL (%s)' % e)
        except (SocketError, WebSocketConnectionClosedException) as e:
            raise ConnectionError('recv disconnected (%s)' % e)
        # The parser is always handed bytes; text is encoded as UTF-8.
        if not isinstance(packet_text, six.binary_type):
            packet_text = packet_text.encode('utf-8')
        engineIO_packet_type, engineIO_packet_data = parse_packet_text(
            packet_text)
        yield engineIO_packet_type, engineIO_packet_data

    def send_packet(self, engineIO_packet_type, engineIO_packet_data=''):
        """Format one packet and send it over the websocket.

        Raises:
            TimeoutError: if the send timed out.
            ConnectionError: if the connection was closed or failed.
        """
        packet = format_packet_text(engineIO_packet_type, engineIO_packet_data)
        try:
            self._connection.send(packet)
        except WebSocketTimeoutException as e:
            raise TimeoutError('send timed out (%s)' % e)
        except (SocketError, WebSocketConnectionClosedException) as e:
            raise ConnectionError('send disconnected (%s)' % e)

    def set_timeout(self, seconds=None):
        """Set the socket timeout, falling back to the session's timeout.

        A falsy ``seconds`` (None or 0) selects the timeout recorded at
        construction time, which is None when no session was supplied.
        """
        self._connection.settimeout(seconds or self._timeout)


def get_response(request, *args, **kw):
    """Call ``request`` and return its response, normalizing failures.

    Args:
        request: callable such as ``session.get`` or ``session.post``; it is
            invoked with ``*args``, ``stream=True`` and ``**kw``.

    Returns:
        The response object, guaranteed to have status code 200.

    Raises:
        TimeoutError: if the request timed out.
        ConnectionError: if SSL negotiation failed, the connection failed,
            or the status code is not 200.
    """
    try:
        response = request(*args, stream=True, **kw)
    except requests.exceptions.Timeout as e:
        raise TimeoutError(e)
    except requests.exceptions.SSLError as e:
        # SSLError is a subclass of requests' ConnectionError, so it has to
        # be handled first or this more specific message is never produced.
        raise ConnectionError('could not negotiate SSL (%s)' % e)
    except requests.exceptions.ConnectionError as e:
        raise ConnectionError(e)
    status_code = response.status_code
    if 200 != status_code:
        raise ConnectionError('unexpected status code (%s %s)' % (
            status_code, response.text))
    return response


def prepare_http_session(kw):
    """Build a requests session configured from the options mapping ``kw``.

    Recognized keys are 'headers', 'auth', 'proxies', 'hooks', 'params',
    'verify' (default True), 'cert' and 'cookies'; missing keys leave the
    session defaults in place.
    """
    session = requests.Session()
    # These options are mappings merged into the session's own defaults.
    for option in ('headers', 'proxies', 'hooks', 'params', 'cookies'):
        getattr(session, option).update(kw.get(option, {}))
    # These options replace the session attribute outright.
    session.auth = kw.get('auth')
    session.verify = kw.get('verify', True)
    session.cert = _get_cert(kw)
    return session


def _get_cert(kw):
    # Reduce (None, None) to None
    cert = kw.get('cert')
    if hasattr(cert, '__iter__') and cert[0] is None:
        cert = None
    return cert