File: website_sample.py

package info (click to toggle)
python-adal 1.2.7-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,344 kB
  • sloc: xml: 12,790; python: 4,656; makefile: 14
file content (141 lines) | stat: -rw-r--r-- 5,413 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
try:
    from http import server as httpserver
    from http import cookies as Cookie
except ImportError:
    import SimpleHTTPServer as httpserver
    import Cookie as Cookie

try:
    import socketserver
except ImportError:
    import SocketServer as socketserver

try:
    from urllib.parse import urlparse, parse_qs
except ImportError:
    from urlparse import urlparse, parse_qs

import json
import os
import random
import string
import sys

from adal import AuthenticationContext

# You can provide account information by using a JSON file. Either
# through a command line argument, 'python sample.py parameters.json', or
# specifying in an environment variable of ADAL_SAMPLE_PARAMETERS_FILE.
#
# The information inside such file can be obtained via app registration.
# See https://github.com/AzureAD/azure-activedirectory-library-for-python/wiki/Register-your-application-with-Azure-Active-Directory
#
# {
#    "resource": "your_resource",
#    "tenant" : "rrandallaad1.onmicrosoft.com",
#    "authorityHostUrl" : "https://login.microsoftonline.com",
#    "clientId" : "624ac9bd-4c1c-4687-aec8-b56a8991cfb3",
#    "clientSecret" : "verySecret=""
# }

parameters_file = (sys.argv[1] if len(sys.argv) == 2 else
                   os.environ.get('ADAL_SAMPLE_PARAMETERS_FILE'))

if parameters_file:
    with open(parameters_file, 'r') as f:
        parameters = f.read()
    sample_parameters = json.loads(parameters)
else:
    raise ValueError('Please provide parameter file with account information.')

PORT = 8088
TEMPLATE_AUTHZ_URL = ('https://login.microsoftonline.com/{}/oauth2/authorize?'+
                      'response_type=code&client_id={}&redirect_uri={}&'+
                      'state={}&resource={}')
GRAPH_RESOURCE = '00000002-0000-0000-c000-000000000000'
RESOURCE = sample_parameters.get('resource', GRAPH_RESOURCE)
REDIRECT_URI = 'http://localhost:{}/getAToken'.format(PORT)

authority_url = (sample_parameters['authorityHostUrl'] + '/' +
                 sample_parameters['tenant'])

class OAuth2RequestHandler(httpserver.SimpleHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/':
            self.send_response(307)
            login_url = 'http://localhost:{}/login'.format(PORT)
            self.send_header('Location', login_url)
            self.end_headers()
        elif self.path == '/login':
            auth_state = (''.join(random.SystemRandom()
                                  .choice(string.ascii_uppercase + string.digits)
                                  for _ in range(48)))
            cookie = Cookie.SimpleCookie()
            cookie['auth_state'] = auth_state
            authorization_url = TEMPLATE_AUTHZ_URL.format(
                sample_parameters['tenant'],
                sample_parameters['clientId'],
                REDIRECT_URI,
                auth_state,
                RESOURCE)
            self.send_response(307)
            self.send_header('Set-Cookie', cookie.output(header=''))
            self.send_header('Location', authorization_url)
            self.end_headers()
        elif self.path.startswith('/getAToken'):
            is_ok = True
            try:
                token_response = self._acquire_token()
                message = 'response: ' + json.dumps(token_response)
                #Later, if the access token is expired it can be refreshed.
                auth_context = AuthenticationContext(authority_url)
                token_response = auth_context.acquire_token_with_refresh_token(
                    token_response['refreshToken'],
                    sample_parameters['clientId'],
                    RESOURCE,
                    sample_parameters['clientSecret'])
                message = (message + '*** And here is the refresh response:' +
                           json.dumps(token_response))
            except ValueError as exp:
                message = str(exp)
                is_ok = False
            self._send_response(message, is_ok)

    def _acquire_token(self):
        parsed = urlparse(self.path)
        code = parse_qs(parsed.query)['code'][0]
        state = parse_qs(parsed.query)['state'][0]
        cookie = Cookie.SimpleCookie(self.headers["Cookie"])
        if state != cookie['auth_state'].value:
            raise ValueError('state does not match')
        ### Main logic begins
        auth_context = AuthenticationContext(authority_url)
        return auth_context.acquire_token_with_authorization_code(
            code,
            REDIRECT_URI,
            RESOURCE,
            sample_parameters['clientId'],
            sample_parameters['clientSecret'])
        ### Main logic ends

    def _send_response(self, message, is_ok=True):
        self.send_response(200 if is_ok else 400)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        if is_ok:
            #todo, pretty format token response in json
            message_template = ('<html><head><title>Succeeded</title></head>'
                                '<body><p>{}</p></body></html>')
        else:
            message_template = ('<html><head><title>Failed</title></head>'
                                '<body><p>{}</p></body></html>')

        output = message_template.format(message)
        self.wfile.write(output.encode())

httpd = socketserver.TCPServer(('', PORT), OAuth2RequestHandler)

print('serving at port', PORT)
httpd.serve_forever()