File: PyViCareOAuthManager.py

package info (click to toggle)
pyvicare 2.56.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,580 kB
  • sloc: python: 4,977; sh: 5; makefile: 2
file content (114 lines) | stat: -rw-r--r-- 4,418 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import logging
import os
import pickle
from contextlib import suppress
from pickle import UnpicklingError

import requests
from authlib.common.security import generate_token
from authlib.integrations.requests_client import OAuth2Session

from PyViCare.PyViCareAbstractOAuthManager import AbstractViCareOAuthManager
from PyViCare.PyViCareUtils import (PyViCareInvalidConfigurationError,
                                    PyViCareInvalidCredentialsError)

logger = logging.getLogger('ViCare')
logger.addHandler(logging.NullHandler())

AUTHORIZE_URL = 'https://iam.viessmann-climatesolutions.com/idp/v3/authorize'
TOKEN_URL = 'https://iam.viessmann-climatesolutions.com/idp/v3/token'
REDIRECT_URI = "vicare://oauth-callback/everest"
VIESSMANN_SCOPE = ["IoT User"]


class ViCareOAuthManager(AbstractViCareOAuthManager):
    def __init__(self, username, password, client_id, token_file):
        self.username = username
        self.password = password
        self.token_file = token_file
        self.client_id = client_id
        oauth_session = self.__restore_oauth_session_from_token(token_file)
        super().__init__(oauth_session)

    def __restore_oauth_session_from_token(self, token_file):
        existing_token = self.__deserialize_token(token_file)
        if existing_token is not None:
            return OAuth2Session(self.client_id, token=existing_token)

        return self.__create_new_session(self.username, self.password, token_file)

    def __create_new_session(self, username, password, token_file=None):
        """Create a new oAuth2 sessions
        Viessmann tokens expire after 3600s (60min)
        Parameters
        ----------
        username : str
            e-mail address
        password : str
            password
        token_file: str
            path to serialize the token (will restore if already existing). No serialisation if not present

        Returns
        -------
        oauth:
            oauth sessions object
        """
        oauth_session = OAuth2Session(
            self.client_id, redirect_uri=REDIRECT_URI, scope=VIESSMANN_SCOPE, code_challenge_method='S256')
        code_verifier = generate_token(48)
        authorization_url, _ = oauth_session.create_authorization_url(AUTHORIZE_URL, code_verifier=code_verifier)
        logger.debug("Auth URL is: %s", authorization_url)

        header = {'Content-Type': 'application/x-www-form-urlencoded'}
        response = requests.post(
            authorization_url, headers=header, auth=(username, password), allow_redirects=False)

        if response.status_code == 401:
            raise PyViCareInvalidConfigurationError(response.json())

        if 'Location' not in response.headers:
            logger.debug('Response: %s', response)
            raise PyViCareInvalidCredentialsError()

        oauth_session.fetch_token(TOKEN_URL, authorization_response=response.headers['Location'], code_verifier=code_verifier)

        if oauth_session.token is None:
            raise PyViCareInvalidCredentialsError()

        logger.debug("Token received: %s",oauth_session.token)
        self.__serialize_token(oauth_session.token, token_file)
        logger.info("New token created")
        return oauth_session

    def renewToken(self):
        logger.info("Token expired, renewing")
        self.replace_session(self.__create_new_session(
            self.username, self.password, self.token_file))
        logger.info("Token renewed successfully")

    def __serialize_token(self, oauth, token_file):
        logger.debug("Start serial")
        if token_file is None:
            logger.debug("Skip serial, no file provided.")
            return

        with open(token_file, mode='wb') as binary_file:
            pickle.dump(oauth, binary_file)

        logger.info("Token serialized to %s", token_file)

    def __deserialize_token(self, token_file):
        if token_file is None or not os.path.isfile(token_file):
            logger.debug(
                "Token file argument not provided or file does not exist")
            return None

        logger.info("Token file exists")
        with suppress(UnpicklingError):
            with open(token_file, mode='rb') as binary_file:
                s_token = pickle.load(binary_file)
                logger.info("Token restored from file")
                return s_token
        logger.warning("Could not restore token")
        return None