File: dummy_backend.py

package info (click to toggle)
mopidy 3.4.2-4
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 2,616 kB
  • sloc: python: 16,656; sh: 159; makefile: 126
file content (153 lines) | stat: -rw-r--r-- 4,258 bytes parent folder | download | duplicates (8)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""A dummy backend for use in tests.

This backend implements the backend API in the simplest way possible.  It is
used in tests of the frontends.
"""


import pykka

from mopidy import backend
from mopidy.models import Playlist, Ref, SearchResult


def create_proxy(config=None, audio=None):
    """Start a ``DummyBackend`` actor and return a proxy to it.

    ``config`` and ``audio`` are passed through unchanged to
    ``DummyBackend.start``.
    """
    actor_ref = DummyBackend.start(config=config, audio=audio)
    return actor_ref.proxy()


class DummyBackend(pykka.ThreadingActor, backend.Backend):
    """Backend actor wired up with the dummy providers from this module.

    It registers the single URI scheme ``dummy``.
    """

    def __init__(self, config, audio):
        # ``config`` is accepted for signature compatibility; it is not read.
        super().__init__()

        self.library = DummyLibraryProvider(backend=self)

        # A truthy ``audio`` gets the stock playback provider; otherwise the
        # in-memory dummy provider is used.
        playback_cls = (
            backend.PlaybackProvider if audio else DummyPlaybackProvider
        )
        self.playback = playback_cls(audio=audio, backend=self)

        self.playlists = DummyPlaylistsProvider(backend=self)

        self.uri_schemes = ["dummy"]


class DummyLibraryProvider(backend.LibraryProvider):
    """Library provider that answers every call from canned attributes.

    The ``dummy_*`` attributes start out empty and are meant to be
    overwritten to control what each method returns.
    """

    root_directory = Ref.directory(uri="dummy:/", name="dummy")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Canned data, all empty by default.
        self.dummy_library = []
        self.dummy_get_distinct_result = {}
        self.dummy_browse_result = {}
        self.dummy_find_exact_result = SearchResult()
        self.dummy_search_result = SearchResult()

    def browse(self, path):
        """Return the canned browse result for ``path``, or an empty list."""
        canned = self.dummy_browse_result
        return canned.get(path, [])

    def get_distinct(self, field, query=None):
        """Return the canned distinct values for ``field``, or an empty set.

        ``query`` is ignored.
        """
        canned = self.dummy_get_distinct_result
        return canned.get(field, set())

    def lookup(self, uri):
        """Return every track in ``dummy_library`` whose URI equals ``uri``."""
        # The URI is round-tripped through ``Ref.track`` before comparing;
        # presumably so the model checks it -- confirm against mopidy.models.
        wanted = Ref.track(uri=uri).uri
        matches = []
        for track in self.dummy_library:
            if wanted == track.uri:
                matches.append(track)
        return matches

    def refresh(self, uri=None):
        """Do nothing; there is no cache to refresh."""

    def search(self, query=None, uris=None, exact=False):
        """Return the canned search result; ``query`` and ``uris`` are ignored.

        ``exact=True`` selects ``dummy_find_exact_result`` instead of
        ``dummy_search_result``.
        """
        # TODO: remove uses of dummy_find_exact_result
        return (
            self.dummy_find_exact_result
            if exact
            else self.dummy_search_result
        )


class DummyPlaybackProvider(backend.PlaybackProvider):
    """Playback provider that only tracks the current URI and position.

    No audio is touched: every operation just updates in-memory state and
    reports success, except ``play()``, which fails when no track has been
    selected or when the selected track's URI is ``"dummy:error"``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._uri = None  # URI of the track set by change_track(), if any
        self._time_position = 0  # last position passed to seek()

    def pause(self):
        """Always report success."""
        return True

    def play(self):
        """Return ``True`` if a track is selected and it is not 'dummy:error'.

        Always returns a real ``bool``; a bare ``self._uri and ...`` would
        leak ``None`` (or an empty string) to the caller when no track is
        selected.
        """
        return bool(self._uri) and self._uri != "dummy:error"

    def change_track(self, track):
        """Remember ``track.uri`` and reset the time position to 0.

        Pass a track with URI 'dummy:error' to force failure: this method
        still returns ``True``, but a following ``play()`` returns ``False``.
        """
        self._uri = track.uri
        self._time_position = 0
        return True

    def prepare_change(self):
        """Do nothing."""

    def resume(self):
        """Always report success."""
        return True

    def seek(self, time_position):
        """Store ``time_position`` and report success."""
        self._time_position = time_position
        return True

    def stop(self):
        """Forget the current URI and report success.

        The stored time position is left untouched.
        """
        self._uri = None
        return True

    def get_time_position(self):
        """Return the position last set by ``seek()`` or ``change_track()``."""
        return self._time_position


class DummyPlaylistsProvider(backend.PlaylistsProvider):
    """Playlists provider that keeps its playlists in an in-memory list."""

    def __init__(self, backend):
        super().__init__(backend)
        self._playlists = []
        self._allow_save = True  # when False, save() returns None

    def set_dummy_playlists(self, playlists):
        """Replace the stored playlists.

        Exists as a method so tests that reach the provider through an actor
        proxy can set the playlists.
        """
        self._playlists = playlists

    def set_allow_save(self, enabled):
        """Enable or disable ``save()``."""
        self._allow_save = enabled

    def as_list(self):
        """Return a playlist ``Ref`` (URI and name) for each stored playlist."""
        refs = []
        for stored in self._playlists:
            refs.append(Ref.playlist(uri=stored.uri, name=stored.name))
        return refs

    def get_items(self, uri):
        """Return track ``Ref`` objects for the playlist at ``uri``.

        Returns ``None`` when no stored playlist has that URI.
        """
        found = self.lookup(uri)
        if found is not None:
            return [
                Ref.track(uri=track.uri, name=track.name)
                for track in found.tracks
            ]
        return None

    def lookup(self, uri):
        """Return the first stored playlist whose URI is ``uri``, else ``None``."""
        wanted = Ref.playlist(uri=uri).uri
        candidates = (
            stored for stored in self._playlists if stored.uri == wanted
        )
        return next(candidates, None)

    def refresh(self):
        """Do nothing; there is nothing to reload."""

    def create(self, name):
        """Create, store and return a playlist named ``name``.

        The new playlist's URI is ``dummy:`` followed by the name.
        """
        created = Playlist(name=name, uri=f"dummy:{name}")
        self._playlists.append(created)
        return created

    def delete(self, uri):
        """Remove the playlist at ``uri`` if one is stored; return ``None``."""
        target = self.lookup(uri)
        if not target:
            return
        self._playlists.remove(target)

    def save(self, playlist):
        """Store ``playlist`` and return it, or return ``None`` if disabled.

        A stored playlist with the same URI is replaced in place; otherwise
        the playlist is appended.
        """
        if not self._allow_save:
            return None

        existing = self.lookup(playlist.uri)

        if existing is None:
            self._playlists.append(playlist)
        else:
            slot = self._playlists.index(existing)
            self._playlists[slot] = playlist

        return playlist