File: token_auth.py

package info (click to toggle)
python-wolf-comm 0.0.47-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,204 kB
  • sloc: python: 1,008; makefile: 3
file content (120 lines) | stat: -rw-r--r-- 4,911 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import datetime
import logging

from httpx import AsyncClient

from wolf_comm import constants

from lxml import html
import pkce
import shortuuid



# Module-level logger, named after this module via __name__.
_LOGGER = logging.getLogger(__name__)


class Tokens:
    """Holds a single access token together with its expiry timestamp."""

    def __init__(self, access_token: str, expires_in: int):
        """Store the token and compute when it expires.

        Args:
            access_token: The access token string.
            expires_in: Token lifetime in seconds, counted from now.
        """
        self.access_token = access_token
        created_at = datetime.datetime.now()
        lifetime = datetime.timedelta(seconds=expires_in)
        self.expire_date = created_at + lifetime

    def is_expired(self) -> bool:
        """Return True once the current local time is past `expire_date`."""
        current_time = datetime.datetime.now()
        return current_time > self.expire_date


class TokenAuth:
    """Adds possibility to login with passed credentials.

    Holds a username/password pair and exchanges it for an access token
    through the authorization-code flow with PKCE implemented in `token`.
    """

    def __init__(self, username: str, password: str):
        """Store the credentials that `token` later submits to the login form."""
        self.username = username
        self.password = password

    async def token(self, client: AsyncClient) -> Tokens:
        """Log in with the stored credentials and return a fresh `Tokens`.

        The flow consists of three requests made with ``client``:

        1. GET the login page and read the verification token from the
           first ``<input>`` value found directly inside a ``<form>``.
        2. POST the credentials together with that token, following
           redirects; the authorization code is taken from the ``code``
           query parameter of the final URL.
        3. POST the code and the PKCE code verifier to ``/connect/token``.

        Args:
            client: The httpx async client used for all requests.

        Returns:
            A `Tokens` built from the ``access_token`` and ``expires_in``
            fields of the token response.

        Raises:
            InvalidAuth: If the login page has no verification token, the
                login does not yield an authorization code, the token
                endpoint reports an error, or any other exception occurs
                (the original exception is chained as ``__cause__``).
        """
        try:
            # Generate client-sided variables for OpenID
            code_verifier, code_challenge = pkce.generate_pkce_pair()
            state = shortuuid.uuid()

            # Retrieve verification token from WOLF website
            r = await client.get(constants.AUTHENTICATION_BASE_URL + '/Account/Login?ReturnUrl=/idsrv/connect/authorize/callback?client_id={}&redirect_uri={}/signin-callback.html&response_type=code&scope=openid%2520profile api role&state={}&code_challenge={}&code_challenge_method=S256&response_mode=query&lang=de-DE'.format(constants.AUTHENTICATION_CLIENT, constants.BASE_URL,state, code_challenge))

            _LOGGER.debug('Verification code response: %s', r.content)

            tree = html.document_fromstring(r.text)
            elements = tree.xpath('//form/input/@value')

            if not elements:
                # Without the anti-forgery token the login POST cannot succeed.
                _LOGGER.error('No verification token found on the login page')
                raise InvalidAuth

            verification_token = elements[0]  # __RequestVerificationToken
            _LOGGER.debug('Verification token: %s', verification_token)

            # Get code
            login_data = {
                "Input.Username": self.username,
                "Input.Password": self.password,
                "__RequestVerificationToken": verification_token,
            }

            r = await client.post(
                constants.AUTHENTICATION_BASE_URL + "/Account/Login",
                params={
                    "ReturnUrl": constants.AUTHENTICATION_URL + "/connect/authorize/callback?client_id={}&redirect_uri={}/signin-callback.html&response_type=code&scope=openid profile api role&state={}&code_challenge={}&code_challenge_method=S256&response_mode=query&lang=de-DE".format(constants.AUTHENTICATION_CLIENT, constants.BASE_URL, state,code_challenge)
                },
                headers={
                    "Sec-Fetch-Dest": "document",
                    "Sec-Fetch-Mode": "navigate",
                },
                data=login_data,
                cookies=r.cookies,
                follow_redirects=True,
            )

            _LOGGER.debug('Code response: %s', r.content)

            # After the redirects the final URL is expected to carry the
            # authorization code; if it does not, the login was not accepted.
            code = r.url.params.get('code')
            if not code:
                _LOGGER.error('Login did not return an authorization code')
                raise InvalidAuth

            headers = {
                "Cache-control": "no-cache",
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:108.0) Gecko/20100101 Firefox/108.0",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
                "Accept-Language": "de-DE,de;q=0.8,en-US;q=0.5,en;q=0.3",
                "Referer": constants.BASE_URL + "/",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "same-origin",
                "TE": "trailers",
            }

            # Get token
            r = await client.post(
                constants.AUTHENTICATION_BASE_URL + "/connect/token",
                headers=headers,
                data={
                    "client_id": "smartset.web",
                    "code": code,
                    "redirect_uri": constants.BASE_URL + "/signin-callback.html",
                    "code_verifier": code_verifier,
                    "grant_type": "authorization_code",
                },
            )

            payload = r.json()
            # NOTE(review): this debug line includes the access token itself;
            # consider redacting it if debug logs may be shared.
            _LOGGER.debug('Token response: %s', payload)
            if "error" in payload:
                _LOGGER.error('Token endpoint returned an error: %s', payload["error"])
                raise InvalidAuth
            _LOGGER.info('Successfully authenticated')
            return Tokens(payload.get("access_token"), payload.get("expires_in"))
        except InvalidAuth:
            # Raised deliberately above and already logged with its reason;
            # let it through instead of logging an empty generic message.
            raise
        except Exception as e:
            _LOGGER.error('An error occurred: %s', e)
            # Callers only handle InvalidAuth; keep the root cause chained.
            raise InvalidAuth from e

class InvalidAuth(Exception):
    """Raised when authentication against Wolf SmartSet fails.

    Please check whether you entered an invalid username or password. If
    everything looks fine then there is probably an issue with the Wolf
    SmartSet servers.
    """