1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
|
from typing import List, Union, Type
from functools import partial, partialmethod
from enum import Enum
from .fields import Field, Enum as EnumField
from .exceptions import MDCResponseError, NAKError
class CommandMcs(type):
def __new__(mcs, name, bases, dict):
if name.startswith('_') or name == 'Command':
return type.__new__(mcs, name, bases, dict)
if 'name' not in dict:
dict['name'] = name.lower()
if 'DATA' not in dict and bases:
# allow naive DATA inheritance
dict['DATA'] = bases[0].DATA
if '__doc__' not in dict and bases and bases[0].__doc__:
# doc is not inherited by default
dict['__doc__'] = bases[0].__doc__
dict['DATA'] = [
# convert Enum to EnumField
EnumField(x) if isinstance(x, type) and issubclass(x, Enum) else x
for x in dict['DATA']
]
dict['RESPONSE_DATA'] = [
EnumField(x) if isinstance(x, type) and issubclass(x, Enum) else x
for x in dict.get(
'RESPONSE_DATA',
dict['DATA'] + dict.get('RESPONSE_EXTRA', []))
]
cls = type.__new__(mcs, name, bases, dict)
if cls.GET:
cls.__call__.__defaults__ = (b'',)
if not cls.SET or not cls.DATA:
cls.__call__ = partialmethod(cls.__call__, data=b'')
return cls
class Command(metaclass=CommandMcs):
name: str
CMD: Union[int, Field]
SUBCMD: Union[int, None] = None
GET: bool
SET: bool
DATA: List[Union[Type[Enum], Field]]
RESPONSE_DATA: List[Union[Type[Enum], Field]]
RESPONSE_EXTRA: List[Union[Type[Enum], Field]]
async def __call__(self, connection, display_id, data):
data = self.parse_response(
await connection.send(
(self.CMD, self.SUBCMD)
if self.SUBCMD is not None else self.CMD, display_id,
self.pack_payload_data(data) if data else []
),
)
return tuple(self.parse_response_data(data))
def __get__(self, connection, cls):
# Allow Command to be bounded as instance method
if connection is None:
return self # bind to class
return partial(self, connection) # bind to instance
@staticmethod
def parse_response(response):
ack, rcmd, data = response
if not ack:
raise NAKError(data[0])
return data
@classmethod
def parse_response_data(cls, data, strict_enum=True):
rv, cursor = [], 0
for field in cls.RESPONSE_DATA:
try:
value, cursor_shift = field.parse(data[cursor:])
except Exception as exc:
raise MDCResponseError(
f'Error parsing {field.name}: {exc}',
data[cursor:]) from exc
rv.append(value)
cursor += cursor_shift
if data[cursor:]:
# Not consumed data left
raise MDCResponseError('Unparsed data left', data[cursor:])
return tuple(rv)
@classmethod
def pack_payload_data(cls, data):
rv = bytes()
for i, field in enumerate(cls.DATA):
rv += bytes(field.pack(data[i]))
if cls.DATA and len(data[i+1:]):
raise ValueError('Unpacked data left '
'(more data provided than needed)')
return rv
@classmethod
def get_order(cls):
return (cls.CMD, cls.SUBCMD)
|