1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
from __future__ import annotations
import json
import logging
from datetime import datetime, timedelta, timezone
import aiofiles
from aiohttp import ClientError
from .api_async import ApiAsync
from .authenticator_common import (
Authentication,
AuthenticationState,
AuthenticatorCommon,
ValidationResult,
from_authentication_json,
to_authentication_json,
)
from .exceptions import AugustApiAIOHTTPError
# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)
class AuthenticatorAsync(AuthenticatorCommon):
    """Class to manage authentication with the August API.

    Asynchronous authenticator built on ``AuthenticatorCommon``: API calls
    are awaited on an ``ApiAsync`` instance and the optional access-token
    cache file is read and written through ``aiofiles``.
    """

    _api: ApiAsync

    async def _read_access_token_file(
        self,
        access_token_cache_file: str,
        file: aiofiles.threadpool.binary.AsyncBufferedIOBase,
    ) -> None:
        """Load the cached authentication from an already-open cache file.

        The file contents are parsed as JSON and stored on
        ``self._authentication``. An expired token is replaced by a fresh
        ``REQUIRES_AUTHENTICATION`` placeholder; a token that expires in
        less than seven days is kept, but a warning is logged.

        NOTE(review): ``async_setup_authentication`` opens the file in the
        default (text) mode, so the binary type in the ``file`` annotation
        looks inaccurate -- confirm before relying on it.

        Args:
            access_token_cache_file: Path of the cache file; only used in
                the warning message.
            file: Open async file handle to read the cached JSON from.

        Raises:
            json.JSONDecodeError: If the file content is not valid JSON.
        """
        contents = await file.read()
        self._authentication = from_authentication_json(json.loads(contents))

        if self._authentication.is_expired():
            # An expired token is useless: start over unauthenticated.
            _LOGGER.error("Token has expired.")
            self._authentication = Authentication(
                AuthenticationState.REQUIRES_AUTHENTICATION,
                install_id=self._install_id,
            )
        # The token is not expired yet but will be in less than 7 days:
        # keep it and print a warning.
        elif (
            self._authentication.parsed_expiration_time() - datetime.now(timezone.utc)
        ) < timedelta(days=7):
            exp_time = self._authentication.access_token_expires
            # exp_time is the expiry time itself, not a number of hours, so
            # the message must not label it as "hours".
            _LOGGER.warning(
                "API Token is going to expire at %s. "
                "Deleting file %s will result "
                "in a new token being requested next"
                " time",
                exp_time,
                access_token_cache_file,
            )

    async def async_setup_authentication(self) -> None:
        """Initialise ``self._authentication``, preferring the cache file.

        When a cache file is configured and can be read and parsed, the
        authentication comes from it (see ``_read_access_token_file``).
        When no cache file is configured, the file does not exist, or it
        does not contain valid JSON, the authentication is set to
        ``REQUIRES_AUTHENTICATION`` with this instance's install id.
        """
        if access_token_cache_file := self._access_token_cache_file:
            try:
                async with aiofiles.open(access_token_cache_file) as file:
                    await self._read_access_token_file(access_token_cache_file, file)
                    # Cache loaded; skip the fallback below.
                    return
            except FileNotFoundError:
                _LOGGER.debug("Cache file not found: %s", access_token_cache_file)
            except json.decoder.JSONDecodeError as error:
                _LOGGER.error(
                    "Unable to read cache file (%s): %s",
                    access_token_cache_file,
                    error,
                )

        # Fallback: no usable cache, so authentication is required.
        self._authentication = Authentication(
            AuthenticationState.REQUIRES_AUTHENTICATION, install_id=self._install_id
        )

    async def async_authenticate(self) -> Authentication:
        """Authenticate with the configured login method and credentials.

        If the current authentication is already ``AUTHENTICATED`` it is
        returned without contacting the API. Otherwise a session is
        requested with the identifier ``"<login_method>:<username>"`` and
        the password, and the result is written to the cache file when the
        resulting state is ``AUTHENTICATED``.

        Returns:
            The current or newly obtained authentication.

        Raises:
            RuntimeError: If the brand configuration requires OAuth.
        """
        if self._api.brand_config.require_oauth:
            raise RuntimeError(f"OAuth is required for brand {self._api.brand}")

        if self._authentication.state is AuthenticationState.AUTHENTICATED:
            return self._authentication

        identifier = self._login_method + ":" + self._username
        install_id = self._authentication.install_id
        response = await self._api.async_get_session(
            install_id, identifier, self._password
        )
        json_dict = await response.json()
        authentication = self._authentication_from_session_response(
            install_id, response.headers, json_dict
        )

        # Only a fully authenticated session is worth caching.
        if authentication.state is AuthenticationState.AUTHENTICATED:
            await self._async_cache_authentication(authentication)

        return authentication

    async def async_validate_verification_code(
        self, verification_code: str
    ) -> ValidationResult:
        """Submit a verification code to the API and report the outcome.

        Args:
            verification_code: The code to validate.

        Returns:
            ``ValidationResult.INVALID_VERIFICATION_CODE`` when the code is
            empty (no API call is made) or when the API call raises
            ``AugustApiAIOHTTPError`` or ``ClientError``; otherwise
            ``ValidationResult.VALIDATED``.
        """
        if not verification_code:
            return ValidationResult.INVALID_VERIFICATION_CODE

        try:
            await self._api.async_validate_verification_code(
                self._authentication.access_token,
                self._login_method,
                self._username,
                verification_code,
            )
        except (AugustApiAIOHTTPError, ClientError):
            return ValidationResult.INVALID_VERIFICATION_CODE

        return ValidationResult.VALIDATED

    async def async_send_verification_code(self) -> bool:
        """Ask the API to send a verification code for the current login.

        Returns:
            ``True`` once the API call has completed without raising.
        """
        await self._api.async_send_verification_code(
            self._authentication.access_token, self._login_method, self._username
        )
        return True

    async def async_refresh_access_token(
        self, force: bool = False
    ) -> Authentication | None:
        """Refresh the access token when needed, or always if ``force``.

        Args:
            force: Refresh even when ``should_refresh()`` reports that no
                refresh is needed.

        Returns:
            The unchanged current authentication when no refresh is
            performed (not needed, or not currently authenticated);
            otherwise the refreshed authentication, which is also written
            to the cache file.
        """
        if not self.should_refresh() and not force:
            return self._authentication

        if self._authentication.state is not AuthenticationState.AUTHENTICATED:
            _LOGGER.warning("Tried to refresh access token when not authenticated")
            return self._authentication

        refreshed_token = await self._api.async_refresh_access_token(
            self._authentication.access_token
        )
        authentication = self._process_refreshed_access_token(refreshed_token)
        await self._async_cache_authentication(authentication)
        return authentication

    async def _async_cache_authentication(self, authentication: Authentication) -> None:
        """Write the authentication to the cache file, if one is configured.

        Args:
            authentication: The authentication to serialise with
                ``to_authentication_json`` and store.
        """
        if self._access_token_cache_file is not None:
            async with aiofiles.open(self._access_token_cache_file, "w") as file:
                await file.write(to_authentication_json(authentication))
|