1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
|
"""Snapcast group."""
import logging
_LOGGER = logging.getLogger(__name__)
# pylint: disable=too-many-public-methods
class Snapgroup:
    """Represents a snapcast group.

    The group state is held as the raw data dict handed to ``update()``.
    Setters change that dict first and then forward the change to the
    server object supplied at construction; the ``update_*`` methods apply
    changes reported from outside and notify the registered callback.
    Logging goes through the module-level ``_LOGGER``.
    """

    def __init__(self, server, data):
        """Initialize.

        Args:
            server: object used to look up clients/streams and to send
                group commands.
            data: group data dict; read keys are ``id``, ``name``,
                ``stream_id``, ``muted`` and ``clients``.
        """
        self._server = server
        self._snapshot = None
        self._callback_func = None
        self.update(data)

    def update(self, data):
        """Update group by replacing the stored data dict with ``data``."""
        self._group = data

    def _client_entries(self):
        """Return the raw client entries of the group (empty if missing)."""
        return self._group.get('clients') or []

    @property
    def identifier(self):
        """Get group identifier."""
        return self._group.get('id')

    @property
    def name(self):
        """Get group name (``None`` when the data has no ``name`` key)."""
        return self._group.get('name')

    async def set_name(self, name):
        """Set a group name; any falsy value is sent as an empty string."""
        if not name:
            name = ''
        self._group['name'] = name
        await self._server.group_name(self.identifier, name)

    @property
    def stream(self):
        """Get stream identifier."""
        return self._group.get('stream_id')

    async def set_stream(self, stream_id):
        """Set group stream locally, then on the server."""
        self._group['stream_id'] = stream_id
        await self._server.group_stream(self.identifier, stream_id)
        _LOGGER.debug('set stream to %s on %s', stream_id, self.friendly_name)

    @property
    def stream_status(self):
        """Get stream status of the group's current stream."""
        return self._server.stream(self.stream).status

    @property
    def muted(self):
        """Get mute status."""
        return self._group.get('muted')

    async def set_muted(self, status):
        """Set group mute status locally, then on the server."""
        self._group['muted'] = status
        await self._server.group_mute(self.identifier, status)
        _LOGGER.debug('set muted to %s on %s', status, self.friendly_name)

    @property
    def volume(self):
        """Get volume.

        Returns:
            The mean of the member clients' volumes, truncated to an int,
            or 0 when the group has no clients.
        """
        entries = self._client_entries()
        if not entries:
            # An empty group has no meaningful average; avoid dividing by 0.
            return 0
        total = sum(self._server.client(entry.get('id')).volume
                    for entry in entries)
        return int(total / len(entries))

    async def set_volume(self, volume):
        """Set volume.

        Every client is moved by the same ratio towards 0 (when lowering)
        or towards 100 (when raising), so the clients keep their relative
        levels while the group average moves to ``volume``.

        Args:
            volume: target group volume, 0 to 100 inclusive.

        Raises:
            ValueError: if ``volume`` is not in ``range(0, 101)``.
        """
        if volume not in range(0, 101):
            raise ValueError('Volume out of range')
        current_volume = self.volume
        if volume == current_volume:
            _LOGGER.debug('left volume at %s on group %s', volume, self.friendly_name)
            return
        delta = volume - current_volume
        # The divisors cannot be 0: lowering implies current_volume > 0 and
        # raising implies current_volume < 100.
        if delta < 0:
            ratio = (current_volume - volume) / current_volume
        else:
            ratio = (volume - current_volume) / (100 - current_volume)
        for data in self._client_entries():
            client = self._server.client(data.get('id'))
            client_volume = client.volume
            if delta < 0:
                client_volume -= ratio * client_volume
            else:
                client_volume += ratio * (100 - client_volume)
            client_volume = round(client_volume)
            # update_group=False: the group is the one driving this change.
            await client.set_volume(client_volume, update_group=False)
            client.update_volume({
                'volume': {
                    'percent': client_volume,
                    'muted': client.muted
                }
            })
        _LOGGER.debug('set volume to %s on group %s', volume, self.friendly_name)

    @property
    def friendly_name(self):
        """Get friendly name.

        Returns:
            The group name if it is set; otherwise the sorted friendly
            names of the member clients known to the server joined with
            ``+``; otherwise the group identifier.
        """
        name = self.name
        if name:
            return name
        member_ids = self.clients
        if member_ids:
            # Build the lookup once instead of once per member.
            known_ids = {client.identifier for client in self._server.clients}
            joined = "+".join(sorted(
                self._server.client(member_id).friendly_name
                for member_id in member_ids if member_id in known_ids))
            if joined:
                return joined
        return self.identifier

    @property
    def clients(self):
        """Get client identifiers as a new list on every access."""
        return [client.get('id') for client in self._client_entries()]

    async def _resync_after_membership_change(self, client_identifier):
        """Re-read the server status and notify the client and this group."""
        status = (await self._server.status())[0]
        self._server.synchronize(status)
        self._server.client(client_identifier).callback()
        self.callback()

    async def add_client(self, client_identifier):
        """Add a client.

        Logs an error and does nothing if the client is already a member.
        """
        new_clients = self.clients
        if client_identifier in new_clients:
            _LOGGER.error('%s already in group %s', client_identifier, self.identifier)
            return
        new_clients.append(client_identifier)
        await self._server.group_clients(self.identifier, new_clients)
        _LOGGER.debug('added %s to %s', client_identifier, self.identifier)
        await self._resync_after_membership_change(client_identifier)

    async def remove_client(self, client_identifier):
        """Remove a client.

        Logs an error and does nothing if the client is not a member,
        mirroring how ``add_client`` treats an existing member.
        """
        new_clients = self.clients
        if client_identifier not in new_clients:
            _LOGGER.error('%s not in group %s', client_identifier, self.identifier)
            return
        new_clients.remove(client_identifier)
        await self._server.group_clients(self.identifier, new_clients)
        _LOGGER.debug('removed %s from %s', client_identifier, self.identifier)
        await self._resync_after_membership_change(client_identifier)

    def streams_by_name(self):
        """Get available stream objects by name (keyed by friendly name)."""
        return {stream.friendly_name: stream for stream in self._server.streams}

    def update_mute(self, data):
        """Update mute from ``data['mute']`` and run the callback."""
        self._group['muted'] = data['mute']
        self.callback()
        _LOGGER.debug('updated mute on %s', self.friendly_name)

    def update_name(self, data):
        """Update name from ``data['name']`` and run the callback."""
        self._group['name'] = data['name']
        _LOGGER.debug('updated name on %s', self.name)
        self.callback()

    def update_stream(self, data):
        """Update stream from ``data['stream_id']`` and run the callback."""
        self._group['stream_id'] = data['stream_id']
        self.callback()
        _LOGGER.debug('updated stream to %s on %s', self.stream, self.friendly_name)

    def snapshot(self):
        """Snapshot current state (mute status, volume and stream)."""
        self._snapshot = {
            'muted': self.muted,
            'volume': self.volume,
            'stream': self.stream
        }
        _LOGGER.debug('took snapshot of current state of %s', self.friendly_name)

    async def restore(self):
        """Restore snapshotted state; does nothing if no snapshot exists."""
        if not self._snapshot:
            return
        await self.set_muted(self._snapshot['muted'])
        await self.set_volume(self._snapshot['volume'])
        await self.set_stream(self._snapshot['stream'])
        self.callback()
        _LOGGER.debug('restored snapshot of state of %s', self.friendly_name)

    def callback(self):
        """Run callback, passing this group, if a callable one is set."""
        if self._callback_func and callable(self._callback_func):
            self._callback_func(self)

    def set_callback(self, func):
        """Set callback to be invoked with this group on changes."""
        self._callback_func = func

    def __repr__(self):
        """Return string representation."""
        return f'Snapgroup ({self.friendly_name}, {self.identifier})'
|