File: jsonrpc.py

package info (click to toggle)
python-jsonrpc-async 2.1.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 128 kB
  • sloc: python: 316; makefile: 4
file content (58 lines) | stat: -rw-r--r-- 2,080 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import asyncio
import functools

import aiohttp
import jsonrpc_base
from jsonrpc_base import TransportError


class Server(jsonrpc_base.Server):
    """A connection to a HTTP JSON-RPC server, backed by aiohttp"""

    def __init__(self, url, session=None, *, loads=None, **post_kwargs):
        super().__init__()
        object.__setattr__(self, 'session', session or aiohttp.ClientSession())
        post_kwargs['headers'] = post_kwargs.get('headers', {})
        post_kwargs['headers']['Content-Type'] = post_kwargs['headers'].get(
            'Content-Type', 'application/json')
        post_kwargs['headers']['Accept'] = post_kwargs['headers'].get(
            'Accept', 'application/json-rpc')
        self._request = functools.partial(
            self.session.post, url, **post_kwargs)

        self._json_args = {}
        if loads is not None:
            self._json_args['loads'] = loads

    async def send_message(self, message):
        """Send the HTTP message to the server and return the message response.

        No result is returned if message is a notification.
        """
        try:
            response = await self._request(data=message.serialize())
        except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
            raise TransportError('Transport Error', message, exc)

        if response.status != 200:
            raise TransportError(
                'HTTP %d %s' % (response.status, response.reason), message)

        if message.response_id is None:
            # Message is notification, so no response is expcted.
            return None

        try:
            response_data = await response.json(**self._json_args)
        except ValueError as value_error:
            raise TransportError(
                'Cannot deserialize response body', message, value_error)

        return message.parse_response(response_data)

    async def __aenter__(self):
        await self.session.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return await self.session.__aexit__(exc_type, exc, tb)