File: security.py

package info (click to toggle)
tryton-server 1.6.1-2%2Bsqueeze1
  • links: PTS, VCS
  • area: main
  • in suites: squeeze
  • size: 2,436 kB
  • ctags: 3,017
  • sloc: python: 24,379; xml: 3,771; sql: 506; sh: 126; makefile: 74
file content (88 lines) | stat: -rw-r--r-- 2,856 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#This file is part of Tryton.  The COPYRIGHT file at the top level of
#this repository contains the full copyright notices and license terms.
from trytond.backend import Database
from trytond.session import Session
from trytond.pool import Pool
from trytond.config import CONFIG
import time


_USER_CACHE = {}
_USER_TRY = {}

def login(dbname, loginname, password, cache=True):
    _USER_TRY.setdefault(dbname, {})
    _USER_TRY[dbname].setdefault(loginname, 0)
    database = Database(dbname).connect()
    cursor = database.cursor()
    database_list = Pool.database_list()
    pool = Pool(dbname)
    if not dbname in database_list:
        pool.init()
    user_obj = pool.get('res.user')
    password = password.decode('utf-8')
    user_id = user_obj.get_login(cursor, 0, loginname, password)
    cursor.commit()
    cursor.close()
    if user_id:
        _USER_TRY[dbname][loginname] = 0
        if cache:
            _USER_CACHE.setdefault(dbname, {})
            _USER_CACHE[dbname].setdefault(user_id, [])
            session = Session(user_id)
            session.name = loginname
            _USER_CACHE[dbname][user_id].append(session)
            return (user_id, session.session)
        else:
            return user_id
    time.sleep(2 ** _USER_TRY[dbname][loginname])
    _USER_TRY[dbname][loginname] += 1
    return False

def logout(dbname, user, session):
    name = ''
    if _USER_CACHE.get(dbname, {}).has_key(user):
        for i, real_session \
                in enumerate(_USER_CACHE[dbname][user]):
            if real_session.session == session:
                name = real_session.name
                del _USER_CACHE[dbname][user][i]
                break
    return name

def check_super(passwd):
    if passwd == CONFIG['admin_passwd']:
        return True
    else:
        raise Exception('AccessDenied')

def check(dbname, user, session):
    if user == 0:
        raise Exception('AccessDenied')
    result = None
    now = time.time()
    timeout = int(CONFIG['session_timeout'])
    if _USER_CACHE.get(dbname, {}).has_key(user):
        to_del = []
        for i, real_session \
                in enumerate(_USER_CACHE[dbname][user]):
            if abs(real_session.timestamp - now) < timeout:
                if real_session.session == session:
                    result = real_session
            else:
                to_del.insert(0, i)
        for i in to_del:
            del _USER_CACHE[dbname][user][i]
    if result:
        return result
    raise Exception('NotLogged')

def get_connections(dbname, user):
    res = 0
    now = time.time()
    timeout = int(CONFIG['session_timeout'])
    if _USER_CACHE.get(dbname, {}).has_key(int(user)):
        for _, session in enumerate(_USER_CACHE[dbname][int(user)]):
            if abs(session.timestamp - now) < timeout:
                res += 1
    return res