1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
import asyncio
import json
import aiohttp
from aiohttp import ClientError
from aiohttp.http_exceptions import HttpProcessingError
import async_timeout
import jsonrpc_base
from jsonrpc_base import TransportError
class Server(jsonrpc_base.Server):
    """A connection to a websocket JSON-RPC server, backed by aiohttp.

    Outgoing requests are written to the websocket by ``send_message``;
    incoming frames are consumed by the background ``_ws_loop`` task started
    from ``ws_connect``, which matches responses to waiting requests through
    ``self._pending_messages`` (keyed by the request's response id).
    """

    def __init__(self, url, session=None, **connect_kwargs):
        """Store connection settings; no network I/O happens here.

        Args:
            url: Address passed to ``session.ws_connect``.
            session: Optional ``aiohttp.ClientSession`` to use. When omitted,
                a session is created here and closed again by ``close``.
            **connect_kwargs: Extra keyword arguments forwarded to
                ``session.ws_connect``. ``headers`` gets default
                ``Content-Type`` and ``Accept`` entries when they are absent;
                ``timeout`` is additionally used as the per-request response
                timeout.
        """
        super().__init__()
        self._session = session or aiohttp.ClientSession()
        # True if we made our own session
        self._internal_session = session is None
        self._client = None
        self._connect_kwargs = connect_kwargs
        self._url = url

        # Work on a copy so the defaults below are never written into a
        # mapping owned by the caller; a missing or None value means "no
        # custom headers".
        supplied_headers = connect_kwargs.get('headers')
        headers = {} if supplied_headers is None else supplied_headers.copy()
        headers.setdefault('Content-Type', 'application/json')
        headers.setdefault('Accept', 'application/json-rpc')
        connect_kwargs['headers'] = headers

        self._timeout = connect_kwargs.get('timeout')
        self._pending_messages = {}

    async def send_message(self, message):
        """Send the message to the server and return the parsed response.

        No result is awaited if message is a notification (it has no
        response id); ``message.parse_response(None)`` is returned instead.

        Raises:
            TransportError: If the client is not connected, the send fails,
                or no response arrives within the configured timeout.
        """
        if self._client is None:
            raise TransportError('Client is not connected.', message)

        response_id = message.response_id
        try:
            if not response_id:
                await self._client.send_str(message.serialize())
                return message.parse_response(None)

            # Register the waiter *before* sending: the reply is delivered by
            # the _ws_loop task, which may run while send_str is suspended.
            pending_message = PendingMessage()
            self._pending_messages[response_id] = pending_message
            try:
                await self._client.send_str(message.serialize())
                response = await pending_message.wait(self._timeout)
            finally:
                # Always drop the entry, including on timeout or send
                # failure, so abandoned requests do not accumulate.
                self._pending_messages.pop(response_id, None)
            return message.parse_response(response)
        except (ClientError, HttpProcessingError,
                asyncio.TimeoutError) as exc:
            raise TransportError('Transport Error', message, exc) from exc

    async def ws_connect(self):
        """Connect to the websocket server.

        Returns:
            The task/future running ``_ws_loop``, which finishes when the
            connection ends.

        Raises:
            TransportError: If a connection is already open or the
                connection attempt fails.
        """
        if self.connected:
            raise TransportError('Connection already open.')

        try:
            # A session we created ourselves is closed by close(); make a
            # fresh one so the same Server object can reconnect.
            if self._internal_session and self._session.closed:
                self._session = aiohttp.ClientSession()
            self._client = await self._session.ws_connect(
                self._url, **self._connect_kwargs)
        except (ClientError, HttpProcessingError,
                asyncio.TimeoutError) as exc:
            raise TransportError(
                'Error connecting to server', None, exc) from exc

        # Schedule the reader on the running loop; this avoids the
        # deprecated ClientSession.loop property.
        return asyncio.ensure_future(self._ws_loop())

    async def _ws_loop(self):
        """Listen for messages from the websocket server.

        Runs until the websocket iterator is exhausted, an ERROR frame is
        received or an exception escapes; the connection is closed in every
        case.

        Raises:
            TransportError: On a transport failure, an unparsable TEXT frame
                or a websocket ERROR frame.
        """
        msg = None
        try:
            async for msg in self._client:
                if msg.type == aiohttp.WSMsgType.ERROR:
                    break
                elif msg.type == aiohttp.WSMsgType.BINARY:
                    try:
                        # If we get a binary message, try and decode it as a
                        # UTF-8 JSON string, in case the server is sending
                        # binary websocket messages. If it doesn't decode
                        # we'll ignore it since we weren't expecting binary
                        # messages anyway
                        data = json.loads(msg.data.decode())
                    except ValueError:
                        continue
                elif msg.type == aiohttp.WSMsgType.TEXT:
                    try:
                        data = msg.json()
                    except ValueError as exc:
                        raise TransportError(
                            'Error Parsing JSON', None, exc) from exc
                else:
                    # This is tested with test_message_ping_ignored, but
                    # cpython's optimizations prevent coveragepy from
                    # detecting that it's run
                    # https://bitbucket.org/ned/coveragepy/issues/198/continue-marked-as-not-covered
                    continue  # pragma: no cover

                if not isinstance(data, dict):
                    # Only JSON objects can be dispatched below; skip any
                    # other payload instead of tearing down the connection.
                    continue

                if 'method' in data:
                    request = jsonrpc_base.Request.parse(data)
                    response = await self.async_receive_request(request)
                    if response:
                        await self.send_message(response)
                else:
                    # A response may carry no id, or arrive after its waiter
                    # already timed out and was removed; drop it rather than
                    # letting a KeyError end the loop.
                    pending_message = self._pending_messages.get(
                        data.get('id'))
                    if pending_message is not None:
                        pending_message.response = data
        except (ClientError, HttpProcessingError,
                asyncio.TimeoutError) as exc:
            raise TransportError('Transport Error', None, exc) from exc
        finally:
            await self.close()

        # Only reached when the loop ended without an exception; report an
        # ERROR frame as a failure after the connection has been closed.
        if msg and msg.type == aiohttp.WSMsgType.ERROR:
            raise TransportError(
                'Websocket error detected. Connection closed.')

    async def close(self):
        """Close the connection to the websocket server.

        Also closes the session when it was created by this object. Safe to
        call when not connected.
        """
        # Detach the client first so `connected` is False even if closing
        # the websocket raises.
        client, self._client = self._client, None
        try:
            if client is not None:
                await client.close()
        finally:
            if self._internal_session:
                await self._session.close()

    @property
    def connected(self):
        """Websocket server is connected."""
        return self._client is not None
class PendingMessage:
    """Holder for the response to a request that is still in flight.

    Assigning to ``response`` stores the value and sets an internal event,
    which releases any coroutine suspended in ``wait``.
    """

    def __init__(self):
        self._response = None
        self._event = asyncio.Event()

    @property
    def response(self):
        """The stored response, or None if nothing has been assigned yet."""
        return self._response

    @response.setter
    def response(self, value):
        # Store first, then signal, so a released waiter sees the value.
        self._response = value
        self._event.set()

    async def wait(self, timeout=None):
        """Block until a response has been assigned, then return it.

        ``timeout`` is handed to ``async_timeout.timeout`` unchanged.
        """
        async with async_timeout.timeout(timeout):
            await self._event.wait()
            return self._response
|