File: activity.py

package info (click to toggle)
python-yalexs 9.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,120 kB
  • sloc: python: 7,916; makefile: 3; sh: 2
file content (308 lines) | stat: -rw-r--r-- 12,506 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
"""Consume the august activity stream."""

from __future__ import annotations

import asyncio
import logging
from collections import defaultdict

from aiohttp import ClientError

from ..activity import Activity, ActivityType
from ..api_async import ApiAsync
from ..backports.tasks import create_eager_task
from ..exceptions import AugustApiAIOHTTPError
from ..pubnub_async import AugustPubNub
from ..util import get_latest_activity
from .const import ACTIVITY_UPDATE_INTERVAL
from .gateway import Gateway
from .socketio import SocketIORunner
from .subscriber import SubscriberMixin

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Number of activities requested per house once the first update has completed.
ACTIVITY_STREAM_FETCH_LIMIT = 10
# Number of activities requested per house before the first update has
# completed (the initial catch-up fetch).
ACTIVITY_CATCH_UP_FETCH_LIMIT = 2500

# Seconds after setup during which the initial resync is considered to still
# be in progress. While in this window, updates are pushed out by this many
# seconds and only a single poll is requested per refresh.
INITIAL_LOCK_RESYNC_TIME = 45

# If there is a storm of activity (ie lock, unlock, door open, door close, etc)
# we want to debounce the updates so we don't hammer the activity api too much.
# Value is in seconds.
ACTIVITY_DEBOUNCE_COOLDOWN = 4.0

# How long we expect it to take between when we get a WebSocket/PubNub
# message and the activity API to be updated. Value is in seconds.
UPDATE_SOON = 3.0

# Initial "last update" timestamp for a house that has never been updated.
# One day (in seconds) before time zero, so the debounce check does not treat
# a never-updated house as recently updated.
NEVER_TIME = -86400.0


class ActivityStream(SubscriberMixin):
    """August activity stream handler.

    Fetches house activities from the activity API and keeps the newest
    activity per device and activity type. Fetches are debounced per house so
    that a burst of push notifications does not turn into a burst of API
    calls.
    """

    def __init__(
        self,
        api: ApiAsync,
        august_gateway: Gateway,
        house_ids: set[str],
        push: AugustPubNub | SocketIORunner,
    ) -> None:
        """Init activity stream object.

        Must be called while an event loop is running: the running loop is
        captured here and used for all later scheduling.

        Args:
            api: Client used to fetch house activities.
            august_gateway: Gateway used to obtain and refresh access tokens.
            house_ids: Ids of the houses whose activities are tracked.
            push: Push transport; only its ``connected`` attribute is read
                by this class.
        """
        super().__init__(ACTIVITY_UPDATE_INTERVAL)
        # house id -> timer handle of the update scheduled for the future.
        self._schedule_updates: dict[str, asyncio.TimerHandle] = {}
        self._august_gateway = august_gateway
        self._api = api
        self._house_ids = house_ids
        # device id -> activity type -> newest known activity (or None).
        self._latest_activities: defaultdict[
            str, dict[ActivityType, Activity | None]
        ] = defaultdict(lambda: defaultdict(lambda: None))
        self._did_first_update = False
        self.push = push
        # house id -> most recently created update task (may be finished).
        self._update_tasks: dict[str, asyncio.Task] = {}
        # house id -> loop time at which the last update was started.
        self._last_update_time: dict[str, float] = dict.fromkeys(house_ids, NEVER_TIME)
        # Loop time at which async_setup ran; None until then.
        self._start_time: float | None = None
        # house id -> number of updates still to be executed.
        self._pending_updates: dict[str, int] = dict.fromkeys(house_ids, 1)
        self._loop = asyncio.get_running_loop()
        self._shutdown: bool = False

    async def async_setup(self) -> None:
        """Token refresh check and catch up the activity stream."""
        self._start_time = self._loop.time()
        await self._async_refresh()
        await self._async_first_refresh()
        self._did_first_update = True

    def async_stop(self) -> None:
        """Stop the stream: cancel running update tasks and any debounces."""
        # Set the flag first so nothing new gets scheduled while tearing down.
        self._shutdown = True
        for task in self._update_tasks.values():
            task.cancel()
        self._update_tasks.clear()
        self._async_cancel_all_future_updates()

    def _async_cancel_future_updates(self, house_id: str) -> None:
        """Cancel the scheduled update and clear the pending count for a house."""
        if handle := self._schedule_updates.pop(house_id, None):
            handle.cancel()
        self._pending_updates[house_id] = 0

    def _async_cancel_all_future_updates(self) -> None:
        """Cancel all future updates."""
        for house_id in self._house_ids:
            self._async_cancel_future_updates(house_id)

    def get_latest_device_activity(
        self, device_id: str, activity_types: set[ActivityType]
    ) -> Activity | None:
        """Return latest activity that is one of the activity_types.

        Returns None when nothing is known for the device or for any of the
        requested types. When two candidates share the same start time, the
        one encountered first while iterating ``activity_types`` is kept.
        """
        # .get() is used so that querying an unknown device does not create
        # an entry in the defaultdict.
        if not (latest_device_activities := self._latest_activities.get(device_id)):
            return None

        latest_activity: Activity | None = None

        for activity_type in activity_types:
            if activity := latest_device_activities.get(activity_type):
                if (
                    latest_activity
                    and activity.activity_start_time
                    <= latest_activity.activity_start_time
                ):
                    continue
                latest_activity = activity

        return latest_activity

    @property
    def push_updates_connected(self) -> bool:
        """Return if the push updates are connected."""
        return self.push.connected

    async def _async_refresh(self) -> None:
        """Refresh the access token if needed.

        No activities are fetched here; a debug message is logged when push
        updates are not connected.
        """
        # This is the only place we refresh the api token
        if self._shutdown:
            return
        await self._august_gateway.async_refresh_access_token_if_needed()
        if not self.push_updates_connected:
            _LOGGER.debug("Push updates are not connected, data will be stale")

    async def _async_first_refresh(self) -> None:
        """Update the activity stream from August for the first time.

        Skipped entirely when push updates are already connected.
        """
        if self.push_updates_connected:
            _LOGGER.debug("Skipping update because push updates are active")
            return
        _LOGGER.debug("Start retrieving device activities")
        # Await in sequence to avoid hammering the API
        for house_id in self._house_ids:
            if not self._update_running(house_id):
                await self._create_update_task(house_id)

    def _create_update_task(self, house_id: str) -> asyncio.Task:
        """Create and register the update task for a house.

        Raises:
            RuntimeError: If an update for the house is already running.
        """
        if self._update_running(house_id):
            raise RuntimeError("Update already running")
        task = create_eager_task(
            self._async_execute_schedule_update(house_id), loop=self._loop
        )
        self._update_tasks[house_id] = task
        return task

    def _update_running(self, house_id: str) -> bool:
        """Return if an update is running for the house id."""
        current_task = self._update_tasks.get(house_id)
        return current_task is not None and not current_task.done()

    def _updated_recently(self, house_id: str, now: float) -> bool:
        """Return if the house id was updated recently.

        "Recently" means within ACTIVITY_DEBOUNCE_COOLDOWN seconds of ``now``.
        A house id that has no recorded update is treated as never updated.
        """
        # Use .get() so a house id that was not passed to __init__ does not
        # raise KeyError when a refresh is requested for it.
        last_update = self._last_update_time.get(house_id, NEVER_TIME)
        return last_update + ACTIVITY_DEBOUNCE_COOLDOWN > now

    def _async_schedule_update_callback(self, house_id: str) -> None:
        """Run when a scheduled update fires: update now or reschedule."""
        # The timer has fired, so its handle is no longer useful.
        self._schedule_updates.pop(house_id, None)
        now = self._loop.time()
        if delay := self._determine_update_delay(house_id, now, from_callback=True):
            self._async_schedule_update(house_id, now, delay)
            return
        self._create_update_task(house_id)

    def _determine_update_delay(
        self, house_id: str, now: float, from_callback: bool = False
    ) -> float:
        """Return how many seconds to wait before updating the house.

        Args:
            house_id: House to be updated.
            now: Current event loop time.
            from_callback: True when called from the scheduled callback, in
                which case the "update soon" wait has already elapsed and 0
                is returned if nothing else requires a delay.
        """
        if self._updated_recently(house_id, now) or self._update_running(house_id):
            return ACTIVITY_DEBOUNCE_COOLDOWN
        if not self._initial_resync_complete(now):
            return INITIAL_LOCK_RESYNC_TIME
        return 0 if from_callback else UPDATE_SOON

    def _async_schedule_update(self, house_id: str, now: float, delay: float) -> None:
        """Schedule an update ``delay`` seconds after ``now``.

        Does nothing after shutdown or when the house has no pending updates.
        Any update already scheduled for the house is replaced.
        """
        if self._shutdown or self._pending_updates[house_id] <= 0:
            return
        _LOGGER.debug(
            "Scheduling update for house id %s in %s seconds", house_id, delay
        )
        # Do not update right away because the activities API is
        # likely not updated yet and we will just get the same
        # activities again. Instead, schedule the update for
        # the future.
        if scheduled := self._schedule_updates.pop(house_id, None):
            scheduled.cancel()
        self._schedule_updates[house_id] = self._loop.call_at(
            now + delay, self._async_schedule_update_callback, house_id
        )

    async def _async_execute_schedule_update(self, house_id: str) -> None:
        """Execute a scheduled update and schedule the next one if pending."""
        # Record the attempt before awaiting so the debounce check sees it
        # while the request is still in flight.
        self._pending_updates[house_id] -= 1
        self._last_update_time[house_id] = self._loop.time()
        await self._async_update_house_id(house_id)
        if (pending_count := self._pending_updates[house_id]) > 0:
            _LOGGER.debug(
                "There are %s pending updates for house id %s", pending_count, house_id
            )
            now = self._loop.time()
            delay = self._determine_update_delay(house_id, now)
            self._async_schedule_update(house_id, now, delay)

    def _initial_resync_complete(self, now: float) -> bool:
        """Return if the initial resync is complete.

        The resync is complete once more than INITIAL_LOCK_RESYNC_TIME seconds
        have passed since async_setup ran. Before setup it is never complete.
        """
        start_time = self._start_time
        # Compare against None explicitly: a start time of 0.0 is a valid
        # loop time and must not be mistaken for "setup has not run".
        if start_time is None:
            return False
        return now - start_time > INITIAL_LOCK_RESYNC_TIME

    def _set_update_count(self, house_id: str, now: float) -> None:
        """Set how many updates are pending for the house."""
        # Poll more than once to catch the case where the activity
        # api does not update right away and we need to poll
        # it again. Sometimes the lock operator or a doorbell
        # will not show up in the activity stream right away.
        # Only do additional polls if we are past
        # the initial lock resync time to avoid a storm
        # of activity at setup.
        if not self._initial_resync_complete(now):
            # No resync yet, avoid spamming the API
            update_count = 1
        else:
            # Initial resync complete, we can do 2 updates
            update_count = 2
        self._pending_updates[house_id] = update_count

    def async_schedule_house_id_refresh(self, house_id: str) -> None:
        """Schedule a refresh of a house's activities.

        Anything previously scheduled for the house is cancelled and replaced.
        The update runs after a delay rather than immediately.
        """
        self._async_cancel_future_updates(house_id)
        now = self._loop.time()
        self._set_update_count(house_id, now)
        delay = self._determine_update_delay(house_id, now)
        self._async_schedule_update(house_id, now, delay)

    def _activity_limit(self) -> int:
        """Return the maximum number of activities to request per fetch.

        The larger catch-up limit is used until the first update has finished.
        """
        if self._did_first_update:
            return ACTIVITY_STREAM_FETCH_LIMIT
        return ACTIVITY_CATCH_UP_FETCH_LIMIT

    async def _async_update_house_id(self, house_id: str) -> None:
        """Update device activities for a house.

        Must only be called from _async_execute_schedule_update

        Request errors are logged and swallowed so that a failure for one
        house does not prevent other houses from being processed.
        """
        if self._shutdown:
            return

        _LOGGER.debug("Updating device activity for house id %s", house_id)
        try:
            activities = await self._api.async_get_house_activities(
                await self._august_gateway.async_get_access_token(),
                house_id,
                limit=self._activity_limit(),
            )
        except (AugustApiAIOHTTPError, ClientError) as ex:
            _LOGGER.error(
                "Request error trying to retrieve activity for house id %s: %s",
                house_id,
                ex,
            )
            # Make sure we process the next house if one of them fails
            return

        _LOGGER.debug(
            "Completed retrieving device activities for house id %s", house_id
        )
        for device_id in self.async_process_newer_device_activities(activities):
            _LOGGER.debug(
                "async_signal_device_id_update (from activity stream): %s",
                device_id,
            )
            self.async_signal_device_id_update(device_id)

    def async_process_newer_device_activities(
        self, activities: list[Activity]
    ) -> set[str]:
        """Process activities if they are newer than the last one.

        Returns:
            The ids of the devices for which a newer activity was stored.
        """
        updated_device_ids: set[str] = set()
        latest_activities = self._latest_activities
        for activity in activities:
            device_id = activity.device_id
            activity_type = activity.activity_type
            device_activities = latest_activities[device_id]
            # Ignore activities that are older than the latest one unless it is a non
            # locking or unlocking activity with the exact same start time.
            # NOTE(review): that rule lives in get_latest_activity, which is
            # not visible here - confirm against its implementation.
            last_activity = device_activities[activity_type]
            # The activity stream can have duplicate activities. So we need
            # to call get_latest_activity to figure out if the activity
            # is actually newer than the last one.
            latest_activity = get_latest_activity(activity, last_activity)
            if latest_activity != activity:
                _LOGGER.debug(
                    "Skipping activity %s for device %s as it is not newer than the last one: %s",
                    activity,
                    device_id,
                    last_activity,
                )
                continue

            device_activities[activity_type] = activity
            updated_device_ids.add(device_id)

        return updated_device_ids