1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
|
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import base64
import functools
import logging
import os
import re
from odoo import _, api, fields, models
from odoo.addons.base.models.res_users import check_identity
from odoo.exceptions import AccessDenied, UserError
from odoo.http import request
from odoo.tools import sql
from odoo.addons.auth_totp.models.totp import TOTP, TOTP_SECRET_SIZE
# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)
# ``compress(text)`` is ``re.sub(r'\s', '', text)``: it returns ``text`` with
# every whitespace character removed. Used below to normalise a TOTP secret
# that was formatted in space-separated groups before base32-decoding it.
compress = functools.partial(re.sub, r'\s', '')
class Users(models.Model):
    # Extends ``res.users`` with TOTP two-factor authentication: the shared
    # secret, an "enabled" flag derived from it, and the trusted devices.
    # (Kept as a comment rather than a docstring so the class ``__doc__`` is
    # left exactly as it was.)
    _inherit = 'res.users'

    # The secret is read and written through raw SQL by its compute/inverse
    # methods; the backing column is created by hand in ``init()``.
    totp_secret = fields.Char(
        copy=False,
        groups=fields.NO_ACCESS,
        compute='_compute_totp_secret',
        inverse='_inverse_token',
    )
    totp_enabled = fields.Boolean(
        string="Two-factor authentication",
        compute='_compute_totp_enabled',
        search='_totp_enable_search',
    )
    totp_trusted_device_ids = fields.One2many(
        'auth_totp.device',
        'user_id',
        string="Trusted Devices",
    )

    def init(self):
        """Run the parent ``init`` and add the ``totp_secret`` column if missing."""
        super().init()
        cr = self.env.cr
        if sql.column_exists(cr, self._table, "totp_secret"):
            return
        cr.execute("ALTER TABLE res_users ADD COLUMN totp_secret varchar")

    @property
    def SELF_READABLE_FIELDS(self):
        """Parent's self-readable fields plus the 2FA flag and trusted devices."""
        extra_fields = ['totp_enabled', 'totp_trusted_device_ids']
        return super().SELF_READABLE_FIELDS + extra_fields

    def _mfa_type(self):
        """Return the MFA type for this user.

        The parent's answer wins when it is not ``None``; otherwise ``'totp'``
        is returned when TOTP is enabled, and ``None`` when it is not.
        """
        inherited = super()._mfa_type()
        if inherited is not None:
            return inherited
        return 'totp' if self.totp_enabled else None

    def _should_alert_new_device(self):
        """Tell whether the user should be alerted about a new device.

        With an active request and an MFA type set for the user:

        - no alert when the ``td_id`` cookie carries a key accepted by
          ``auth_totp.device._check_credentials_for_uid`` for this user in
          the ``"browser"`` scope (the device is known);
        - an alert in every other case.

        Without a request or without MFA, the decision is left to the parent
        implementation. To be overridden if this needs to be disabled for
        other 2FA providers.
        """
        if not (request and self._mfa_type()):
            return super()._should_alert_new_device()

        device_key = request.cookies.get('td_id')
        if device_key:
            known_device = request.env['auth_totp.device']._check_credentials_for_uid(
                scope="browser", key=device_key, uid=self.id)
            if known_device:
                # the device is known
                return False
        # 2FA enabled but not a trusted device
        return True

    def _mfa_url(self):
        """Return the MFA URL: the parent's if any, else the TOTP login page
        when the MFA type is ``'totp'``, else ``None``."""
        inherited = super()._mfa_url()
        if inherited is not None:
            return inherited
        return '/web/login/totp' if self._mfa_type() == 'totp' else None

    @api.depends('totp_secret')
    def _compute_totp_enabled(self):
        """Flag each user as 2FA-enabled when a secret is set.

        The secret is read on the ``sudo()`` twin of each record.
        """
        for user, privileged_user in zip(self, self.sudo()):
            user.totp_enabled = bool(privileged_user.totp_secret)

    def _rpc_api_keys_only(self):
        """Single-record check: true when TOTP is enabled, else the parent's answer."""
        # 2FA enabled means we can't allow password-based RPC
        self.ensure_one()
        return self.totp_enabled or super()._rpc_api_keys_only()

    def _get_session_token_fields(self):
        """Add ``totp_secret`` to the parent's session-token fields."""
        inherited_fields = super()._get_session_token_fields()
        return inherited_fields | {'totp_secret'}

    def _totp_check(self, code):
        """Check ``code`` against the stored secret.

        :param code: the code to hand to ``TOTP.match``
        :raises AccessDenied: when ``TOTP.match`` returns ``None``
        """
        privileged_user = self.sudo()
        raw_key = base64.b32decode(privileged_user.totp_secret)
        if TOTP(raw_key).match(code) is None:
            _logger.info("2FA check: FAIL for %s %r", self, privileged_user.login)
            raise AccessDenied(_("Verification failed, please double-check the 6-digit code"))
        _logger.info("2FA check: SUCCESS for %s %r", self, privileged_user.login)

    def _totp_try_setting(self, secret, code):
        """Try to enable TOTP for the current user with ``secret``.

        :param secret: base32 secret; whitespace is stripped and the value is
                       upper-cased before use
        :param code: code that must match the secret for it to be stored
        :return: ``True`` when the secret was stored; ``False`` when TOTP is
                 already enabled, ``self`` is not the current user, or the
                 code does not match
        """
        if self.totp_enabled or self != self.env.user:
            _logger.info("2FA enable: REJECT for %s %r", self, self.login)
            return False

        normalized_secret = compress(secret).upper()
        raw_key = base64.b32decode(normalized_secret)
        if TOTP(raw_key).match(code) is None:
            _logger.info("2FA enable: REJECT CODE for %s %r", self, self.login)
            return False

        self.sudo().totp_secret = normalized_secret
        if request:
            self.env.flush_all()
            # update session token so the user does not get logged out (cache cleared by change)
            refreshed_token = self.env.user._compute_session_token(request.session.sid)
            request.session.session_token = refreshed_token

        _logger.info("2FA enable: SUCCESS for %s %r", self, self.login)
        return True

    @check_identity
    def action_totp_disable(self):
        """Disable TOTP for the users in ``self``.

        Allowed when ``self`` is the current user, when the current user is
        an admin, or in superuser mode. Trusted devices are revoked and the
        secret is cleared.

        :return: ``False`` when rejected, otherwise a client action showing
                 a warning notification that lists the affected users
        """
        logins = ', '.join(repr(login) for login in self.mapped('login'))
        authorized = self == self.env.user or self.env.user._is_admin() or self.env.su
        if not authorized:
            _logger.info("2FA disable: REJECT for %s (%s) by uid #%s", self, logins, self.env.user.id)
            return False

        self.revoke_all_devices()
        self.sudo().write({'totp_secret': False})

        if request and self == self.env.user:
            self.env.flush_all()
            # update session token so the user does not get logged out (cache cleared by change)
            refreshed_token = self.env.user._compute_session_token(request.session.sid)
            request.session.session_token = refreshed_token

        _logger.info("2FA disable: SUCCESS for %s (%s) by uid #%s", self, logins, self.env.user.id)
        user_names = ', '.join(self.mapped('name'))
        notification = {
            'type': 'warning',
            'message': _("Two-factor authentication disabled for the following user(s): %s", user_names),
            'next': {'type': 'ir.actions.act_window_close'},
        }
        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': notification,
        }

    @check_identity
    def action_totp_enable_wizard(self):
        """Create an ``auth_totp.wizard`` with a fresh secret and open it.

        :raises UserError: when ``self`` is not the current user, or when
                           TOTP is already enabled
        :return: a window action opening the created wizard in a dialog
        """
        if self.env.user != self:
            raise UserError(_("Two-factor authentication can only be enabled for yourself"))
        if self.totp_enabled:
            raise UserError(_("Two-factor authentication already enabled"))

        random_bytes = os.urandom(TOTP_SECRET_SIZE // 8)
        encoded_secret = base64.b32encode(random_bytes).decode()
        # format secret in groups of 4 characters for readability; the four
        # zip arguments share one iterator, so each tuple is 4 consecutive
        # characters (a trailing incomplete group would be dropped)
        characters = iter(encoded_secret)
        groups = zip(characters, characters, characters, characters)
        formatted_secret = ' '.join(''.join(group) for group in groups)

        wizard = self.env['auth_totp.wizard'].create({
            'user_id': self.id,
            'secret': formatted_secret,
        })
        return {
            'type': 'ir.actions.act_window',
            'target': 'new',
            'res_model': 'auth_totp.wizard',
            'name': _("Two-Factor Authentication Activation"),
            'res_id': wizard.id,
            'views': [(False, 'form')],
            'context': self.env.context,
        }

    @check_identity
    def revoke_all_devices(self):
        """Identity-checked entry point that revokes every trusted device."""
        self._revoke_all_devices()

    def _revoke_all_devices(self):
        """Call ``_remove()`` on the trusted devices of the users in ``self``."""
        trusted_devices = self.totp_trusted_device_ids
        trusted_devices._remove()

    @api.model
    def change_password(self, old_passwd, new_passwd):
        """Revoke the current user's trusted devices, then defer to the parent."""
        self.env.user._revoke_all_devices()
        return super().change_password(old_passwd, new_passwd)

    def _compute_totp_secret(self):
        """Load each user's secret straight from the ``res_users`` table."""
        cr = self.env.cr
        for record in self:
            cr.execute('SELECT totp_secret FROM res_users WHERE id=%s', (record.id,))
            record.totp_secret = cr.fetchone()[0]

    def _inverse_token(self):
        """Write each user's secret to ``res_users``; falsy values become NULL."""
        cr = self.env.cr
        for record in self:
            stored_value = record.totp_secret or None
            cr.execute('UPDATE res_users SET totp_secret = %s WHERE id=%s', (stored_value, record.id))

    def _totp_enable_search(self, operator, value):
        """Search implementation for ``totp_enabled``.

        ``value`` is negated when ``operator`` is ``'!='``; no other operator
        is treated specially.

        :return: an ``('id', 'in', ids)`` domain built from the matching rows
        """
        if operator == '!=':
            value = not value
        if value:
            query = "SELECT id FROM res_users WHERE totp_secret IS NOT NULL"
        else:
            query = "SELECT id FROM res_users WHERE totp_secret IS NULL OR totp_secret='false'"
        self.env.cr.execute(query)
        matching_ids = [row[0] for row in self.env.cr.fetchall()]
        return [('id', 'in', matching_ids)]
|