File: command.py

package info (click to toggle)
python-samsung-mdc 1.17.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 340 kB
  • sloc: python: 2,105; makefile: 2
file content (110 lines) | stat: -rw-r--r-- 3,531 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from typing import List, Union, Type
from functools import partial, partialmethod
from enum import Enum

from .fields import Field, Enum as EnumField
from .exceptions import MDCResponseError, NAKError


class CommandMcs(type):
    """Metaclass that finalises concrete ``Command`` subclasses.

    Classes named ``Command`` or whose name starts with ``_`` are created
    untouched. For every other class the metaclass:

    * defaults ``name`` to the lower-cased class name;
    * copies ``DATA`` and ``__doc__`` from the first base when the class
      body does not define them;
    * wraps ``Enum`` subclasses listed in ``DATA`` / ``RESPONSE_DATA`` into
      ``EnumField``;
    * builds ``RESPONSE_DATA`` (``DATA`` + ``RESPONSE_EXTRA`` unless the
      class body provides ``RESPONSE_DATA`` itself);
    * makes ``data`` optional in ``__call__`` for ``GET`` commands and pins
      it to ``b''`` for commands that are not ``SET`` or have no ``DATA``.
    """

    def __new__(mcs, name, bases, dict):
        """Create the class object.

        ``dict`` is the class namespace; the parameter keeps its original
        name so that the signature stays unchanged.
        """
        if name.startswith('_') or name == 'Command':
            return type.__new__(mcs, name, bases, dict)

        if 'name' not in dict:
            dict['name'] = name.lower()

        if 'DATA' not in dict and bases:
            # allow naive DATA inheritance
            dict['DATA'] = bases[0].DATA
        if '__doc__' not in dict and bases and bases[0].__doc__:
            # doc is not inherited by default
            dict['__doc__'] = bases[0].__doc__

        dict['DATA'] = [mcs._as_field(item) for item in dict['DATA']]
        dict['RESPONSE_DATA'] = [
            mcs._as_field(item)
            for item in dict.get(
                'RESPONSE_DATA',
                dict['DATA'] + dict.get('RESPONSE_EXTRA', []))
        ]

        cls = type.__new__(mcs, name, bases, dict)

        if cls.GET:
            # ``cls.__call__`` is normally the very function object that is
            # inherited from the base class.  Assigning ``__defaults__`` on
            # it would make ``data`` optional for *every* command class, so
            # the default is applied to a per-class copy instead.
            optional_call = mcs._call_with_default(cls, b'')
            if optional_call is not None:
                cls.__call__ = optional_call
        if not cls.SET or not cls.DATA:
            cls.__call__ = partialmethod(cls.__call__, data=b'')

        return cls

    @staticmethod
    def _as_field(item):
        """Wrap a bare ``Enum`` subclass into ``EnumField``; pass others."""
        if isinstance(item, type) and issubclass(item, Enum):
            return EnumField(item)
        return item

    @staticmethod
    def _call_with_default(cls, default):
        """Return a copy of ``cls``'s ``__call__`` with ``default`` applied.

        The raw ``__call__`` attribute is looked up along the MRO without
        descriptor binding.  A copy is only made for plain functions; for
        anything else (for instance an inherited ``partialmethod``) ``None``
        is returned and the caller leaves ``__call__`` as it is.
        """
        from types import FunctionType

        for klass in cls.__mro__:
            if '__call__' in vars(klass):
                func = vars(klass)['__call__']
                break
        else:
            return None
        if not isinstance(func, FunctionType):
            return None

        # Same code object, globals and closure: the copy behaves exactly
        # like the original, except for its positional defaults.
        clone = FunctionType(
            func.__code__, func.__globals__, func.__name__,
            (default,), func.__closure__)
        clone.__kwdefaults__ = (
            func.__kwdefaults__.copy() if func.__kwdefaults__ else None)
        clone.__qualname__ = func.__qualname__
        clone.__module__ = func.__module__
        clone.__doc__ = func.__doc__
        clone.__annotations__ = func.__annotations__.copy()
        clone.__dict__.update(func.__dict__)
        return clone


class Command(metaclass=CommandMcs):
    # Base class for commands: packs the request payload, sends it through
    # a connection and parses the response into a tuple of field values.
    #
    # NOTE: deliberately documented with comments instead of a class
    # docstring -- CommandMcs copies a truthy ``__doc__`` of the first base
    # into subclasses that do not define one, so a docstring here would leak
    # into every undocumented direct subclass.

    # Command name; CommandMcs defaults it to the lower-cased class name.
    name: str
    # Command identifier passed to ``connection.send``.
    CMD: Union[int, Field]
    # Optional sub-command; when set, ``(CMD, SUBCMD)`` is sent instead.
    SUBCMD: Union[int, None] = None
    # Flags read by CommandMcs to decide whether ``data`` is optional (GET)
    # or fixed to ``b''`` (not SET).
    GET: bool
    SET: bool
    # Fields used to pack the request payload.
    DATA: List[Union[Type[Enum], Field]]
    # Fields used to parse the response payload.
    RESPONSE_DATA: List[Union[Type[Enum], Field]]
    # Extra response fields appended to DATA when RESPONSE_DATA is not given.
    RESPONSE_EXTRA: List[Union[Type[Enum], Field]]

    async def __call__(self, connection, display_id, data):
        """Send the command and return the parsed response values.

        ``data`` is packed with :meth:`pack_payload_data` when truthy,
        otherwise an empty list is sent as payload.  Returns a tuple with
        one value per ``RESPONSE_DATA`` field.
        """
        command = (
            self.CMD if self.SUBCMD is None else (self.CMD, self.SUBCMD))
        payload = self.pack_payload_data(data) if data else []
        response = await connection.send(command, display_id, payload)
        return tuple(self.parse_response_data(self.parse_response(response)))

    def __get__(self, connection, cls):
        """Descriptor hook that lets a command act like an instance method.

        Accessed through the owner class (``connection`` is ``None``) the
        command itself is returned; accessed through an instance, a partial
        with that instance pre-bound as ``connection`` is returned.
        """
        if connection is None:
            return self  # bind to class
        return partial(self, connection)  # bind to instance

    @staticmethod
    def parse_response(response):
        """Unpack ``(ack, rcmd, data)`` and return ``data``.

        Raises ``NAKError`` built from the first data item when ``ack`` is
        falsy.
        """
        ack, _rcmd, data = response
        if ack:
            return data
        raise NAKError(data[0])

    @classmethod
    def parse_response_data(cls, data, strict_enum=True):
        """Parse ``data`` field by field according to ``RESPONSE_DATA``.

        Each field's ``parse`` receives the not yet consumed part of
        ``data`` and returns ``(value, consumed_length)``.

        ``strict_enum`` is accepted but not used by this implementation.

        Raises ``MDCResponseError`` when a field fails to parse or when
        data is left over after the last field.
        """
        values, offset = [], 0
        for field in cls.RESPONSE_DATA:
            remaining = data[offset:]
            try:
                value, consumed = field.parse(remaining)
            except Exception as exc:
                raise MDCResponseError(
                    f'Error parsing {field.name}: {exc}',
                    remaining) from exc
            values.append(value)
            offset += consumed

        leftover = data[offset:]
        if leftover:
            # Not consumed data left
            raise MDCResponseError('Unparsed data left', leftover)
        return tuple(values)

    @classmethod
    def pack_payload_data(cls, data):
        """Pack ``data`` into bytes, one item per ``DATA`` field.

        ``data[i]`` is packed with the ``i``-th field.  Raises
        ``ValueError`` when more items are supplied than there are fields;
        this is checked by comparing lengths, so it also applies when
        ``DATA`` is empty.
        """
        fields = cls.DATA
        payload = b''.join(
            bytes(field.pack(data[index]))
            for index, field in enumerate(fields))
        if data and len(data) > len(fields):
            raise ValueError('Unpacked data left '
                             '(more data provided than needed)')
        return payload

    @classmethod
    def get_order(cls):
        """Return the ``(CMD, SUBCMD)`` pair of the command."""
        return cls.CMD, cls.SUBCMD