1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
# -*- coding: utf-8 -*-
import ssl
from tornado import iostream, escape
from ws4py.client import WebSocketBaseClient
from ws4py.exc import HandshakeError
# Names exported by ``from <this module> import *``.
__all__ = ['TornadoWebSocketClient']
class TornadoWebSocketClient(WebSocketBaseClient):
    """
    WebSocket client whose socket I/O is driven by a Tornado ``IOStream``.

    The handshake and the subsequent frame reads are performed through
    callbacks registered on ``self.io``; subclasses react to events by
    overriding ``opened``, ``received_message`` and ``closed``.
    """

    def __init__(self, url, protocols=None, extensions=None,
                 io_loop=None, ssl_options=None, headers=None):
        """
        Build the client and wrap its socket in a Tornado stream.

        ``url``, ``protocols``, ``extensions``, ``ssl_options`` and
        ``headers`` are forwarded to ``WebSocketBaseClient.__init__``.
        ``io_loop`` is handed to the Tornado stream and kept on
        ``self.io_loop``.

        .. code-block:: python

            from tornado import ioloop

            class MyClient(TornadoWebSocketClient):
                def opened(self):
                    for i in range(0, 200, 25):
                        self.send("*" * i)

                def received_message(self, m):
                    print((m, len(str(m))))

                def closed(self, code, reason=None):
                    ioloop.IOLoop.instance().stop()

            ws = MyClient('ws://localhost:9000/echo', protocols=['http-only', 'chat'])
            ws.connect()

            ioloop.IOLoop.instance().start()
        """
        WebSocketBaseClient.__init__(self, url, protocols, extensions,
                                     ssl_options=ssl_options, headers=headers)
        if self.scheme == "wss":
            # The TLS handshake is deferred (do_handshake_on_connect=False)
            # so that the SSLIOStream can perform it once connected.
            # NOTE(review): ssl.wrap_socket was removed in Python 3.12;
            # migrating to ssl.SSLContext.wrap_socket changes how
            # ssl_options is interpreted, so it is left as is here.
            self.sock = ssl.wrap_socket(self.sock, do_handshake_on_connect=False, **self.ssl_options)
            self._is_secure = True
            self.io = iostream.SSLIOStream(self.sock, io_loop, ssl_options=self.ssl_options)
        else:
            self.io = iostream.IOStream(self.sock, io_loop)
        self.io_loop = io_loop

    def connect(self):
        """
        Connects the websocket and initiate the upgrade handshake.
        """
        # Until the connection is established, a close of the stream is
        # reported as a refused connection.
        self.io.set_close_callback(self.__connection_refused)
        self.io.connect((self.host, int(self.port)), self.__send_handshake)

    def _write(self, b):
        """
        Trying to prevent a write operation
        on an already closed websocket stream.

        This cannot be bullet proof but hopefully
        will catch almost all use cases.

        Raises ``RuntimeError`` when ``self.terminated`` is true.
        """
        if self.terminated:
            raise RuntimeError("Cannot send on a terminated websocket")

        self.io.write(b)

    def __connection_refused(self, *args, **kwargs):
        """Close callback used while connecting: report code 1005."""
        self.server_terminated = True
        self.closed(1005, 'Connection refused')

    def __send_handshake(self):
        """Connect callback: send the upgrade request over the stream."""
        # From now on a close means the handshake was interrupted.
        self.io.set_close_callback(self.__connection_closed)
        self.io.write(escape.utf8(self.handshake_request),
                      self.__handshake_sent)

    def __connection_closed(self, *args, **kwargs):
        """Close callback used during the handshake: report code 1006."""
        self.server_terminated = True
        self.closed(1006, 'Connection closed during handshake')

    def __handshake_sent(self):
        """Write callback: read the response up to the blank line ending the headers."""
        self.io.read_until(b"\r\n\r\n", self.__handshake_completed)

    def __handshake_completed(self, data):
        """
        Read callback: validate the handshake response and start reading.

        ``data`` is the raw response (status line and headers). On a
        ``HandshakeError`` the connection is closed and the error is
        re-raised.
        """
        # Drop the handshake close callback so that closing the stream
        # below does not also invoke ``closed``.
        self.io.set_close_callback(None)
        try:
            response_line, _, headers = data.partition(b'\r\n')
            self.process_response_line(response_line)
            # Called for its validation; the returned value is not used here.
            self.process_handshake_header(headers)
        except HandshakeError:
            self.close_connection()
            raise

        self.opened()
        self.io.set_close_callback(self.__stream_closed)
        self.io.read_bytes(self.reading_buffer_size, self.__fetch_more)

    def __fetch_more(self, data):
        """
        Read callback: feed ``data`` to ``process`` and schedule the next read.

        Reading continues while ``process`` returns a true value; a false
        value or an error raised by ``process`` ends the session.
        """
        try:
            should_continue = self.process(data)
        except Exception:
            # Any processing failure terminates the connection, but
            # KeyboardInterrupt / SystemExit are allowed to propagate.
            should_continue = False

        if should_continue:
            self.io.read_bytes(self.reading_buffer_size, self.__fetch_more)
        else:
            self.__gracefully_terminate()

    def __gracefully_terminate(self):
        """Mark both sides terminated, notify ``closed`` if needed, close the stream."""
        self.client_terminated = self.server_terminated = True

        try:
            # Report 1006 only when ``self.stream.closing`` is not set.
            if not self.stream.closing:
                self.closed(1006)
        finally:
            self.close_connection()

    def __stream_closed(self, *args, **kwargs):
        """Close callback used once the websocket is open."""
        self.io.set_close_callback(None)
        # Defaults used when ``self.stream.closing`` is not set.
        code = 1006
        reason = None
        if self.stream.closing:
            code, reason = self.stream.closing.code, self.stream.closing.reason
        self.closed(code, reason)
        self.stream._cleanup()

    def close_connection(self):
        """
        Close the underlying connection
        """
        self.io.close()
if __name__ == '__main__':
    # Manual demo: connects to ws://localhost:9000/ws, sends a batch of
    # messages and stops the IOLoop once the connection is closed.
    from tornado import ioloop

    class MyClient(TornadoWebSocketClient):
        """Demo client that sends test payloads as soon as it is opened."""

        def opened(self):
            # First a single generator payload made of "#" chunks ...
            self.send("#" * size for size in range(0, 200, 25))

            # ... then one "*" message per size.
            for size in range(0, 200, 25):
                self.send("*" * size)

        def received_message(self, m):
            # Print the length of each message; the 175-long one ends the demo.
            size = len(m)
            print("#%d" % size)
            if size == 175:
                self.close()

        def closed(self, code, reason=None):
            # Stop the loop so that the script can exit.
            ioloop.IOLoop.instance().stop()
            print(("Closed down", code, reason))

    ws = MyClient('ws://localhost:9000/ws', protocols=['http-only', 'chat'])
    ws.connect()

    ioloop.IOLoop.instance().start()
|