1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
|
"""Snapcast stream."""
class Snapstream():
"""Represents a snapcast stream."""
def __init__(self, data):
"""Initialize."""
self.update(data)
self._callback_func = None
@property
def identifier(self):
"""Get stream id."""
return self._stream.get('id')
@property
def status(self):
"""Get stream status."""
return self._stream.get('status')
@property
def name(self):
"""Get stream name."""
return self._stream.get('uri').get('query').get('name')
@property
def friendly_name(self):
"""Get friendly name."""
return self.name if self.name != '' else self.identifier
@property
def metadata(self):
"""Get metadata."""
if 'properties' in self._stream:
return self._stream['properties'].get('metadata')
return self._stream.get('meta')
@property
def meta(self):
"""Get metadata. Deprecated."""
return self.metadata
@property
def properties(self):
"""Get properties."""
return self._stream.get('properties')
@property
def path(self):
"""Get stream path."""
return self._stream.get('uri').get('path')
def update(self, data):
"""Update stream."""
self._stream = data
def update_meta(self, data):
"""Update stream metadata."""
self.update_metadata(data)
def update_metadata(self, data):
"""Update stream metadata."""
if 'properties' in self._stream:
self._stream['properties']['metadata'] = data
self._stream['meta'] = data
def update_properties(self, data):
"""Update stream properties."""
self._stream['properties'] = data
def __repr__(self):
"""Return string representation."""
return f'Snapstream ({self.name})'
def callback(self):
"""Run callback."""
if self._callback_func and callable(self._callback_func):
self._callback_func(self)
def set_callback(self, func):
"""Set callback."""
self._callback_func = func
|