File: userpassauth.py

package info (click to toggle)
python-openshift 0.13.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 332 kB
  • sloc: python: 1,824; makefile: 20; sh: 14
file content (129 lines) | stat: -rw-r--r-- 5,736 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# OpenShift User-Password Login helper module
# This module extends the `kubernetes.client.Configuration` object.
# `OCPLoginConfiguration` uses `username` and `password` params to authenticate
# with the OAuth OpenShift component, after the autentication the `api_key` value is set
# with the Bearer token.
#
# IMPORTANT: the Bearer token is designed to expire, si up to the user to renew the token.
# the valitity (in secods) is saved into the `token['expires_in`]` attribute.
# The value refers to when the token has been created and to not change overtime.

# Related discussion on GitHub https://github.com/openshift/openshift-restclient-python/issues/249
# Most part of the code has been taken from the k8s_auth ansible module:
# https://github.com/ansible/ansible/blob/stable-2.9/lib/ansible/modules/clustering/k8s/k8s_auth.py

import requests
from requests_oauthlib import OAuth2Session
from urllib3.util import make_headers
from urllib.parse import parse_qs, urlencode, urlparse
from kubernetes.client import Configuration as KubeConfig

class OCPLoginException(Exception):
    """The base class for the OCPLogin exceptions"""

class OCPLoginRequestException(OCPLoginException):
    def __init__(self, msg, **kwargs):
        self.msg = msg
        self.req_info = {}
        for k, v in kwargs.items():
            self.req_info['req_' + k] = v

    def __str__(self):
        error_msg = self.msg
        for k, v in self.req_info.items():
            error_msg += '\t{0}: {1}\n'.format(k, v)
        return error_msg

class OCPLoginConfiguration(KubeConfig):
    def __init__(self, host="http://localhost",
                 api_key=None, api_key_prefix=None,
                 ocp_username=None, ocp_password=None,
                 discard_unknown_keys=False,
                 ):

        self.ocp_username = ocp_username
        self.ocp_password = ocp_password

        super(OCPLoginConfiguration, self).__init__(host=host, api_key=None,
                api_key_prefix=None, username=None,
                password=None, discard_unknown_keys=discard_unknown_keys)

    def get_token(self):
        # python-requests takes either a bool or a path to a ca file as the 'verify' param
        if self.verify_ssl and self.ssl_ca_cert:
            self.con_verify_ca = self.ssl_ca_cert  # path
        else:
            self.con_verify_ca = self.verify_ssl   # bool

        self.discover()
        self.token = self.login()
        self.api_key = {"authorization": "Bearer " + self.token['access_token']}
        self.api_key_expires = self.token['expires_in']
        self.api_key_scope = self.token['scope']

    def discover(self):
        url = '{0}/.well-known/oauth-authorization-server'.format(self.host)
        ret = requests.get(url, verify=self.con_verify_ca)

        if ret.status_code != 200:
            raise OCPLoginRequestException("Couldn't find OpenShift's OAuth API", method='GET', url=url,
                              reason=ret.reason, status_code=ret.status_code)

        oauth_info = ret.json()
        self.openshift_auth_endpoint = oauth_info['authorization_endpoint']
        self.openshift_token_endpoint = oauth_info['token_endpoint']

    def login(self):
        os_oauth = OAuth2Session(client_id='openshift-challenging-client')
        authorization_url, state = os_oauth.authorization_url(self.openshift_auth_endpoint,
                                                              state="1", code_challenge_method='S256')
        auth_headers = make_headers(basic_auth='{0}:{1}'.format(self.ocp_username, self.ocp_password))
        # Request authorization code using basic auth credentials
        ret = os_oauth.get(
            authorization_url,
            headers={'X-Csrf-Token': state, 'authorization': auth_headers.get('authorization')},
            verify=self.con_verify_ca,
            allow_redirects=False
        )

        if ret.status_code != 302:
            raise OCPLoginRequestException("Authorization failed.", method='GET', url=authorization_url,
                              reason=ret.reason, status_code=ret.status_code)

        qwargs = {}
        for k, v in parse_qs(urlparse(ret.headers['Location']).query).items():
            qwargs[k] = v[0]
        qwargs['grant_type'] = 'authorization_code'

        # Using authorization code given to us in the Location header of the previous request, request a token
        ret = os_oauth.post(
            self.openshift_token_endpoint,
            headers={
                'Accept': 'application/json',
                'Content-Type': 'application/x-www-form-urlencoded',
                # This is just base64 encoded 'openshift-challenging-client:'
                'Authorization': 'Basic b3BlbnNoaWZ0LWNoYWxsZW5naW5nLWNsaWVudDo='
            },

            data=urlencode(qwargs),
            verify=self.con_verify_ca
        )
        if ret.status_code != 200:
            raise OCPLoginRequestException("Failed to obtain an authorization token.", method='POST',
                              url=self.openshift_token_endpoint,
                              reason=ret.reason, status_code=ret.status_code)
        return ret.json()

    def logout(self):
        url = '{0}/apis/oauth.openshift.io/v1/oauthaccesstokens/{1}'.format(self.host, self.api_key)
        headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': 'Bearer {0}'.format(self.api_key)
        }
        json = {
            "apiVersion": "oauth.openshift.io/v1",
            "kind": "DeleteOptions"
        }
        requests.delete(url, headers=headers, json=json, verify=self.con_verify_ca)
        # Ignore errors, the token will time out eventually anyway