File: stream.py

package info (click to toggle)
python-snapcast 2.3.7-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 184 kB
  • sloc: python: 1,564; makefile: 9
file content (83 lines) | stat: -rw-r--r-- 2,136 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""Snapcast stream."""


class Snapstream():
    """Represents a snapcast stream.

    Thin wrapper around the stream dict handed to the constructor. The
    properties read from that dict on every access, so in-place updates
    made through the ``update*`` methods are visible immediately.
    """

    def __init__(self, data):
        """Initialize.

        Args:
            data: Dict describing the stream. The keys read by this class
                are ``id``, ``status``, ``uri`` (with ``path`` and
                ``query``), ``properties`` and ``meta``.
        """
        self.update(data)
        self._callback_func = None

    def _uri(self):
        """Return the stream's ``uri`` dict, or an empty dict if absent."""
        return self._stream.get('uri') or {}

    @property
    def identifier(self):
        """Get stream id."""
        return self._stream.get('id')

    @property
    def status(self):
        """Get stream status."""
        return self._stream.get('status')

    @property
    def name(self):
        """Get stream name.

        Returns:
            The ``name`` entry of the URI query, or ``None`` when the
            stream has no ``uri``, no ``query`` or no ``name``.
        """
        # Fall back to empty dicts so a missing level yields None instead
        # of raising AttributeError on a chained .get().
        query = self._uri().get('query') or {}
        return query.get('name')

    @property
    def friendly_name(self):
        """Get friendly name.

        Returns:
            The stream name, or the identifier when the name is missing
            (``None``) or empty.
        """
        name = self.name
        if name is None or name == '':
            return self.identifier
        return name

    @property
    def metadata(self):
        """Get metadata.

        Reads ``properties['metadata']`` when the stream has a
        ``properties`` key, otherwise the top-level ``meta`` key.
        """
        if 'properties' in self._stream:
            return self._stream['properties'].get('metadata')
        return self._stream.get('meta')

    @property
    def meta(self):
        """Get metadata. Deprecated."""
        return self.metadata

    @property
    def properties(self):
        """Get properties."""
        return self._stream.get('properties')

    @property
    def path(self):
        """Get stream path, or ``None`` when the stream has no ``uri``."""
        return self._uri().get('path')

    def update(self, data):
        """Update stream by replacing the whole underlying dict."""
        self._stream = data

    def update_meta(self, data):
        """Update stream metadata. Alias of ``update_metadata``."""
        self.update_metadata(data)

    def update_metadata(self, data):
        """Update stream metadata.

        Writes ``data`` to ``properties['metadata']`` when the stream has
        a ``properties`` key, and always to the top-level ``meta`` key, so
        the ``metadata`` property finds it whichever key it reads.
        """
        if 'properties' in self._stream:
            self._stream['properties']['metadata'] = data
        self._stream['meta'] = data

    def update_properties(self, data):
        """Update stream properties by replacing the ``properties`` entry."""
        self._stream['properties'] = data

    def __repr__(self):
        """Return string representation."""
        return f'Snapstream ({self.name})'

    def callback(self):
        """Run callback, passing this stream; no-op if unset or not callable."""
        if self._callback_func and callable(self._callback_func):
            self._callback_func(self)

    def set_callback(self, func):
        """Set callback.

        Args:
            func: Callable invoked with this stream as its only argument
                when ``callback`` is run.
        """
        self._callback_func = func