File: __init__.py

package info (click to toggle)
mopidy-mpd 3.3.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 704 kB
  • sloc: python: 7,640; makefile: 3
file content (216 lines) | stat: -rw-r--r-- 7,071 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
"""
This is Mopidy's MPD protocol implementation.

This is partly based upon the `MPD protocol documentation
<http://www.musicpd.org/doc/protocol/>`_, which is a useful resource, but it is
rather incomplete with regard to data formats, both for requests and
responses. Thus, we have had to talk a great deal with the original `MPD
server <https://mpd.fandom.com/>`_ using telnet to get the details we need to
implement our own MPD server which is compatible with the numerous existing
`MPD clients <https://mpd.fandom.com/wiki/Clients>`_.
"""

import inspect

from mopidy_mpd import exceptions

#: The MPD protocol uses UTF-8 for encoding all data. Stored as the codec
#: name (a text string).
ENCODING = "UTF-8"

#: The MPD protocol uses ``\n`` as line terminator. Stored as ``bytes``, not
#: ``str``.
LINE_TERMINATOR = b"\n"

#: The MPD protocol version is 0.19.0. Stored as a dotted version string.
VERSION = "0.19.0"


def load_protocol_modules():
    """
    Import every protocol submodule of this package.

    The protocol modules must be imported to get them registered in
    :attr:`commands`; the imported names themselves are not used here.
    """
    # One import per submodule, kept in the same order as before.
    from . import audio_output  # noqa
    from . import channels  # noqa
    from . import command_list  # noqa
    from . import connection  # noqa
    from . import current_playlist  # noqa
    from . import mount  # noqa
    from . import music_db  # noqa
    from . import playback  # noqa
    from . import reflection  # noqa
    from . import status  # noqa
    from . import stickers  # noqa
    from . import stored_playlists  # noqa


def INT(value):  # noqa: N802
    r"""Parse text of the form ``[+-]?\d+`` and return it as an ``int``.

    ``None`` is rejected with :exc:`ValueError`. Any other value is handed
    straight to :func:`int`, so its :exc:`ValueError` propagates when the
    text is not a valid integer.
    """
    # TODO: check for whitespace via value != value.strip()?
    if value is not None:
        return int(value)
    raise ValueError("None is not a valid integer")


def UINT(value):  # noqa: N802
    r"""Parse text of the form ``\d+`` and return it as an ``int``.

    Raises :exc:`ValueError` when the value is ``None`` or when
    ``value.isdigit()`` is false (which covers signs, decimal points and the
    empty string).
    """
    if value is None:
        raise ValueError("None is not a valid integer")
    if value.isdigit():
        return int(value)
    raise ValueError("Only positive numbers are allowed")


def FLOAT(value):  # noqa: N802
    r"""Parse text such as ``[+-]?\d+(\.\d+)?`` and return it as a ``float``.

    ``None`` is rejected with :exc:`ValueError`. Any other value is handed
    straight to :func:`float`, so its :exc:`ValueError` propagates when the
    text cannot be parsed.
    """
    if value is not None:
        return float(value)
    raise ValueError("None is not a valid float")


def UFLOAT(value):  # noqa: N802
    r"""Converts a value that matches \d+(.\d+)? into a non-negative float.

    :param value: Text token to convert.
    :returns: The parsed number, guaranteed to be finite and ``>= 0``.
    :raises ValueError: If ``value`` is ``None``, cannot be parsed by
        :func:`float`, is negative, or is NaN/infinite.
    """
    if value is None:
        raise ValueError("None is not a valid float")
    value = float(value)
    if value < 0:
        raise ValueError("Only positive numbers are allowed")
    # ``float()`` also parses "nan", "inf" and "infinity". NaN compares false
    # against everything, so it slips past the ``< 0`` test above; reject it
    # and positive infinity here (negative infinity was caught above).
    if not value < float("inf"):
        raise ValueError("Only finite numbers are allowed")
    return value


def BOOL(value):  # noqa: N802
    """Map the text ``"1"`` to ``True`` and ``"0"`` to ``False``.

    Anything else raises :exc:`ValueError`.
    """
    if value not in ("1", "0"):
        raise ValueError(f"{value!r} is not 0 or 1")
    return value == "1"


def RANGE(value):  # noqa: N802
    """Turn a single index or a ``start:stop`` spec into a :class:`slice`.

    - ``n`` gives ``slice(n, n+1)``
    - ``n:`` gives ``slice(n, None)``
    - ``n:m`` gives ``slice(n, m)``; ``m`` must be strictly greater than ``n``

    Each number is converted with :func:`UINT`, whose :exc:`ValueError`
    propagates. A :exc:`ValueError` is also raised when the end is not larger
    than the start.
    """
    if ":" not in value:
        # Bare index: select exactly that one position.
        index = UINT(value)
        return slice(index, index + 1)

    # Only the first colon separates start from stop.
    first, _, last = value.partition(":")
    begin = UINT(first)

    if not last.strip():
        # Open-ended range: nothing but whitespace after the colon.
        return slice(begin, None)

    end = UINT(last)
    if begin >= end:
        raise ValueError("End must be larger than start")
    return slice(begin, end)


class Commands:

    """Collection of MPD commands to expose to users.

    Normally used through the global instance which command handlers have been
    installed into.
    """

    def __init__(self):
        # Maps command name -> validating wrapper around the real handler.
        self.handlers = {}

    # TODO: consider removing auth_required and list_command in favour of
    # additional command instances to register in?
    def add(self, name, auth_required=True, list_command=True, **validators):
        """Create a decorator that registers a handler and validation rules.

        Additional keyword arguments are treated as converters/validators to
        apply to tokens converting them to proper Python types.

        Requirements for valid handlers:

        - must accept a context argument as the first arg.
        - may not use variable keyword arguments, ``**kwargs``.
        - may use variable arguments ``*args`` *or* a mix of required and
          optional arguments.

        Decorator returns the unwrapped function so that tests etc can use the
        functions with values with correct python types instead of strings.

        A validator is skipped for an argument whose value equals that
        argument's declared default. A :exc:`ValueError` raised by a validator
        is reported to the caller as ``MpdArgError``.

        :param string name: Name of the command being registered.
        :param bool auth_required: If authorization is required.
        :param bool list_command: If command should be listed in reflection.
        :raises ValueError: (from the decorator) if ``name`` is already
            registered.
        :raises TypeError: (from the decorator) if the handler signature
            breaks the requirements above, or a validator names an argument
            the handler does not have.
        """

        def wrapper(func):
            if name in self.handlers:
                raise ValueError(f"{name} already registered")

            spec = inspect.getfullargspec(func)

            if not spec.args and not spec.varargs:
                raise TypeError("Handler must accept at least one argument.")

            if len(spec.args) > 1 and spec.varargs:
                raise TypeError(
                    "*args may not be combined with regular arguments"
                )

            if not set(validators.keys()).issubset(spec.args):
                raise TypeError("Validator for non-existent arg passed")

            if spec.varkw or spec.kwonlyargs:
                raise TypeError("Keyword arguments are not permitted")

            # Defaults belong to the trailing positional arguments.
            defaults = {}
            if spec.defaults:
                defaults = dict(
                    zip(spec.args[-len(spec.defaults) :], spec.defaults)
                )

            # The handler's signature never changes after registration, so
            # build it once here instead of on every dispatched command.
            signature = inspect.signature(func)

            # Unique placeholder for "argument has no default"; it never
            # compares equal to a real argument value.
            no_default = object()

            def validate(*args, **kwargs):
                # Handlers taking *args get the raw tokens, unvalidated.
                if spec.varargs:
                    return func(*args, **kwargs)

                try:
                    bound = signature.bind(*args, **kwargs)
                except TypeError as exc:
                    raise exceptions.MpdArgError(
                        f'wrong number of arguments for "{name}"'
                    ) from exc
                bound.apply_defaults()
                callargs = bound.arguments

                # Only existing keys are reassigned, so mutating while
                # iterating is safe.
                for key, value in callargs.items():
                    default = defaults.get(key, no_default)
                    if key in validators and value != default:
                        try:
                            callargs[key] = validators[key](value)
                        except ValueError as exc:
                            raise exceptions.MpdArgError(
                                "incorrect arguments"
                            ) from exc

                return func(**callargs)

            validate.auth_required = auth_required
            validate.list_command = list_command
            self.handlers[name] = validate
            return func

        return wrapper

    def call(self, tokens, context=None):
        """Find and run the handler registered for the given command.

        If the handler was registered with any converters/validators they will
        be run before calling the real handler.

        :param list tokens: List of tokens to process; the first token is the
            command name and the rest are passed on as its arguments.
        :param context: MPD context.
        :type context: :class:`~mopidy_mpd.dispatcher.MpdContext`
        :returns: Whatever the handler returns.
        :raises mopidy_mpd.exceptions.MpdNoCommand: If ``tokens`` is empty.
        :raises mopidy_mpd.exceptions.MpdUnknownCommand: If no handler is
            registered under the first token.
        """
        if not tokens:
            raise exceptions.MpdNoCommand()
        if tokens[0] not in self.handlers:
            raise exceptions.MpdUnknownCommand(command=tokens[0])
        return self.handlers[tokens[0]](context, *tokens[1:])


#: Global instance to install commands into. Handlers are registered with the
#: ``commands.add(...)`` decorator and dispatched through ``commands.call()``.
commands = Commands()