File: auth_passkey_key.py

package info (click to toggle)
odoo 18.0.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 878,716 kB
  • sloc: javascript: 927,937; python: 685,670; xml: 388,524; sh: 1,033; sql: 415; makefile: 26
file content (189 lines) | stat: -rw-r--r-- 8,038 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import base64
import json
import logging
from werkzeug.urls import url_parse

from odoo import api, Command, fields, models, _
from odoo.exceptions import AccessDenied
from odoo.http import request
from odoo.tools import sql, SQL

from odoo.addons.base.models.res_users import check_identity

from .._vendor.webauthn import base64url_to_bytes, generate_authentication_options, generate_registration_options, options_to_json, verify_authentication_response, verify_registration_response
from .._vendor.webauthn.helpers import bytes_to_base64url
from .._vendor.webauthn.helpers.structs import AuthenticatorSelectionCriteria, ResidentKeyRequirement, UserVerificationRequirement

_logger = logging.getLogger(__name__)


class PassKey(models.Model):
    _name = 'auth.passkey.key'
    _description = 'Passkey'
    _order = 'id desc'

    name = fields.Char(required=True)
    credential_identifier = fields.Char(required=True, groups='base.group_system')
    public_key = fields.Char(required=True, groups='base.group_system', compute='_compute_public_key', inverse='_inverse_public_key')
    sign_count = fields.Integer(default=0, groups='base.group_system')

    _sql_constraints = [
        ('unique_identifier', 'UNIQUE(credential_identifier)', 'The credential identifier should be unique.'),
    ]

    def init(self):
        super().init()
        if not sql.column_exists(self.env.cr, 'auth_passkey_key', 'public_key'):
            self.env.cr.execute(SQL('ALTER TABLE auth_passkey_key ADD COLUMN public_key varchar'))

    def unlink(self):
        for passkey in self:
            _logger.info(
                "Passkey (#%d) deleted by %s (#%d) from %s",
                passkey.id,
                self.env.user.login, self.env.user.id,
                request.httprequest.environ['REMOTE_ADDR'] if request else 'n/a'
            )
        return super().unlink()

    def _compute_public_key(self):
        query = 'SELECT public_key FROM auth_passkey_key WHERE id = %s'
        for passkey in self:
            self.env.cr.execute(SQL(query, passkey.id))
            public_key = self.env.cr.fetchone()[0]
            passkey.public_key = public_key

    def _inverse_public_key(self):
        pass

    @api.model
    def _get_session_challenge(self):
        challenge = request.session.pop('webauthn_challenge', None)
        if not challenge:
            raise AccessDenied('Cannot find a challenge for this session')
        return challenge

    @api.model
    def _start_auth(self):
        assert request
        authentication_options = json.loads(options_to_json(generate_authentication_options(
            rp_id=url_parse(self.get_base_url()).host,
            user_verification=UserVerificationRequirement.REQUIRED,
        )))
        request.session['webauthn_challenge'] = authentication_options['challenge']
        return authentication_options

    @api.model
    def _verify_auth(self, auth, public_key, sign_count):
        parsed_url = url_parse(self.get_base_url())
        auth_verification = verify_authentication_response(
            credential=auth,
            expected_challenge=base64url_to_bytes(self._get_session_challenge()),
            expected_origin=parsed_url.replace(path='').to_url(),
            expected_rp_id=parsed_url.host,
            credential_public_key=base64url_to_bytes(public_key),
            credential_current_sign_count=sign_count,
            require_user_verification=True,
        )
        return auth_verification.new_sign_count

    @api.model
    def _start_registration(self):
        assert request
        registration_options = json.loads(options_to_json(generate_registration_options(
            rp_id=url_parse(self.get_base_url()).host,
            rp_name='Odoo',
            user_id=str(self.env.user.id).encode(),
            user_name=self.env.user.login,
            authenticator_selection=AuthenticatorSelectionCriteria(
                resident_key=ResidentKeyRequirement.REQUIRED,
                user_verification=UserVerificationRequirement.REQUIRED
            )
        )))
        request.session['webauthn_challenge'] = registration_options['challenge']
        return registration_options

    @api.model
    def _verify_registration_options(self, registration):
        parsed_url = url_parse(self.get_base_url())
        verification = verify_registration_response(
            credential=registration,
            expected_challenge=base64url_to_bytes(self._get_session_challenge()),
            expected_origin=parsed_url.replace(path='').to_url(),
            expected_rp_id=parsed_url.host,
            require_user_verification=True,
        )
        return {
            'credential_id': verification.credential_id,
            'credential_public_key': verification.credential_public_key,
        }

    @check_identity
    def action_delete_passkey(self):
        for key in self:
            if key.create_uid.id == self.env.user.id:
                # Force to go through `res.users.auth_passkey_key_ids` to trigger the session token cache invalidation
                # See `res.users.write` and `_get_invalidation_fields`
                # `self.env.user` is already sudo, so no need to re-apply `sudo` to get delete access right.
                self.env.user.write({'auth_passkey_key_ids': [Command.delete(key.id)]})
                new_token = self.env.user._compute_session_token(request.session.sid)
                request.session.session_token = new_token
            else:
                _logger.info(
                    "%s (#%d) attempted to delete passkey (#%d) belonging to %s (#%d) from %s but was denied.",
                    self.env.user.login, self.env.user.id,
                    key.id,
                    key.create_uid.login, key.create_uid.id,
                    request.httprequest.environ['REMOTE_ADDR'] if request else 'n/a'
                )

    def action_rename_passkey(self):
        return {
            'name': _('Rename Passkey'),
            'type': 'ir.actions.act_window',
            'res_model': 'auth.passkey.key',
            'view_id': self.env.ref('auth_passkey.auth_passkey_key_rename').id,
            'view_mode': 'form',
            'target': 'new',
            'res_id': self.id,
            'context': {
                'dialog_size': 'medium',
            }
        }


class PassKeyCreate(models.TransientModel):
    _name = 'auth.passkey.key.create'
    _description = 'Create a Passkey'

    name = fields.Char('Name', required=True)

    @check_identity
    def make_key(self, registration=None):
        # We add in these fields with JS, if we didn't give them default values we would get a XML validation warning.
        assert registration, "registration can not be empty"
        self.ensure_one()
        verification = request.env['auth.passkey.key']._verify_registration_options(registration)
        # Force to go through `res.users.auth_passkey_key_ids` to trigger the session token cache invalidation
        # See `res.users.write` and `_get_invalidation_fields`
        # `self.env.user` is already sudo, so no need to re-apply `sudo` to get create access right.
        self.env.user.write({'auth_passkey_key_ids': [Command.create({
            'name': self.name,
            'credential_identifier': bytes_to_base64url(verification['credential_id']),
        })]})
        passkey = self.env.user.auth_passkey_key_ids[0]
        self.env.cr.execute(SQL(
            "UPDATE auth_passkey_key SET public_key = %s WHERE id = %s",
            base64.urlsafe_b64encode(verification['credential_public_key']).decode(),
            passkey.id,
        ))
        ip = request.httprequest.environ['REMOTE_ADDR'] if request else 'n/a'
        _logger.info(
            "Passkey (#%d) created by %s (#%d) from %s",
            passkey.id,
            self.env.user.login, self.env.user.id,
            ip
        )
        new_token = self.env.user._compute_session_token(request.session.sid)
        request.session.session_token = new_token
        return True