1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
|
"""WebSocket client for asyncio."""
import asyncio
import aiohttp
from .websocket import Message
from .websocket import WebSocketError
from .websocket import MSG_BINARY, MSG_TEXT, MSG_CLOSE, MSG_PING, MSG_PONG
__all__ = ('ws_connect', 'MsgType')
try:
from enum import IntEnum
except ImportError: # pragma: no cover
IntEnum = object
class MsgType(IntEnum):
text = MSG_TEXT
binary = MSG_BINARY
ping = MSG_PING
pong = MSG_PONG
close = MSG_CLOSE
closed = 20
error = 21
closedMessage = Message(MsgType.closed, None, None)
class ClientWebSocketResponse:
def __init__(self, reader, writer, protocol,
response, timeout, autoclose, autoping, loop):
self._response = response
self._conn = response.connection
self._writer = writer
self._reader = reader
self._protocol = protocol
self._closed = False
self._closing = False
self._close_code = None
self._timeout = timeout
self._autoclose = autoclose
self._autoping = autoping
self._loop = loop
self._waiting = False
self._exception = None
@property
def closed(self):
return self._closed
@property
def close_code(self):
return self._close_code
@property
def protocol(self):
return self._protocol
def exception(self):
return self._exception
def ping(self, message='b'):
if self._closed:
raise RuntimeError('websocket connection is closed')
self._writer.ping(message)
def pong(self, message='b'):
if self._closed:
raise RuntimeError('websocket connection is closed')
self._writer.pong(message)
def send_str(self, data):
if self._closed:
raise RuntimeError('websocket connection is closed')
if not isinstance(data, str):
raise TypeError('data argument must be str (%r)' % type(data))
self._writer.send(data, binary=False)
def send_bytes(self, data):
if self._closed:
raise RuntimeError('websocket connection is closed')
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be byte-ish (%r)' %
type(data))
self._writer.send(data, binary=True)
@asyncio.coroutine
def close(self, *, code=1000, message=b''):
if not self._closed:
self._closed = True
try:
self._writer.close(code, message)
except asyncio.CancelledError:
self._close_code = 1006
self._response.close(force=True)
raise
except Exception as exc:
self._close_code = 1006
self._exception = exc
self._response.close(force=True)
return True
if self._closing:
self._response.close(force=True)
return True
while True:
try:
msg = yield from asyncio.wait_for(
self._reader.read(), self._timeout, loop=self._loop)
except asyncio.CancelledError:
self._close_code = 1006
self._response.close(force=True)
raise
except Exception as exc:
self._close_code = 1006
self._exception = exc
self._response.close(force=True)
return True
if msg.tp == MsgType.close:
self._close_code = msg.data
self._response.close(force=True)
return True
else:
return False
@asyncio.coroutine
def receive(self):
if self._waiting:
raise RuntimeError('Concurrent call to receive() is not allowed')
self._waiting = True
try:
while True:
if self._closed:
return closedMessage
try:
msg = yield from self._reader.read()
except (asyncio.CancelledError, asyncio.TimeoutError):
raise
except WebSocketError as exc:
self._close_code = exc.code
yield from self.close(code=exc.code)
return Message(MsgType.error, exc, None)
except Exception as exc:
self._exception = exc
self._closing = True
self._close_code = 1006
yield from self.close()
return Message(MsgType.error, exc, None)
if msg.tp == MsgType.close:
self._closing = True
self._close_code = msg.data
if not self._closed and self._autoclose:
yield from self.close()
return msg
elif not self._closed:
if msg.tp == MsgType.ping and self._autoping:
self._writer.pong(msg.data)
elif msg.tp == MsgType.pong and self._autoping:
continue
else:
return msg
finally:
self._waiting = False
@asyncio.coroutine
def ws_connect(url, *, protocols=(), timeout=10.0, connector=None,
ws_response_class=ClientWebSocketResponse, autoclose=True,
autoping=True, loop=None):
if loop is None:
asyncio.get_event_loop()
if connector is None:
connector = aiohttp.TCPConnector(loop=loop, force_close=True)
session = aiohttp.ClientSession(loop=loop, connector=connector,
ws_response_class=ws_response_class)
try:
resp = yield from session.ws_connect(
url,
protocols=protocols,
timeout=timeout,
autoclose=autoclose,
autoping=autoping)
return resp
finally:
session.detach()
|