1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
"""A dummy backend for use in tests.
This backend implements the backend API in the simplest way possible. It is
used in tests of the frontends.
"""
import pykka
from mopidy import backend
from mopidy.models import Playlist, Ref, SearchResult
def create_proxy(config=None, audio=None):
    """Start a :class:`DummyBackend` actor and return a proxy to it.

    Both arguments are forwarded unchanged, as keyword arguments, to
    ``DummyBackend.start()``.
    """
    actor_ref = DummyBackend.start(config=config, audio=audio)
    return actor_ref.proxy()
class DummyBackend(pykka.ThreadingActor, backend.Backend):
    """Backend actor that wires together the dummy providers for tests."""

    def __init__(self, config, audio):
        """Create the library, playback and playlists providers.

        ``config`` is accepted but not used here. When ``audio`` is truthy
        the stock ``backend.PlaybackProvider`` is used for playback;
        otherwise :class:`DummyPlaybackProvider` takes its place.
        """
        super().__init__()

        self.library = DummyLibraryProvider(backend=self)

        # Only the provider class depends on ``audio``; both variants are
        # constructed with the same keyword arguments.
        playback_cls = (
            backend.PlaybackProvider if audio else DummyPlaybackProvider
        )
        self.playback = playback_cls(audio=audio, backend=self)

        self.playlists = DummyPlaylistsProvider(backend=self)

        self.uri_schemes = ["dummy"]
class DummyLibraryProvider(backend.LibraryProvider):
    """Library provider that returns canned results set up by the test.

    Tests populate the ``dummy_*`` attributes directly; each method simply
    hands back the matching canned value.
    """

    root_directory = Ref.directory(uri="dummy:/", name="dummy")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Tracks that lookup() matches against by URI.
        self.dummy_library = []
        # Canned results keyed by field name / browse path.
        self.dummy_get_distinct_result = {}
        self.dummy_browse_result = {}
        # Canned search results for exact and non-exact searches.
        self.dummy_find_exact_result = SearchResult()
        self.dummy_search_result = SearchResult()

    def browse(self, path):
        """Return the canned browse result for ``path``, or an empty list."""
        canned = self.dummy_browse_result
        return canned.get(path, [])

    def get_distinct(self, field, query=None):
        """Return the canned distinct values for ``field``, or an empty set.

        ``query`` is ignored.
        """
        canned = self.dummy_get_distinct_result
        return canned.get(field, set())

    def lookup(self, uri):
        """Return every track in ``dummy_library`` whose URI equals ``uri``."""
        # Round-trip the URI through a Ref before comparing, as the
        # original implementation does.
        wanted = Ref.track(uri=uri).uri
        matches = []
        for track in self.dummy_library:
            if wanted == track.uri:
                matches.append(track)
        return matches

    def refresh(self, uri=None):
        """Do nothing; there is nothing to refresh."""
        return None

    def search(self, query=None, uris=None, exact=False):
        """Return the canned search result; ``query`` and ``uris`` are ignored."""
        # TODO: remove uses of dummy_find_exact_result
        if not exact:
            return self.dummy_search_result
        return self.dummy_find_exact_result
class DummyPlaybackProvider(backend.PlaybackProvider):
    """Playback provider that fakes playback without any audio.

    It only tracks the current URI and the last seeked position. Every
    operation reports success, except ``play()`` when no track is set or
    when the current track has the URI ``"dummy:error"``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._uri = None
        self._time_position = 0

    def pause(self):
        """Always succeed."""
        return True

    def play(self):
        """Return ``True`` if a track is set and it is not ``"dummy:error"``.

        The result is always a real ``bool``. Previously a missing URI made
        this return the falsy URI value itself (e.g. ``None``) instead of
        ``False``.
        """
        return bool(self._uri) and self._uri != "dummy:error"

    def change_track(self, track):
        """Pass a track with URI 'dummy:error' to force failure"""
        self._uri = track.uri
        # A new track always starts from the beginning.
        self._time_position = 0
        return True

    def prepare_change(self):
        """Do nothing."""
        pass

    def resume(self):
        """Always succeed."""
        return True

    def seek(self, time_position):
        """Remember ``time_position`` and report success."""
        self._time_position = time_position
        return True

    def stop(self):
        """Forget the current track and report success."""
        self._uri = None
        return True

    def get_time_position(self):
        """Return the position set by the last seek, or 0 after a track change."""
        return self._time_position
class DummyPlaylistsProvider(backend.PlaylistsProvider):
    """Playlists provider that keeps its playlists in an in-memory list."""

    def __init__(self, backend):
        super().__init__(backend)
        self._playlists = []
        # When False, save() refuses to store anything and returns None.
        self._allow_save = True

    def set_dummy_playlists(self, playlists):
        """For tests using the dummy provider through an actor proxy."""
        self._playlists = playlists

    def set_allow_save(self, enabled):
        """Enable or disable saving of playlists."""
        self._allow_save = enabled

    def as_list(self):
        """Return a playlist ``Ref`` for every stored playlist."""
        refs = []
        for stored in self._playlists:
            refs.append(Ref.playlist(uri=stored.uri, name=stored.name))
        return refs

    def get_items(self, uri):
        """Return track ``Ref`` objects for the playlist at ``uri``.

        Returns ``None`` when no stored playlist has that URI.
        """
        found = self.lookup(uri)
        if found is not None:
            return [
                Ref.track(uri=track.uri, name=track.name)
                for track in found.tracks
            ]
        return None

    def lookup(self, uri):
        """Return the first stored playlist with URI ``uri``, else ``None``."""
        # Round-trip the URI through a Ref before comparing, as the
        # original implementation does.
        wanted = Ref.playlist(uri=uri).uri
        candidates = (pl for pl in self._playlists if pl.uri == wanted)
        return next(candidates, None)

    def refresh(self):
        """Do nothing; there is nothing to refresh."""
        return None

    def create(self, name):
        """Create, store and return a playlist with URI ``dummy:<name>``."""
        created = Playlist(name=name, uri=f"dummy:{name}")
        self._playlists.append(created)
        return created

    def delete(self, uri):
        """Remove the playlist at ``uri`` if it is stored; return ``None``."""
        target = self.lookup(uri)
        if not target:
            return
        self._playlists.remove(target)

    def save(self, playlist):
        """Store ``playlist``, replacing a stored playlist with the same URI.

        Returns the saved playlist, or ``None`` when saving is disabled.
        """
        if not self._allow_save:
            return None

        existing = self.lookup(playlist.uri)
        if existing is None:
            # Unknown URI: add it as a new playlist.
            self._playlists.append(playlist)
            return playlist

        # Known URI: swap the new playlist in at the old one's position.
        position = self._playlists.index(existing)
        self._playlists[position] = playlist
        return playlist
|