1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
"""Snapcast protocol."""
import asyncio
import json
import random
SERVER_ONDISCONNECT = 'Server.OnDisconnect'
# pylint: disable=consider-using-f-string
def jsonrpc_request(method, identifier, params=None):
"""Produce a JSONRPC request."""
return '{}\r\n'.format(json.dumps({
'id': identifier,
'method': method,
'params': params or {},
'jsonrpc': '2.0'
})).encode()
class SnapcastProtocol(asyncio.Protocol):
"""Async Snapcast protocol."""
def __init__(self, callbacks):
"""Initialize."""
self._transport = None
self._buffer = {}
self._callbacks = callbacks
self._data_buffer = ''
def connection_made(self, transport):
"""When a connection is made."""
self._transport = transport
def connection_lost(self, exc):
"""When a connection is lost."""
for b in self._buffer.values():
b['error'] = {"code": -1, "message": "connection lost"}
b['flag'].set()
self._callbacks.get(SERVER_ONDISCONNECT)(exc)
def data_received(self, data):
"""Handle received data."""
self._data_buffer += data.decode()
if not self._data_buffer.endswith('\r\n'):
return
data = self._data_buffer
self._data_buffer = '' # clear buffer
for cmd in data.strip().split('\r\n'):
data = json.loads(cmd)
if not isinstance(data, list):
data = [data]
for item in data:
self.handle_data(item)
def handle_data(self, data):
"""Handle JSONRPC data."""
if 'id' in data:
self.handle_response(data)
else:
self.handle_notification(data)
def handle_response(self, data):
"""Handle JSONRPC response."""
identifier = data.get('id')
self._buffer[identifier]['data'] = data.get('result')
self._buffer[identifier]['error'] = data.get('error')
self._buffer[identifier]['flag'].set()
def handle_notification(self, data):
"""Handle JSONRPC notification."""
if data.get('method') in self._callbacks:
self._callbacks.get(data.get('method'))(data.get('params'))
async def request(self, method, params):
"""Send a JSONRPC request."""
identifier = random.randint(1, 1000)
self._transport.write(jsonrpc_request(method, identifier, params))
self._buffer[identifier] = {'flag': asyncio.Event()}
await self._buffer[identifier]['flag'].wait()
result = self._buffer[identifier].get('data')
error = self._buffer[identifier].get('error')
self._buffer[identifier].clear()
del self._buffer[identifier]
return (result, error)
|