File: auth_v1.py

package info (click to toggle)
python-duo-client 5.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 768 kB
  • sloc: python: 7,105; sh: 6; makefile: 4
file content (129 lines) | stat: -rw-r--r-- 3,769 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""
Duo Security Auth API v1 reference client implementation.

<http://www.duosecurity.com/docs/authapi-v1>
"""
from . import client


FACTOR_AUTO = 'auto'
FACTOR_PASSCODE = 'passcode'
FACTOR_PHONE = 'phone'
FACTOR_SMS = 'sms'
FACTOR_PUSH = 'push'

PHONE1 = 'phone1'
PHONE2 = 'phone2'
PHONE3 = 'phone3'
PHONE4 = 'phone4'
PHONE5 = 'phone5'


class AuthV1(client.Client):
    sig_version = 2
    auth_details = False

    def ping(self):
        """
        Returns True if and only if the Duo service is up and responding.
        """
        response = self.json_api_call('GET', '/rest/v1/ping', {})
        return response == 'pong'

    def check(self):
        """
        Returns True if and only if the integration key, secret key, and
        signature generation are valid.
        """
        response = self.json_api_call('GET', '/rest/v1/check', {})
        return response == 'valid'

    def logo(self):
        """
        Retrieve the user-supplied logo.

        Returns the logo on success, raises RuntimeError on failure.
        """
        response, data = self.api_call('GET', '/rest/v1/logo', {})
        content_type = response.getheader('Content-Type')
        if content_type and content_type.startswith('image/'):
            return data
        else:
            return self.parse_json_response(response, data)

    def preauth(self, username,
                ipaddr=None):
        params = {
            'user': username,
        }
        if ipaddr is not None:
            params['ipaddr'] = ipaddr
        response = self.json_api_call('POST', '/rest/v1/preauth', params)
        return response

    def auth(self, username,
             factor=FACTOR_PHONE,
             auto=None,
             passcode=None,
             phone=PHONE1,
             pushinfo=None,
             ipaddr=None,
             async_txn=False):
        """
        Returns True if authentication was a success, else False.

        If 'async_txn' is True, returns txid of the authentication transaction.
        """
        params = {
            'user': username,
            'factor': factor,
        }
        if async_txn:
            params['async'] = '1'
        if pushinfo is not None:
            params['pushinfo'] = pushinfo
        if ipaddr is not None:
            params['ipaddr'] = ipaddr

        if factor == FACTOR_AUTO:
            params['auto'] = auto
        elif factor == FACTOR_PASSCODE:
            params['code'] = passcode
        elif factor == FACTOR_PHONE:
            params['phone'] = phone
        elif factor == FACTOR_SMS:
            params['phone'] = phone
        elif factor == FACTOR_PUSH:
            params['phone'] = phone

        response = self.json_api_call('POST', '/rest/v1/auth', params)
        if self.auth_details:
            return response
        if async_txn:
            return response['txid']
        return response['result'] == 'allow'

    def status(self, txid):
        """
        Returns a 3-tuple:
            (complete, success, description)

            complete - True if the authentication request has
                       completed, else False.
            success - True if the authentication request has
                      completed and was a success, else False.
            description - A string describing the current status of the
                          authentication request.
        """
        params = {
            'txid': txid,
        }
        response = self.json_api_call('GET', '/rest/v1/status', params)
        complete = False
        success = False
        if 'result' in response:
            complete = True
            success = response['result'] == 'allow'
        description = response['status']

        return (complete, success, description)