File: auth.py

package info (click to toggle)
python-duo-client 5.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 768 kB
  • sloc: python: 7,105; sh: 6; makefile: 4
file content (207 lines) | stat: -rw-r--r-- 6,993 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""
Duo Security Auth API reference client implementation.

<http://www.duosecurity.com/docs/authapi>
"""
from . import client

class Auth(client.Client):
    def ping(self):
        """
        Determine if the Duo service is up and responding.

        Returns information about the Duo service state: {
            'time': <int:UNIX timestamp>,
        }
        """
        return self.json_api_call('GET', '/auth/v2/ping', {})

    def check(self):
        """
        Determine if the integration key, secret key, and signature
        generation are valid.

        Returns information about the Duo service state: {
            'time': <int:UNIX timestamp>,
        }
        """
        return self.json_api_call('GET', '/auth/v2/check', {})

    def logo(self):
        """
        Retrieve the user-supplied logo.

        Returns the logo on success, raises RuntimeError on failure.
        """
        response, data = self.api_call('GET', '/auth/v2/logo', {})
        content_type = response.getheader('Content-Type')
        if content_type and content_type.startswith('image/'):
            return data
        else:
            return self.parse_json_response(response, data)

    def enroll(self, username=None, valid_secs=None, bypass_codes=None):
        """
        Create a new user and associated numberless phone.

        Returns activation information: {
            'activation_barcode': <str:url>,
            'activation_code': <str:actcode>,
            'bypass_codes': <list[str:autogenerated]:optional>,
            'user_id': <str:autogenerated>,
            'username': <str:provided or autogenerated>,
            'valid_secs': <int:seconds>,
        }
        """
        params = {}
        if username is not None:
            params['username'] = username
        if valid_secs is not None:
            valid_secs = str(int(valid_secs))
            params['valid_secs'] = valid_secs
        if bypass_codes is not None:
            bypass_codes = str(int(bypass_codes))
            params['bypass_codes'] = bypass_codes
        return self.json_api_call('POST',
                                  '/auth/v2/enroll',
                                  params)

    def enroll_status(self, user_id, activation_code):
        """
        Check if a user has been enrolled yet.

        Returns a string constant indicating whether the user has been
        enrolled or the code remains unclaimed.
        """
        params = {
            'user_id': user_id,
            'activation_code': activation_code,
        }
        response = self.json_api_call('POST',
                                      '/auth/v2/enroll_status',
                                      params)
        return response

    def preauth(self,
                username=None,
                user_id=None,
                ipaddr=None,
                client_supports_verified_push=None,
                trusted_device_token=None):
        """
        Determine if and with what factors a user may authenticate or enroll.

        See the adminapi docs for parameter and response information.
        """
        params = {}
        if username is not None:
            params['username'] = username
        if user_id is not None:
            params['user_id'] = user_id
        if ipaddr is not None:
            params['ipaddr'] = ipaddr
        if client_supports_verified_push is not None:
            params['client_supports_verified_push'] = client_supports_verified_push
        if trusted_device_token is not None:
            params['trusted_device_token'] = trusted_device_token
        response = self.json_api_call('POST',
                                      '/auth/v2/preauth',
                                      params)
        return response

    def auth(self,
             factor,
             username=None,
             user_id=None,
             ipaddr=None,
             async_txn=False,
             type=None,
             display_username=None,
             pushinfo=None,
             device=None,
             passcode=None,
             txid=None):
        """
        Perform second-factor authentication for a user.

        If async_txn is True, returns: {
            'txid': <str: transaction ID for use with auth_status>,
        }

        Otherwise, returns: {
            'result': <str:allow|deny>,
            'status': <str:machine-parsable>,
            'status_msg': <str:human-readable>,
        }

        If Trusted Devices is enabled, async_txn is not True, and status is
        'allow', another item is returned:

        * trusted_device_token: <str: device token for use with preauth>
        """
        params = {
            'factor': factor,
            'async': str(int(async_txn)),
        }
        if username is not None:
            params['username'] = username
        if user_id is not None:
            params['user_id'] = user_id
        if ipaddr is not None:
            params['ipaddr'] = ipaddr
        if type is not None:
            params['type'] = type
        if display_username is not None:
            params['display_username'] = display_username
        if pushinfo is not None:
            params['pushinfo'] = pushinfo
        if device is not None:
            params['device'] = device
        if passcode is not None:
            params['passcode'] = passcode
        if txid is not None:
            params['txid'] = txid
        response = self.json_api_call('POST',
                                      '/auth/v2/auth',
                                      params)
        return response

    def auth_status(self, txid):
        """
        Longpoll for the status of an asynchronous authentication call.

        Returns a dict with four items:

        * waiting: True if the authentication attempt is still in progress
          and the caller can continue to poll, else False.

        * success: True if the authentication request has completed and
          was a success, else False.

        * status: String constant identifying the request's state.

        * status_msg: Human-readable string describing the request state.

        If Trusted Devices is enabled, another item is returned when success
        is True:

        * trusted_device_token: String token to bypass second-factor
          authentication for this user during an admin-defined period.
        """
        params = {
            'txid': txid,
        }
        status = self.json_api_call('GET',
                                    '/auth/v2/auth_status',
                                    params)
        response = {
            'waiting': (status.get('result') == 'waiting'),
            'success': (status.get('result') == 'allow'),
            'status': status.get('status', ''),
            'status_msg': status.get('status_msg', ''),
        }

        if 'trusted_device_token' in status:
            response['trusted_device_token'] = status['trusted_device_token']

        return response