1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
|
import datetime
import logging
from httpx import AsyncClient
from wolf_comm import constants
from lxml import html
import pkce
import shortuuid
# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)
class Tokens:
    """Holds a single access token together with its expiry timestamp."""

    def __init__(self, access_token: str, expires_in: int):
        """Store the token and compute when it expires.

        Args:
            access_token: The access token string.
            expires_in: Token lifetime in seconds, counted from now.
        """
        lifetime = datetime.timedelta(seconds=expires_in)
        self.access_token = access_token
        # Expiry is tracked as a naive local timestamp.
        self.expire_date = datetime.datetime.now() + lifetime

    def is_expired(self) -> bool:
        """Return True once the current time has passed the expiry date."""
        current_time = datetime.datetime.now()
        return current_time > self.expire_date
class TokenAuth:
    """Adds the possibility to log in with the passed credentials."""

    def __init__(self, username: str, password: str):
        """Remember the credentials used later by :meth:`token`.

        Args:
            username: Account user name sent as ``Input.Username``.
            password: Account password sent as ``Input.Password``.
        """
        self.username = username
        self.password = password

    async def token(self, client: AsyncClient) -> Tokens:
        """Run the login flow and return the resulting access token.

        The flow, as implemented below:

        1. Generate a PKCE verifier/challenge pair and a random ``state``.
        2. GET the login page and read the request verification token from
           the first ``<input>`` value found inside a ``<form>``.
        3. POST the credentials (following redirects) and read the
           authorization ``code`` from the query string of the final URL.
        4. POST the code plus the PKCE verifier to ``/connect/token``.

        Args:
            client: HTTP client used for all three requests.

        Returns:
            A ``Tokens`` built from the ``access_token`` and ``expires_in``
            fields of the token response.

        Raises:
            InvalidAuth: If the login page has no verification token, the
                token response contains an ``error`` key, or any other
                exception occurs along the way (the original exception is
                attached as ``__cause__``).
        """
        try:
            # Generate client-sided variables for OpenID
            code_verifier, code_challenge = pkce.generate_pkce_pair()
            state = shortuuid.uuid()

            # Retrieve verification token from WOLF website
            r = await client.get(constants.AUTHENTICATION_BASE_URL + '/Account/Login?ReturnUrl=/idsrv/connect/authorize/callback?client_id={}&redirect_uri={}/signin-callback.html&response_type=code&scope=openid%2520profile api role&state={}&code_challenge={}&code_challenge_method=S256&response_mode=query&lang=de-DE'.format(constants.AUTHENTICATION_CLIENT, constants.BASE_URL, state, code_challenge))
            _LOGGER.debug('Verification code response: %s', r.content)

            tree = html.document_fromstring(r.text)
            elements = tree.xpath('//form/input/@value')
            if not elements:
                # Without the anti-forgery token the login POST cannot be built.
                raise InvalidAuth('No verification token found on the login page')

            _LOGGER.debug('Verification token: %s', elements[0])
            verification_token = elements[0]  # __RequestVerificationToken

            # Get code
            login_data = {
                "Input.Username": self.username,
                "Input.Password": self.password,
                "__RequestVerificationToken": verification_token,
            }
            r = await client.post(
                constants.AUTHENTICATION_BASE_URL + "/Account/Login",
                params={
                    "ReturnUrl": constants.AUTHENTICATION_URL + "/connect/authorize/callback?client_id={}&redirect_uri={}/signin-callback.html&response_type=code&scope=openid profile api role&state={}&code_challenge={}&code_challenge_method=S256&response_mode=query&lang=de-DE".format(constants.AUTHENTICATION_CLIENT, constants.BASE_URL, state, code_challenge)
                },
                headers={
                    "Sec-Fetch-Dest": "document",
                    "Sec-Fetch-Mode": "navigate",
                },
                data=login_data,
                # Re-send the cookies received together with the login page.
                cookies=r.cookies,
                follow_redirects=True,
            )
            _LOGGER.debug('Code response: %s', r.content)

            # After the redirects the authorization code is expected in the
            # query string of the final URL; a missing key raises KeyError,
            # which the generic handler below converts to InvalidAuth.
            code = r.url.params['code']

            headers = {
                "Cache-control": "no-cache",
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:108.0) Gecko/20100101 Firefox/108.0",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
                "Accept-Language": "de-DE,de;q=0.8,en-US;q=0.5,en;q=0.3",
                "Referer": constants.BASE_URL + "/",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "same-origin",
                "TE": "trailers",
            }

            # Get token
            r = await client.post(
                constants.AUTHENTICATION_BASE_URL + "/connect/token",
                headers=headers,
                data={
                    # NOTE(review): hard-coded here while the requests above
                    # use constants.AUTHENTICATION_CLIENT - confirm they are
                    # meant to be the same value.
                    "client_id": "smartset.web",
                    "code": code,
                    "redirect_uri": constants.BASE_URL + "/signin-callback.html",
                    "code_verifier": code_verifier,
                    "grant_type": "authorization_code",
                },
            )
            payload = r.json()
            # NOTE: on success this payload carries the access token, so
            # debug-level logs of this module are sensitive.
            _LOGGER.debug('Token response: %s', payload)
            if "error" in payload:
                raise InvalidAuth('Token endpoint returned an error response')

            _LOGGER.info('Successfully authenticated')
            return Tokens(payload.get("access_token"), payload.get("expires_in"))
        except InvalidAuth as e:
            # Already the right exception type: log it and let it propagate
            # unchanged instead of replacing it with a message-less copy.
            _LOGGER.error('An error occurred: %s', e)
            raise
        except Exception as e:
            _LOGGER.error('An error occurred: %s', e)
            # Chain the original exception so the real cause stays visible
            # in tracebacks while callers still only need to catch InvalidAuth.
            raise InvalidAuth from e
class InvalidAuth(Exception):
    """Raised when authentication fails.

    First check whether the username or password entered is invalid. If the
    credentials look fine, there is probably an issue with the Wolf SmartSet
    servers.
    """
|