1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
|
"""Authenticates to Flume API."""
from datetime import datetime, timedelta
import json
import jwt # install pyjwt
from requests import Session
from .constants import DEFAULT_TIMEOUT, URL_OAUTH_TOKEN # noqa: WPS300
from .utils import configure_logger, flume_response_error # noqa: WPS300
# Configure logging
LOGGER = configure_logger(__name__)
class FlumeAuth: # noqa: WPS214
"""Interact with API Authentication."""
def __init__( # noqa: WPS211
self,
username,
password,
client_id,
client_secret,
flume_token=None,
http_session=None,
timeout=DEFAULT_TIMEOUT,
):
"""
Initialize the data object.
Args:
username: Username to authenticate.
password: Password to authenticate.
client_id: API client id.
client_secret: API client secret.
flume_token: Pass flume token to variable.
http_session: Requests Session()
timeout: Requests timeout for throttling.
"""
self._creds = {
"client_id": client_id,
"client_secret": client_secret,
"username": username,
"password": password,
}
if http_session is None:
self._http_session = Session()
else:
self._http_session = http_session
self._timeout = timeout
self._token = None
self._decoded_token = None
self.user_id = None
self.authorization_header = None
self._load_token(flume_token)
self._verify_token()
@property
def token(self):
"""
Return authorization token for session.
Returns:
Returns the current JWT token.
"""
return self._token
def refresh_token(self):
"""Refresh authorization token for session."""
payload = {
"grant_type": "refresh_token",
"refresh_token": self._token["refresh_token"],
"client_id": self._creds["client_id"],
"client_secret": self._creds["client_secret"],
}
self._load_token(self._request_token(payload))
def retrieve_token(self):
"""Return authorization token for session."""
payload = dict({"grant_type": "password"}, **self._creds)
self._load_token(self._request_token(payload))
def _load_token(self, token):
"""
Update _token, decode token, user_id and auth header.
Args:
token: Authentication bearer token to be decoded.
"""
jwt_options = {"verify_signature": False}
self._token = token
try:
self._decoded_token = jwt.decode(
self._token["access_token"],
options=jwt_options,
)
except jwt.exceptions.DecodeError:
LOGGER.debug("Poorly formatted Access Token, fetching token using _creds")
self.retrieve_token()
except TypeError:
LOGGER.debug("Token TypeError, fetching token using _creds")
self.retrieve_token()
self.user_id = self._decoded_token["user_id"]
self.authorization_header = {
"authorization": "Bearer {0}".format(self._token.get("access_token")),
}
def _request_token(self, payload):
"""
Request Authorization Payload.
Args:
payload: Request payload to get token request.
Returns:
Return response Authentication Bearer token from request.
"""
headers = {"content-type": "application/json"}
response = self._http_session.request(
"POST",
URL_OAUTH_TOKEN,
json=payload,
headers=headers,
timeout=self._timeout,
)
LOGGER.debug("Token Payload: %s", payload) # noqa: WPS323
LOGGER.debug("Token Response: %s", response.text) # noqa: WPS323
# Check for response errors.
flume_response_error(
"Can't get token for user {0}".format(self._creds.get("username")),
response,
)
return json.loads(response.text)["data"][0]
def _verify_token(self):
"""Check to see if token is expiring in 12 hours."""
token_expiration = datetime.fromtimestamp(self._decoded_token["exp"])
time_difference = datetime.now() + timedelta(hours=12) # noqa: WPS432
LOGGER.debug("Token expiration time: %s", token_expiration) # noqa: WPS323
LOGGER.debug("Token comparison time: %s", time_difference) # noqa: WPS323
if token_expiration <= time_difference:
self.refresh_token()
|