File: authenticator_async.py

package info (click to toggle)
python-yalexs 9.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,120 kB
  • sloc: python: 7,916; makefile: 3; sh: 2
file content (145 lines) | stat: -rw-r--r-- 5,328 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from __future__ import annotations

import json
import logging
from datetime import datetime, timedelta, timezone

import aiofiles
from aiohttp import ClientError

from .api_async import ApiAsync
from .authenticator_common import (
    Authentication,
    AuthenticationState,
    AuthenticatorCommon,
    ValidationResult,
    from_authentication_json,
    to_authentication_json,
)
from .exceptions import AugustApiAIOHTTPError

_LOGGER = logging.getLogger(__name__)


class AuthenticatorAsync(AuthenticatorCommon):
    """Class to manage authentication with the August API."""

    _api: ApiAsync

    async def _read_access_token_file(
        self,
        access_token_cache_file: str,
        file: aiofiles.threadpool.binary.AsyncBufferedIOBase,
    ) -> None:
        contents = await file.read()
        self._authentication = from_authentication_json(json.loads(contents))

        # If token is to expire within 7 days then print a warning.
        if self._authentication.is_expired():
            _LOGGER.error("Token has expired.")
            self._authentication = Authentication(
                AuthenticationState.REQUIRES_AUTHENTICATION,
                install_id=self._install_id,
            )
        # If token is not expired but less then 7 days before it
        # will.
        elif (
            self._authentication.parsed_expiration_time() - datetime.now(timezone.utc)
        ) < timedelta(days=7):
            exp_time = self._authentication.access_token_expires
            _LOGGER.warning(
                "API Token is going to expire at %s "
                "hours. Deleting file %s will result "
                "in a new token being requested next"
                " time",
                exp_time,
                access_token_cache_file,
            )

    async def async_setup_authentication(self) -> None:
        if access_token_cache_file := self._access_token_cache_file:
            try:
                async with aiofiles.open(access_token_cache_file) as file:
                    await self._read_access_token_file(access_token_cache_file, file)
                    return
            except FileNotFoundError:
                _LOGGER.debug("Cache file not found: %s", access_token_cache_file)
            except json.decoder.JSONDecodeError as error:
                _LOGGER.error(
                    "Unable to read cache file (%s): %s",
                    access_token_cache_file,
                    error,
                )

        self._authentication = Authentication(
            AuthenticationState.REQUIRES_AUTHENTICATION, install_id=self._install_id
        )

    async def async_authenticate(self) -> Authentication:
        if self._api.brand_config.require_oauth:
            raise RuntimeError(f"OAuth is required for brand {self._api.brand}")

        if self._authentication.state is AuthenticationState.AUTHENTICATED:
            return self._authentication

        identifier = self._login_method + ":" + self._username
        install_id = self._authentication.install_id
        response = await self._api.async_get_session(
            install_id, identifier, self._password
        )

        json_dict = await response.json()
        authentication = self._authentication_from_session_response(
            install_id, response.headers, json_dict
        )

        if authentication.state is AuthenticationState.AUTHENTICATED:
            await self._async_cache_authentication(authentication)

        return authentication

    async def async_validate_verification_code(
        self, verification_code: str
    ) -> ValidationResult:
        if not verification_code:
            return ValidationResult.INVALID_VERIFICATION_CODE

        try:
            await self._api.async_validate_verification_code(
                self._authentication.access_token,
                self._login_method,
                self._username,
                verification_code,
            )
        except (AugustApiAIOHTTPError, ClientError):
            return ValidationResult.INVALID_VERIFICATION_CODE

        return ValidationResult.VALIDATED

    async def async_send_verification_code(self) -> bool:
        await self._api.async_send_verification_code(
            self._authentication.access_token, self._login_method, self._username
        )

        return True

    async def async_refresh_access_token(self, force=False) -> Authentication | None:
        if not self.should_refresh() and not force:
            return self._authentication

        if self._authentication.state is not AuthenticationState.AUTHENTICATED:
            _LOGGER.warning("Tried to refresh access token when not authenticated")
            return self._authentication

        refreshed_token = await self._api.async_refresh_access_token(
            self._authentication.access_token
        )

        authentication = self._process_refreshed_access_token(refreshed_token)
        await self._async_cache_authentication(authentication)
        return authentication

    async def _async_cache_authentication(self, authentication: Authentication) -> None:
        if self._access_token_cache_file is not None:
            async with aiofiles.open(self._access_token_cache_file, "w") as file:
                await file.write(to_authentication_json(authentication))