File: res.py

package info (click to toggle)
tryton-modules-ldap-authentication 7.0.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 352 kB
  • sloc: python: 515; makefile: 11; xml: 8; sh: 3
file content (171 lines) | stat: -rw-r--r-- 6,276 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# This file is part of Tryton.  The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import logging
import ssl
import urllib.parse

import ldap3
from ldap3.core.exceptions import LDAPException

from trytond.config import config, parse_uri
from trytond.exceptions import LoginException
from trytond.i18n import gettext
from trytond.model.exceptions import AccessError
from trytond.pool import PoolMeta
from trytond.transaction import without_check_access

logger = logging.getLogger(__name__)
section = 'ldap_authentication'

# Old version of urlparse doesn't parse query for ldap
# see http://bugs.python.org/issue9374
if 'ldap' not in urllib.parse.uses_query:
    urllib.parse.uses_query.append('ldap')


def parse_ldap_url(uri):
    unquote = urllib.parse.unquote
    uri = parse_uri(uri)
    dn = unquote(uri.path)[1:]
    attributes, scope, filter_, extensions = (
        uri.query.split('?') + [''] * 4)[:4]
    if not scope:
        scope = 'base'
    extensions = urllib.parse.parse_qs(extensions)
    return (uri, dn, unquote(attributes), unquote(scope), unquote(filter_),
        extensions)


def ldap_server():
    uri = config.get(section, 'uri')
    if not uri:
        return
    uri, _, _, _, _, extensions = parse_ldap_url(uri)
    if uri.scheme.startswith('ldaps'):
        scheme, port = 'ldaps', 636
        tls = ldap3.Tls(validate=ssl.CERT_REQUIRED)
    else:
        scheme, port = 'ldap', 389
        tls = None
        if 'tls' in uri.scheme:
            tls = ldap3.Tls(validate=ssl.CERT_REQUIRED)
    return ldap3.Server('%s://%s:%s' % (
            scheme, uri.hostname, uri.port or port), tls=tls)


class User(metaclass=PoolMeta):
    __name__ = 'res.user'

    @staticmethod
    def ldap_search_user(login, server, attrs=None):
        '''
        Return the result of a ldap search for the login using the ldap
        server.
        The attributes values defined in attrs will be return.
        '''
        _, dn, _, scope, filter_, extensions = parse_ldap_url(
            config.get(section, 'uri'))
        scope = {
            'base': ldap3.BASE,
            'onelevel': ldap3.LEVEL,
            'one': ldap3.LEVEL,
            'subtree': ldap3.SUBTREE,
            'sub': ldap3.SUBTREE,
            }[scope]
        uid = config.get(section, 'uid', default='uid')
        if filter_:
            filter_ = '(&(%s=%s)%s)' % (uid, login, filter_)
        else:
            filter_ = '(%s=%s)' % (uid, login)

        bindpass = None
        bindname, = extensions.get('bindname', [None])
        if not bindname:
            bindname, = extensions.get('!bindname', [None])
        if bindname:
            # XXX find better way to get the password
            bindpass = config.get(section, 'bind_pass')

        bind_method = ldap3.AUTO_BIND_DEFAULT
        if server.ssl is False and server.tls is not None:
            bind_method = ldap3.AUTO_BIND_TLS_BEFORE_BIND

        with ldap3.Connection(
                server, bindname, bindpass, auto_bind=bind_method) as con:
            con.search(dn, filter_, search_scope=scope, attributes=attrs)
            result = con.entries
            if result and len(result) > 1:
                logger.info('ldap_search_user found more than 1 user')
            return [(e.entry_dn, e.entry_attributes_as_dict)
                for e in result]

    @classmethod
    @without_check_access
    def _check_passwd_ldap_user(cls, logins):
        find = False
        try:
            server = ldap_server()
            if not server:
                return
            for login in logins:
                if cls.ldap_search_user(login, server, attrs=[]):
                    find = True
                    break
        except LDAPException:
            logger.error('LDAPError when checking password', exc_info=True)
        if find:
            raise AccessError(
                gettext('ldap_authentication.msg_ldap_user_change_password',
                    user=login))

    @classmethod
    def create(cls, vlist):
        tocheck = []
        for values in vlist:
            if values.get('password') and 'login' in values:
                tocheck.append(values['login'])
        if tocheck:
            cls._check_passwd_ldap_user(tocheck)
        return super(User, cls).create(vlist)

    @classmethod
    def write(cls, *args):
        actions = iter(args)
        for users, values in zip(actions, actions):
            if values.get('password'):
                logins = [x.login for x in users]
                cls._check_passwd_ldap_user(logins)
        super(User, cls).write(*args)

    @classmethod
    def _login_ldap(cls, login, parameters):
        if 'password' not in parameters:
            msg = gettext('res.msg_user_password', login=login)
            raise LoginException('password', msg, type='password')
        password = parameters['password']
        try:
            server = ldap_server()
            if server:
                uid = config.get(section, 'uid', default='uid')
                users = cls.ldap_search_user(login, server, attrs=[uid])
                if users and len(users) == 1:
                    [(dn, attrs)] = users
                    with ldap3.Connection(
                            server, dn, password,
                            auto_bind=ldap3.AUTO_BIND_NONE) as con:
                        if server.ssl is False and server.tls is not None:
                            con.start_tls()
                        if (password and con.bind()):
                            # Use ldap uid so we always get the right case
                            login = attrs.get(uid, [login])[0]
                            user_id = cls._get_login(login)[0]
                            if user_id:
                                return user_id
                            elif config.getboolean(section, 'create_user'):
                                user, = cls.create([{
                                            'name': login,
                                            'login': login,
                                            }])
                                return user.id
        except LDAPException:
            logger.error('LDAPError when login', exc_info=True)