File: PyViCareBrowserOAuthManager.py

package info (click to toggle)
pyvicare 2.58.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 4,944 kB
  • sloc: python: 6,182; sh: 5; makefile: 2
file content (113 lines) | stat: -rw-r--r-- 3,951 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import json
import logging
import os
import webbrowser
from http.server import BaseHTTPRequestHandler, HTTPServer

from authlib.common.security import generate_token
from authlib.integrations.requests_client import OAuth2Session

from PyViCare.PyViCareAbstractOAuthManager import (
    AUTHORIZE_URL,
    SCOPE_IOT,
    SCOPE_OFFLINE_ACCESS,
    SCOPE_USER,
    TOKEN_URL,
    AbstractViCareOAuthManager,
)
from PyViCare.PyViCareUtils import (PyViCareBrowserOAuthTimeoutReachedError,
                                    PyViCareInvalidCredentialsError)

# Shared 'ViCare' logger; the NullHandler means this module attaches no output
# handler of its own, leaving log configuration to the application.
logger = logging.getLogger('ViCare')
logger.addHandler(logging.NullHandler())

# Local TCP port used both in the OAuth redirect URI and by the temporary
# callback HTTP server that receives the redirect.
REDIRECT_PORT = 51125
# Seconds the callback server waits for the browser redirect (3 minutes).
AUTH_TIMEOUT = 60 * 3


class ViCareBrowserOAuthManager(AbstractViCareOAuthManager):
    """OAuth manager that authenticates through an interactive browser login.

    On construction a previously stored token is loaded from ``token_file``
    when possible. Otherwise the authorization URL is opened in the user's
    web browser and a temporary HTTP server on ``localhost:REDIRECT_PORT``
    waits (up to ``AUTH_TIMEOUT`` seconds) for the redirect that carries the
    authorization response. The resulting token is written to ``token_file``.
    """

    class Serv(BaseHTTPRequestHandler):
        """HTTP request handler that hands the requested path to a callback."""

        def __init__(self, callback, *args):
            # The callback has to be assigned before the base initialiser is
            # called: BaseHTTPRequestHandler.__init__ processes the request
            # (and therefore invokes do_GET) right away.
            self.callback = callback
            BaseHTTPRequestHandler.__init__(self, *args)

        def do_GET(self):
            """Pass the request path to the callback and reply with its result."""
            (status_code, text) = self.callback(self.path)
            self.send_response(status_code)
            self.send_header("Content-type", "text/plain")
            self.end_headers()
            self.wfile.write(text.encode("utf-8"))

    def __init__(self, client_id: str, token_file: str) -> None:
        """Create the manager, restoring a stored token or logging in anew.

        Args:
            client_id: OAuth client id used for the session.
            token_file: Path of the JSON file used to persist the token.
                ``None`` disables persistence.

        Raises:
            PyViCareBrowserOAuthTimeoutReachedError: No redirect was received
                within ``AUTH_TIMEOUT`` seconds.
            PyViCareInvalidCredentialsError: No token was obtained.
        """
        self.token_file = token_file
        self.client_id = client_id
        oauth_session = self.__load_or_create_new_session()
        super().__init__(oauth_session)

    def __load_or_create_new_session(self):
        """Return a session from the stored token, else from a browser login."""
        restore_oauth = self.__restoreToken()
        if restore_oauth is not None:
            return restore_oauth
        return self.__execute_browser_authentication()

    def __execute_browser_authentication(self):
        """Run the browser-based authorization code flow with PKCE (S256).

        Returns:
            The ``OAuth2Session`` holding the freshly fetched token.

        Raises:
            PyViCareBrowserOAuthTimeoutReachedError: No redirect was received
                within ``AUTH_TIMEOUT`` seconds.
            PyViCareInvalidCredentialsError: The session has no token after
                the token request.
        """
        redirect_uri = f"http://localhost:{REDIRECT_PORT}"
        oauth_session = OAuth2Session(
            self.client_id,
            redirect_uri=redirect_uri,
            scope=[SCOPE_IOT, SCOPE_USER, SCOPE_OFFLINE_ACCESS],
            code_challenge_method='S256')
        code_verifier = generate_token(48)
        authorization_url, _ = oauth_session.create_authorization_url(
            AUTHORIZE_URL, code_verifier=code_verifier)

        location = None

        def callback(path):
            nonlocal location
            location = path
            return (200, "Success. You can close this browser window now.")

        def handlerWithCallbackWrapper(*args):
            ViCareBrowserOAuthManager.Serv(callback, *args)

        # Bind the callback server BEFORE opening the browser so that the
        # redirect can never arrive while nothing is listening, and so that a
        # bind failure (port in use) surfaces before the user is sent to the
        # login page. The context manager closes the listening socket again.
        with HTTPServer(('localhost', REDIRECT_PORT),
                        handlerWithCallbackWrapper) as server:
            server.timeout = AUTH_TIMEOUT
            webbrowser.open(authorization_url)
            # Handles exactly one request or returns after the timeout.
            server.handle_request()

        if location is None:
            logger.debug("Timeout reached")
            raise PyViCareBrowserOAuthTimeoutReachedError()

        logger.debug("Location: %s", location)

        oauth_session.fetch_token(
            TOKEN_URL, authorization_response=location, code_verifier=code_verifier)

        if oauth_session.token is None:
            raise PyViCareInvalidCredentialsError()

        # Deliberately do not log the token itself: it contains the access
        # and refresh secrets.
        logger.debug("Token received")
        self.__storeToken(oauth_session.token)
        logger.info("New token created")
        return oauth_session

    def __storeToken(self, token):
        """Write ``token`` as JSON to ``token_file``; no-op without a file."""
        if self.token_file is None:
            return

        with open(self.token_file, mode='w') as json_file:
            json.dump(token, json_file)
        logger.info("Token stored to file")

    def __restoreToken(self):
        """Load the stored token and wrap it in a session.

        Returns:
            An ``OAuth2Session`` built from the stored token, or ``None`` when
            no token file is configured, the file does not exist, or its
            content is not a JSON object (so the caller falls back to a new
            browser login instead of failing).
        """
        if self.token_file is None or not os.path.isfile(self.token_file):
            return None

        try:
            with open(self.token_file, mode='r') as json_file:
                token = json.load(json_file)
        except ValueError as error:
            # json.JSONDecodeError / UnicodeDecodeError: corrupt token file.
            logger.warning("Ignoring unreadable token file: %s", error)
            return None

        if not isinstance(token, dict):
            logger.warning("Ignoring token file with unexpected content")
            return None

        logger.info("Token restored from file")
        return OAuth2Session(self.client_id, token=token)

    def renewToken(self) -> None:  # type: ignore
        """Request a new access token using the stored refresh token."""
        # ``oauth_session.refresh_token`` is the session's *method*; the
        # refresh token value lives inside the token mapping.
        current_token = self.oauth_session.token or {}
        refresh_token = current_token.get("refresh_token")
        self.oauth_session.refresh_token(TOKEN_URL, refresh_token=refresh_token)