File: protocol.py

package info (click to toggle)
python-snapcast 2.3.7-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 184 kB
  • sloc: python: 1,564; makefile: 9
file content (85 lines) | stat: -rw-r--r-- 2,772 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""Snapcast protocol."""

import asyncio
import json
import random

SERVER_ONDISCONNECT = 'Server.OnDisconnect'


# pylint: disable=consider-using-f-string
def jsonrpc_request(method, identifier, params=None):
    """Serialize a JSON-RPC 2.0 request as CRLF-terminated bytes.

    Args:
        method: Name of the remote method to invoke.
        identifier: Request id, echoed back by the server in its response.
        params: Parameters for the call; any falsy value is sent as ``{}``.

    Returns:
        The JSON-encoded request followed by ``\\r\\n``, encoded to bytes.
    """
    payload = {
        'id': identifier,
        'method': method,
        'params': params if params else {},
        'jsonrpc': '2.0',
    }
    line = json.dumps(payload) + '\r\n'
    return line.encode()


class SnapcastProtocol(asyncio.Protocol):
    """Async Snapcast protocol.

    Exchanges CRLF-delimited JSON-RPC messages over an asyncio transport.
    Messages carrying an ``id`` are treated as responses and matched to the
    pending request with that id; all other messages are treated as
    notifications and dispatched to the callback registered under their
    ``method`` name.
    """

    def __init__(self, callbacks):
        """Initialize.

        Args:
            callbacks: Mapping of notification method name to a callable
                taking the notification ``params``. The entry stored under
                ``SERVER_ONDISCONNECT``, if any, is called with the
                exception passed to ``connection_lost``.
        """
        self._transport = None
        # Pending requests: id -> {'flag': Event, 'data': ..., 'error': ...}
        self._buffer = {}
        self._callbacks = callbacks
        # Raw, not yet decoded bytes of an incomplete trailing message.
        # Kept as bytes so a multi-byte UTF-8 character split across two
        # reads is only decoded once it has arrived completely.
        self._data_buffer = b''

    def connection_made(self, transport):
        """When a connection is made."""
        self._transport = transport

    def connection_lost(self, exc):
        """When a connection is lost.

        Wakes up every pending request with a "connection lost" error and
        then notifies the disconnect callback, if one is registered.
        """
        for pending in self._buffer.values():
            pending['error'] = {"code": -1, "message": "connection lost"}
            pending['flag'].set()
        on_disconnect = self._callbacks.get(SERVER_ONDISCONNECT)
        # The disconnect callback is optional; calling a missing one would
        # raise TypeError from inside the event loop.
        if on_disconnect is not None:
            on_disconnect(exc)

    def data_received(self, data):
        """Handle received data.

        Appends ``data`` to the receive buffer, dispatches every message
        that is terminated by CRLF and keeps any unterminated tail for the
        next call.

        Raises:
            json.JSONDecodeError: If a complete line is not valid JSON.
        """
        self._data_buffer += data
        complete, separator, remainder = self._data_buffer.rpartition(b'\r\n')
        if not separator:
            # No full message yet; wait for more data.
            return
        self._data_buffer = remainder
        for line in complete.split(b'\r\n'):
            if not line.strip():
                # Blank line (e.g. a bare CRLF) carries no message.
                continue
            message = json.loads(line.decode())
            # A JSON-RPC batch arrives as a list of messages.
            items = message if isinstance(message, list) else [message]
            for item in items:
                self.handle_data(item)

    def handle_data(self, data):
        """Handle JSONRPC data.

        Routes messages with an ``id`` key to ``handle_response`` and all
        others to ``handle_notification``.
        """
        if 'id' in data:
            self.handle_response(data)
        else:
            self.handle_notification(data)

    def handle_response(self, data):
        """Handle JSONRPC response.

        Stores ``result`` and ``error`` on the matching pending request and
        wakes its waiter. Responses whose id matches no pending request
        (for instance a late reply to a cancelled request) are ignored.
        """
        pending = self._buffer.get(data.get('id'))
        if pending is None:
            return
        pending['data'] = data.get('result')
        pending['error'] = data.get('error')
        pending['flag'].set()

    def handle_notification(self, data):
        """Handle JSONRPC notification.

        Calls the callback registered for the notification's ``method``
        with its ``params``; notifications without a callback are dropped.
        """
        callback = self._callbacks.get(data.get('method'))
        if callback is not None:
            callback(data.get('params'))

    async def request(self, method, params):
        """Send a JSONRPC request.

        Args:
            method: Name of the remote method.
            params: Parameters for the call.

        Returns:
            A ``(result, error)`` tuple taken from the response; on a lost
            connection ``result`` is ``None`` and ``error`` describes it.
        """
        identifier = random.randint(1, 1000)
        # Never reuse the id of a request that is still in flight: that
        # would replace its entry and leave the earlier caller waiting
        # forever.
        while identifier in self._buffer:
            identifier = random.randint(1, 1000)
        pending = {'flag': asyncio.Event()}
        self._buffer[identifier] = pending
        try:
            self._transport.write(jsonrpc_request(method, identifier, params))
            await pending['flag'].wait()
        finally:
            # Also runs on cancellation so abandoned requests do not leak.
            self._buffer.pop(identifier, None)
        return (pending.get('data'), pending.get('error'))