File: res.py

package info (click to toggle)
tryton-modules-ldap-authentication 3.4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 272 kB
  • ctags: 57
  • sloc: python: 330; makefile: 6
file content (175 lines) | stat: -rw-r--r-- 6,279 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#This file is part of Tryton.  The COPYRIGHT file at the top level of
#this repository contains the full copyright notices and license terms.
import logging
import urlparse

import ldap
from trytond.transaction import Transaction
from trytond.pool import Pool, PoolMeta
from trytond.config import config, parse_uri

__all__ = ['User']
__metaclass__ = PoolMeta

logger = logging.getLogger(__name__)
section = 'ldap_authentication'


def parse_ldap_url(uri):
    unquote = urlparse.unquote
    uri = parse_uri(uri)
    dn = unquote(uri.path)[1:]
    attributes, scope, filter_, extensions = (
        uri.query.split('?') + [''] * 4)[:4]
    if not scope:
        scope = 'base'
    extensions = urlparse.parse_qs(extensions)
    return (uri, dn, unquote(attributes), unquote(scope), unquote(filter_),
        extensions)


def ldap_connection():
    uri = config.get(section, 'uri')
    if not uri:
        return
    uri, _, _, _, _, extensions = parse_ldap_url(uri)
    if uri.scheme.startswith('ldaps'):
        scheme, port = 'ldaps', 636
    else:
        scheme, port = 'ldap', 389
    conn = ldap.initialize('%s://%s:%s/' % (
            scheme, uri.hostname, uri.port or port))
    if config.getboolean(section, 'active_directory', 'False'):
        conn.set_option(ldap.OPT_REFERRALS, 0)
    if 'tls' in uri.scheme:
        conn.start_tls_s()

    bindname = extensions.get('bindname')
    if not bindname:
        bindname = extensions.get('!bindname')
    if bindname:
        # XXX find better way to get the password
        conn.simple_bind_s(bindname, config.get(section, 'bind_pass'))
    return conn


class User:
    __name__ = 'res.user'

    @classmethod
    def __setup__(cls):
        super(User, cls).__setup__()
        cls._error_messages.update({
                'set_passwd_ldap_user': (
                    'You can not set the password of ldap user "%s".'),
                })

    @staticmethod
    def ldap_search_user(login, con, attrs=None):
        '''
        Return the result of a ldap search for the login using the ldap
        connection con based on connection.
        The attributes values defined in attrs will be return.
        '''
        _, dn, _, scope, filter_, _ = parse_ldap_url(
            config.get(section, 'uri'))
        scope = {
            'base': ldap.SCOPE_BASE,
            'onelevel': ldap.SCOPE_ONELEVEL,
            'subtree': ldap.SCOPE_SUBTREE,
            }.get(scope)
        uid = config.get(section, 'uid', 'uid')
        if filter_:
            filter_ = '(&(%s=%s)%s)' % (uid, login, filter_)
        else:
            filter_ = '(%s=%s)' % (uid, login)

        result = con.search_s(dn, scope, filter_, attrs)
        if config.get(section, 'active_directory'):
            result = [x for x in result if x[0]]
        if result and len(result) > 1:
            logger.info('ldap_search_user found more than 1 user')
        return result

    @classmethod
    def _check_passwd_ldap_user(cls, logins):
        find = False
        try:
            con = ldap_connection()
            if not con:
                return
            for login in logins:
                if cls.ldap_search_user(login, con, attrs=[]):
                    find = True
                    break
        except ldap.LDAPError, e:
            logger.error('LDAPError: %s' % str(e))
        if find:
            cls.raise_user_error('set_passwd_ldap_user', (login.rec_name,))

    @classmethod
    def create(cls, vlist):
        tocheck = []
        for values in vlist:
            if values.get('password') and 'login' in values:
                tocheck.append(values['login'])
        if tocheck:
            with Transaction().set_context(_check_access=False):
                cls._check_passwd_ldap_user(tocheck)
        return super(User, cls).create(vlist)

    @classmethod
    def write(cls, *args):
        actions = iter(args)
        for users, values in zip(actions, actions):
            if values.get('password'):
                logins = [x.login for x in users]
                cls._check_passwd_ldap_user(logins)
        super(User, cls).write(*args)

    @classmethod
    def set_preferences(cls, values, old_password=False):
        if 'password' in values:
            try:
                con = ldap_connection()
                if con:
                    user = cls(Transaction().user)
                    uid = config.get(section, 'uid', 'uid')
                    users = cls.ldap_search_user(user.login, con, attrs=[uid])
                    if users and len(users) == 1:
                        [(dn, attrs)] = users
                        if con.simple_bind_s(dn, old_password):
                            con.passwd_s(dn, old_password, values['password'])
                            values = values.copy()
                            del values['password']
                        else:
                            cls.raise_user_error('wrong_password')
            except ldap.LDAPError, e:
                logger.error('LDAPError: %s' % str(e))
        super(User, cls).set_preferences(values, old_password=old_password)

    @classmethod
    def get_login(cls, login, password):
        pool = Pool()
        LoginAttempt = pool.get('res.user.login.attempt')
        try:
            con = ldap_connection()
            if con:
                uid = config.get(section, 'uid', 'uid')
                users = cls.ldap_search_user(login, con, attrs=[uid])
                if users and len(users) == 1:
                    [(dn, attrs)] = users
                    if password and con.simple_bind_s(dn, password):
                        user_id, _ = cls._get_login(login)
                        if user_id:
                            LoginAttempt.remove(login)
                            return user_id
                        elif config.getboolean(section, 'create_user'):
                            user, = cls.create([{
                                        'name': attrs.get(uid, [login])[0],
                                        'login': login,
                                        }])
                            return user.id
        except ldap.LDAPError, e:
            logger.error('LDAPError: %s' % str(e))
        return super(User, cls).get_login(login, password)