1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
|
"""
This is Mopidy's MPD protocol implementation.
This is partly based upon the `MPD protocol documentation
<http://www.musicpd.org/doc/protocol/>`_, which is a useful resource, but it is
rather incomplete with regards to data formats, both for requests and
responses. Thus, we have had to talk a great deal with the the original `MPD
server <https://mpd.fandom.com/>`_ using telnet to get the details we need to
implement our own MPD server which is compatible with the numerous existing
`MPD clients <https://mpd.fandom.com/wiki/Clients>`_.
"""
import inspect
from mopidy_mpd import exceptions
#: The MPD protocol uses UTF-8 for encoding all data.
ENCODING = "UTF-8"
#: The MPD protocol uses ``\n`` as line terminator.
LINE_TERMINATOR = b"\n"
#: The MPD protocol version is 0.19.0.
VERSION = "0.19.0"
def load_protocol_modules():
"""
The protocol modules must be imported to get them registered in
:attr:`commands`.
"""
from . import ( # noqa
audio_output,
channels,
command_list,
connection,
current_playlist,
mount,
music_db,
playback,
reflection,
status,
stickers,
stored_playlists,
)
def INT(value): # noqa: N802
r"""Converts a value that matches [+-]?\d+ into an integer."""
if value is None:
raise ValueError("None is not a valid integer")
# TODO: check for whitespace via value != value.strip()?
return int(value)
def UINT(value): # noqa: N802
r"""Converts a value that matches \d+ into an integer."""
if value is None:
raise ValueError("None is not a valid integer")
if not value.isdigit():
raise ValueError("Only positive numbers are allowed")
return int(value)
def FLOAT(value): # noqa: N802
r"""Converts a value that matches [+-]\d+(.\d+)? into a float."""
if value is None:
raise ValueError("None is not a valid float")
return float(value)
def UFLOAT(value): # noqa: N802
r"""Converts a value that matches \d+(.\d+)? into a float."""
if value is None:
raise ValueError("None is not a valid float")
value = float(value)
if value < 0:
raise ValueError("Only positive numbers are allowed")
return value
def BOOL(value): # noqa: N802
"""Convert the values 0 and 1 into booleans."""
if value in ("1", "0"):
return bool(int(value))
raise ValueError(f"{value!r} is not 0 or 1")
def RANGE(value): # noqa: N802
"""Convert a single integer or range spec into a slice
``n`` should become ``slice(n, n+1)``
``n:`` should become ``slice(n, None)``
``n:m`` should become ``slice(n, m)`` and ``m > n`` must hold
"""
if ":" in value:
start, stop = value.split(":", 1)
start = UINT(start)
if stop.strip():
stop = UINT(stop)
if start >= stop:
raise ValueError("End must be larger than start")
else:
stop = None
else:
start = UINT(value)
stop = start + 1
return slice(start, stop)
class Commands:
"""Collection of MPD commands to expose to users.
Normally used through the global instance which command handlers have been
installed into.
"""
def __init__(self):
self.handlers = {}
# TODO: consider removing auth_required and list_command in favour of
# additional command instances to register in?
def add(self, name, auth_required=True, list_command=True, **validators):
"""Create a decorator that registers a handler and validation rules.
Additional keyword arguments are treated as converters/validators to
apply to tokens converting them to proper Python types.
Requirements for valid handlers:
- must accept a context argument as the first arg.
- may not use variable keyword arguments, ``**kwargs``.
- may use variable arguments ``*args`` *or* a mix of required and
optional arguments.
Decorator returns the unwrapped function so that tests etc can use the
functions with values with correct python types instead of strings.
:param string name: Name of the command being registered.
:param bool auth_required: If authorization is required.
:param bool list_command: If command should be listed in reflection.
"""
def wrapper(func):
if name in self.handlers:
raise ValueError(f"{name} already registered")
spec = inspect.getfullargspec(func)
defaults = dict(
zip(spec.args[-len(spec.defaults or []) :], spec.defaults or [])
)
if not spec.args and not spec.varargs:
raise TypeError("Handler must accept at least one argument.")
if len(spec.args) > 1 and spec.varargs:
raise TypeError(
"*args may not be combined with regular arguments"
)
if not set(validators.keys()).issubset(spec.args):
raise TypeError("Validator for non-existent arg passed")
if spec.varkw or spec.kwonlyargs:
raise TypeError("Keyword arguments are not permitted")
def validate(*args, **kwargs):
if spec.varargs:
return func(*args, **kwargs)
try:
ba = inspect.signature(func).bind(*args, **kwargs)
ba.apply_defaults()
callargs = ba.arguments
except TypeError:
raise exceptions.MpdArgError(
f'wrong number of arguments for "{name}"'
)
for key, value in callargs.items():
default = defaults.get(key, object())
if key in validators and value != default:
try:
callargs[key] = validators[key](value)
except ValueError:
raise exceptions.MpdArgError("incorrect arguments")
return func(**callargs)
validate.auth_required = auth_required
validate.list_command = list_command
self.handlers[name] = validate
return func
return wrapper
def call(self, tokens, context=None):
"""Find and run the handler registered for the given command.
If the handler was registered with any converters/validators they will
be run before calling the real handler.
:param list tokens: List of tokens to process
:param context: MPD context.
:type context: :class:`~mopidy_mpd.dispatcher.MpdContext`
"""
if not tokens:
raise exceptions.MpdNoCommand()
if tokens[0] not in self.handlers:
raise exceptions.MpdUnknownCommand(command=tokens[0])
return self.handlers[tokens[0]](context, *tokens[1:])
#: Global instance to install commands into
commands = Commands()
|