File: subscription.py

package info (click to toggle)
python-logi-circle 0.2.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 300 kB
  • sloc: python: 1,685; xml: 16; sh: 5; makefile: 4
file content (138 lines) | stat: -rw-r--r-- 4,755 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""Subscription class"""
# coding: utf-8
# vim:sw=4:ts=4:et:
import logging
import asyncio
import json
import aiohttp
from .const import ACTIVITY_EVENTS, ACCESSORIES_ENDPOINT, ACTIVITIES_ENDPOINT
from .utils import _get_camera_from_id
from .activity import Activity
from .exception import SubscriptionClosed

_LOGGER = logging.getLogger(__name__)


class Subscription():
    """Generic implementation for a Logi Circle event subscription."""

    def __init__(self, wss_url, cameras, ping_interval=60, raw=False):
        """Initialize Subscription object"""
        self.wss_url = wss_url
        self._cameras = cameras
        self._ping_interval = ping_interval
        self._ws = None
        self._session = None
        self._raw = raw
        self._closed = False
        self._invalidated = False

    async def open(self):
        """Establish a new WebSockets connection"""
        if not self.opened:
            return RuntimeError('This subscription has been closed')
        self._session = aiohttp.ClientSession()
        self._ws = await self._session.ws_connect(
            self.wss_url)
        _LOGGER.debug("Opened WS connection to url %s", self.wss_url)

        if self._ping_interval > 0:
            asyncio.ensure_future(self._auto_ping(self._ping_interval))

    async def close(self):
        """Close WebSockets connection"""
        if not self.opened:
            return

        self._closed = True
        if isinstance(self._ws, aiohttp.ClientWebSocketResponse):
            await self._ws.close()
            self._ws = None

        if isinstance(self._session, aiohttp.ClientSession):
            await self._session.close()
            self._session = None

    async def ping(self):
        """Send a ping frame"""
        if not self.opened or self._ws is None:
            return

        _LOGGER.debug("WS: Sending ping frame")
        await self._ws.ping()

    async def get_next_event(self):
        """Wait for next WS frame"""
        if self._session is None:
            await self.open()
        if self._invalidated:
            _LOGGER.debug("WS: Invalidating subscription")
            await self.close()
            return {}
        if not self.opened:
            raise SubscriptionClosed("Subscription is closed")

        _LOGGER.debug("WS: Waiting for next frame")
        msg = await self._ws.receive()

        if self._raw:
            return msg
        if self._ws.closed:
            await self.close()
            return {}
        if msg.data:
            self._handle_event(msg.data)

        return msg

    def invalidate(self):
        """Signal event broker(s) to close subscription on next WS frame."""
        self._invalidated = True

    @property
    def opened(self):
        """Returns a bool indicating whether the subscription is active."""
        return not self._closed

    @property
    def invalidated(self):
        """Returns a bool indicating whether the subscription has been invalidated."""
        return self._invalidated

    @staticmethod
    def _handle_activity(event_type, event, camera):
        """Controls the camera's current_activity prop based on incoming activity events."""

        if event_type in ['activity_created', 'activity_updated']:
            # Set camera's current activity prop to this activity
            camera._current_activity = Activity(activity=event,
                                                url='%s/%s%s' % (ACCESSORIES_ENDPOINT, camera.id, ACTIVITIES_ENDPOINT),
                                                local_tz=camera._local_tz,
                                                logi=camera.logi)
            camera._last_activity = camera._current_activity

        if event_type == 'activity_finished' and camera._current_activity:
            camera._current_activity = None

    async def _auto_ping(self, interval):
        """Send ping frames at the specified interval"""
        while self.opened:
            await asyncio.sleep(interval)
            await self.ping()

    def _handle_event(self, data):
        """Perform action with event"""
        event = json.loads(data)
        event_type = event['eventType']
        camera = _get_camera_from_id(event['eventData']['accessoryId'], self._cameras)

        _LOGGER.debug('WS: Got event %s for %s', event_type, camera.name)

        if event_type == "accessory_settings_changed":
            # Update camera props with changes
            camera._set_attributes(event['eventData'])
        elif event_type in ACTIVITY_EVENTS:
            # Set/unset camera's current activity
            Subscription._handle_activity(event_type, event['eventData'], camera)
        else:
            _LOGGER.warning('WS: Event type %s was unhandled', event_type)