File: admin.py

package info (click to toggle)
tryton-server 7.0.40-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 7,748 kB
  • sloc: python: 53,502; xml: 5,194; sh: 803; sql: 217; makefile: 28
file content (182 lines) | stat: -rw-r--r-- 6,931 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# This file is part of Tryton.  The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import logging
import os
import random
import sys
from getpass import getpass

from sql import Literal, Table

from trytond import backend
from trytond.config import config
from trytond.pool import Pool
from trytond.sendmail import send_test_email
from trytond.transaction import Transaction, TransactionError, inactive_records

# Only `run` is exported by `from ... import *`.
__all__ = ['run']
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


def _init_database(db_name, update):
    """Connect to *db_name* and make sure it is a Tryton database.

    If the database does not pass ``database.test()`` it is initialised
    when *update* is true; otherwise an exception is raised.

    Return True when the database has just been initialised, else False.
    """
    database = backend.Database(db_name)
    database.connect()
    if database.test():
        return False
    if not update:
        raise Exception('"%s" is not a Tryton database.' % db_name)
    logger.info("init db")
    database.init()
    return True


def _translatable_languages(db_name):
    """Return the set of language codes flagged translatable in *db_name*.

    The codes are read directly from the ``ir_lang`` table.  An exception
    is raised if *db_name* does not pass ``database.test()``.
    """
    with Transaction().start(db_name, 0) as transaction, \
            transaction.connection.cursor() as cursor:
        database = backend.Database(db_name)
        database.connect()
        if not database.test():
            raise Exception('"%s" is not a Tryton database.' % db_name)
        ir_lang = Table('ir_lang')
        cursor.execute(*ir_lang.select(ir_lang.code,
                where=ir_lang.translatable == Literal(True)))
        return {row[0] for row in cursor}


def _update_database(db_name, options, main_lang):
    """Initialise the pool of *db_name* and flag its languages.

    The languages are those given by ``options.languages`` plus, when
    ``options.update`` is set, the ones already translatable in the
    database and *main_lang*.  The module list is refreshed when
    ``options.update_modules_list`` is set.
    """
    if options.update:
        lang = _translatable_languages(db_name)
        lang.add(main_lang)
    else:
        lang = set()
    lang |= set(options.languages)

    pool = Pool(db_name)
    pool.init(update=options.update, lang=list(lang),
        activatedeps=options.activatedeps)

    if options.update_modules_list:
        with Transaction().start(db_name, 0):
            Module = pool.get('ir.module')
            Module.update_list()

    if lang:
        with Transaction().start(db_name, 0):
            Lang = Pool().get('ir.lang')
            languages = Lang.search([
                    ('code', 'in', lang),
                    ])
            Lang.write(languages, {
                    'translatable': True,
                    })


def _read_password_file():
    """Return the password stored in the file named by TRYTONPASSFILE.

    An unset or empty TRYTONPASSFILE is ignored.  The file must contain
    exactly one line; any failure to read it is reported on stderr.
    Return '' when no password could be read.
    """
    passpath = os.getenv('TRYTONPASSFILE')
    if not passpath:
        return ''
    try:
        with open(passpath) as passfile:
            password, = passfile.read().splitlines()
    except Exception as err:
        # Best effort: report the problem and let the caller fall back.
        sys.stderr.write('Can not read password '
            'from "%s": "%s"\n' % (passpath, err))
        return ''
    return password


def _prompt_password(db_name):
    "Ask for the admin password of *db_name* until a confirmed one is given."
    while True:
        password = getpass(
            '"admin" password for "%s": ' % db_name)
        password2 = getpass('"admin" password confirmation: ')
        if password != password2:
            sys.stderr.write('"admin" password confirmation '
                'doesn\'t match "admin" password.\n')
            continue
        if not password:
            sys.stderr.write('"admin" password is required.\n')
            continue
        return password


def _admin_credentials(db_name, options, initialized):
    """Return the ``(email, password)`` to set on the admin of *db_name*.

    *email* is None when it must be left untouched.  It is prompted for
    only when the database has just been initialised and ``options.email``
    is not given.

    *password* is '' unless the database has just been initialised or
    ``options.password`` is set.  In that case it is taken from the
    TRYTONPASSFILE file and, failing that, prompted for unless
    ``options.reset_password`` is set.
    """
    if options.email is not None:
        email = options.email
    elif initialized:
        email = input(
            '"admin" email for "%s": ' % db_name)
    else:
        email = None

    password = ''
    if initialized or options.password:
        password = _read_password_file()
        if not password and not options.reset_password:
            password = _prompt_password(db_name)
    return email, password


def _setup_admin(db_name, options, main_lang, initialized, email, password):
    """Save the admin user and the ``ir.configuration`` of *db_name*.

    On a TransactionError the transaction is rolled back, the error is
    given the chance to adjust the transaction parameters through its
    ``fix`` method, and the whole update is tried again.
    """
    transaction_extras = {}
    while True:
        with Transaction().start(
                db_name, 0, **transaction_extras) as transaction:
            try:
                pool = Pool()
                User = pool.get('res.user')
                Configuration = pool.get('ir.configuration')
                configuration = Configuration(1)
                # The admin user is searched among inactive users too.
                with inactive_records():
                    admin, = User.search([('login', '=', 'admin')])

                if email is not None:
                    admin.email = email
                if initialized or options.password:
                    configuration.language = main_lang
                    if not options.reset_password:
                        admin.password = password
                admin.save()
                if options.reset_password:
                    User.reset_password([admin])
                if options.hostname is not None:
                    # An empty hostname clears the configured value.
                    configuration.hostname = options.hostname or None
                configuration.save()
            except TransactionError as e:
                transaction.rollback()
                e.fix(transaction_extras)
                continue
            break


def run(options):
    """Run the administrative tasks selected by *options*.

    *options* is expected to provide the attributes ``test_email``,
    ``database_names``, ``update``, ``languages``, ``activatedeps``,
    ``update_modules_list``, ``email``, ``password``, ``reset_password``,
    ``hostname``, ``validate`` and ``validate_percentage``.

    The work is done in three passes over ``options.database_names``:

    1. each database is connected to and, when updating, initialised if
       it is not yet a Tryton database;
    2. the pool of each database is initialised and its languages are
       flagged as translatable;
    3. the admin user and the configuration of each database are
       updated, then the records are validated if requested.

    An exception is raised for a database that is not a Tryton database
    when ``options.update`` is not set.
    """
    main_lang = config.get('database', 'language')

    if options.test_email:
        send_test_email(options.test_email)

    # All databases are checked (and initialised when updating) before
    # any further work is done on any of them.
    init = {}
    for db_name in options.database_names:
        init[db_name] = _init_database(db_name, options.update)

    for db_name in options.database_names:
        _update_database(db_name, options, main_lang)

    for db_name in options.database_names:
        email, password = _admin_credentials(
            db_name, options, init[db_name])
        _setup_admin(
            db_name, options, main_lang, init[db_name], email, password)
        with Transaction().start(db_name, 0, readonly=True):
            if options.validate is not None:
                validate(options.validate, options.validate_percentage)


def validate(models, percentage=100):
    """Validate a sample of the stored records of *models*.

    *models* is a list of model names; when it is empty every model of
    the pool is checked, in name order.  Models that are not a
    ModelStorage are skipped.

    *percentage* is the share of the records of each batch that is
    randomly picked for validation.  It is clamped to the range 0-100.

    The outcome is only logged, on the 'validate' logger: "OK" or "KO"
    per record, and "SKIPPED" for a model whose validation raised a
    TransactionError.

    The current transaction is used to read the batch size (`run` calls
    this function inside a read-only transaction).
    """
    from trytond.model import ModelSingleton, ModelStorage
    from trytond.model.exceptions import ValidationError
    logger = logging.getLogger('validate')
    pool = Pool()
    if not models:
        models = sorted(n for n, _ in pool.iterobject())
    # Clamp on both sides: a negative percentage would otherwise ask
    # random.sample for a negative sample size, which raises ValueError.
    ratio = max(0, min(100, percentage)) / 100
    limit = Transaction().database.IN_MAX
    for name in models:
        logger.info("validate: %s", name)
        Model = pool.get(name)
        if not issubclass(Model, ModelStorage):
            continue
        offset = 0
        while True:
            # Walk the table in batches of `limit` records ordered by id.
            records = Model.search(
                [], order=[('id', 'ASC')], offset=offset, limit=limit)
            if not records:
                break
            records = Model.browse(
                random.sample(records, int(len(records) * ratio)))
            try:
                for record in records:
                    try:
                        Model._validate([record])
                    except ValidationError as exception:
                        logger.error("%s: KO '%s'", record, exception)
                    else:
                        logger.info("%s: OK", record)
            except TransactionError:
                # Give up on this model and move on to the next one.
                logger.info("%s: SKIPPED", name)
                break
            if issubclass(Model, ModelSingleton):
                # No further batch is fetched for a singleton model.
                break
            offset += limit