File: group.py

package info (click to toggle)
python-snapcast 2.3.7-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 184 kB
  • sloc: python: 1,564; makefile: 9
file content (195 lines) | stat: -rw-r--r-- 6,739 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
"""Snapcast group."""
import logging


# Module-level logger, named after this module, used for the debug/error
# messages emitted by the group methods below.
_LOGGER = logging.getLogger(__name__)


# pylint: disable=too-many-public-methods
class Snapgroup:
    """Represents a snapcast group.

    Wraps the raw group dictionary handed over by the server object and
    forwards every change request to that server object.
    """

    def __init__(self, server, data):
        """Initialize.

        Args:
            server: Object used to look up clients and streams and to send
                group commands.
            data: Raw group dictionary; it is read through the keys 'id',
                'name', 'stream_id', 'muted' and 'clients'.
        """
        self._server = server
        self._snapshot = None
        self._callback_func = None
        self.update(data)

    def update(self, data):
        """Update group by replacing the stored group dictionary."""
        self._group = data

    @property
    def identifier(self):
        """Get group identifier."""
        return self._group.get('id')

    @property
    def name(self):
        """Get group name."""
        return self._group.get('name')

    async def set_name(self, name):
        """Set a group name.

        A falsy name (``None``, empty string) is stored and sent as ''.
        """
        if not name:
            name = ''
        self._group['name'] = name
        await self._server.group_name(self.identifier, name)

    @property
    def stream(self):
        """Get stream identifier."""
        return self._group.get('stream_id')

    async def set_stream(self, stream_id):
        """Set group stream."""
        self._group['stream_id'] = stream_id
        await self._server.group_stream(self.identifier, stream_id)
        _LOGGER.debug('set stream to %s on %s', stream_id, self.friendly_name)

    @property
    def stream_status(self):
        """Get stream status of the stream this group is assigned to."""
        return self._server.stream(self.stream).status

    @property
    def muted(self):
        """Get mute status."""
        return self._group.get('muted')

    async def set_muted(self, status):
        """Set group mute status."""
        self._group['muted'] = status
        await self._server.group_mute(self.identifier, status)
        _LOGGER.debug('set muted to %s on %s', status, self.friendly_name)

    @property
    def volume(self):
        """Get volume.

        Returns:
            The average of the member clients' volumes, truncated to an int,
            or 0 when the group has no clients.
        """
        member_ids = self.clients
        if not member_ids:
            # Nothing to average; avoid dividing by zero.
            return 0
        total = sum(self._server.client(cid).volume for cid in member_ids)
        return int(total / len(member_ids))

    async def set_volume(self, volume):
        """Set volume.

        Every member client is moved towards the target by the same fraction
        of its own remaining headroom (when raising) or of its own current
        level (when lowering).

        Args:
            volume: Target group volume, 0 to 100 inclusive.

        Raises:
            ValueError: If ``volume`` is not a member of ``range(0, 101)``.
        """
        if volume not in range(0, 101):
            raise ValueError('Volume out of range')
        current_volume = self.volume
        if volume == current_volume:
            _LOGGER.debug('left volume at %s on group %s', volume, self.friendly_name)
            return
        delta = volume - current_volume
        # Neither division can hit zero: delta < 0 implies current_volume > 0,
        # and delta > 0 implies current_volume < 100.
        if delta < 0:
            ratio = (current_volume - volume) / current_volume
        else:
            ratio = (volume - current_volume) / (100 - current_volume)
        for client_id in self.clients:
            client = self._server.client(client_id)
            client_volume = client.volume
            if delta < 0:
                client_volume -= ratio * client_volume
            else:
                client_volume += ratio * (100 - client_volume)
            client_volume = round(client_volume)
            await client.set_volume(client_volume, update_group=False)
            client.update_volume({
                'volume': {
                    'percent': client_volume,
                    'muted': client.muted
                }
            })
        _LOGGER.debug('set volume to %s on group %s', volume, self.friendly_name)

    @property
    def friendly_name(self):
        """Get friendly name.

        Returns:
            The group name when one is set; otherwise the sorted friendly
            names of the member clients known to the server joined with '+';
            otherwise the group identifier.
        """
        name = self.name
        if name:
            return name
        # Build the lookup of known client identifiers once instead of once
        # per group member.
        known_ids = {client.identifier for client in self._server.clients}
        joined = "+".join(sorted(
            self._server.client(cid).friendly_name
            for cid in self.clients if cid in known_ids))
        return joined or self.identifier

    @property
    def clients(self):
        """Get client identifiers.

        Returns:
            A new list on every access (empty when the group dictionary has
            no client entries), so callers may modify it freely.
        """
        return [client.get('id') for client in self._group.get('clients') or []]

    async def add_client(self, client_identifier):
        """Add a client.

        Logs an error and does nothing when the client is already a member.
        """
        if client_identifier in self.clients:
            _LOGGER.error('%s already in group %s', client_identifier, self.identifier)
            return
        new_clients = self.clients
        new_clients.append(client_identifier)
        await self._server.group_clients(self.identifier, new_clients)
        _LOGGER.debug('added %s to %s', client_identifier, self.identifier)
        # Re-read the server status (first element of the result) and
        # synchronize with it before notifying listeners.
        status = (await self._server.status())[0]
        self._server.synchronize(status)
        self._server.client(client_identifier).callback()
        self.callback()

    async def remove_client(self, client_identifier):
        """Remove a client.

        Raises:
            ValueError: If the client is not a member of this group.
        """
        new_clients = self.clients
        new_clients.remove(client_identifier)
        await self._server.group_clients(self.identifier, new_clients)
        _LOGGER.debug('removed %s from %s', client_identifier, self.identifier)
        # Re-read the server status (first element of the result) and
        # synchronize with it before notifying listeners.
        status = (await self._server.status())[0]
        self._server.synchronize(status)
        self._server.client(client_identifier).callback()
        self.callback()

    def streams_by_name(self):
        """Get available stream objects by name (keyed by friendly name)."""
        return {stream.friendly_name: stream for stream in self._server.streams}

    def update_mute(self, data):
        """Update mute from ``data['mute']`` and run the callback."""
        self._group['muted'] = data['mute']
        self.callback()
        _LOGGER.debug('updated mute on %s', self.friendly_name)

    def update_name(self, data):
        """Update name from ``data['name']`` and run the callback."""
        self._group['name'] = data['name']
        _LOGGER.debug('updated name on %s', self.name)
        self.callback()

    def update_stream(self, data):
        """Update stream from ``data['stream_id']`` and run the callback."""
        self._group['stream_id'] = data['stream_id']
        self.callback()
        _LOGGER.debug('updated stream to %s on %s', self.stream, self.friendly_name)

    def snapshot(self):
        """Snapshot current state (mute status, volume and stream)."""
        self._snapshot = {
            'muted': self.muted,
            'volume': self.volume,
            'stream': self.stream
        }
        _LOGGER.debug('took snapshot of current state of %s', self.friendly_name)

    async def restore(self):
        """Restore snapshotted state; does nothing when no snapshot exists."""
        if not self._snapshot:
            return
        await self.set_muted(self._snapshot['muted'])
        await self.set_volume(self._snapshot['volume'])
        await self.set_stream(self._snapshot['stream'])
        self.callback()
        _LOGGER.debug('restored snapshot of state of %s', self.friendly_name)

    def callback(self):
        """Run callback, passing this group, if a callable one is set."""
        if self._callback_func and callable(self._callback_func):
            self._callback_func(self)

    def set_callback(self, func):
        """Set callback."""
        self._callback_func = func

    def __repr__(self):
        """Return string representation."""
        return f'Snapgroup ({self.friendly_name}, {self.identifier})'