File: privacy_lookup_wizard.py

package info (click to toggle)
odoo 18.0.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 878,716 kB
  • sloc: javascript: 927,937; python: 685,670; xml: 388,524; sh: 1,033; sql: 415; makefile: 26
file content (323 lines) | stat: -rw-r--r-- 12,370 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.

from collections import defaultdict

from odoo import api, fields, models, tools, _
from odoo.exceptions import UserError
from odoo.tools import SQL


class PrivacyLookupWizard(models.TransientModel):
    _name = 'privacy.lookup.wizard'
    _description = 'Privacy Lookup Wizard'
    _transient_max_count = 0
    _transient_max_hours = 24

    name = fields.Char(required=True)
    email = fields.Char(required=True)
    line_ids = fields.One2many('privacy.lookup.wizard.line', 'wizard_id')
    execution_details = fields.Text(compute='_compute_execution_details', store=True)
    log_id = fields.Many2one('privacy.log')
    records_description = fields.Text(compute='_compute_records_description')
    line_count = fields.Integer(compute='_compute_line_count')

    @api.depends('line_ids')
    def _compute_line_count(self):
        for wizard in self:
            wizard.line_count = len(wizard.line_ids)

    def _compute_display_name(self):
        self.display_name = _('Privacy Lookup')

    def _get_query_models_blacklist(self):
        return [
            # Already Managed
            'res.partner',
            'res.users',
            # Ondelete Cascade
            'mail.notification',
            'mail.followers',
            'discuss.channel.member',
            # Special case for direct messages
            'mail.message',
        ]

    def _get_query(self):
        name = self.name.strip()
        email = f"%{self.email.strip()}%"
        email_normalized = tools.email_normalize(self.email.strip())

        # Step 1: Retrieve users/partners liked to email address or name
        query = SQL("""
            WITH indirect_references AS (
                SELECT id
                FROM res_partner
                WHERE email_normalized = %s
                OR name ilike %s)
            SELECT
                %s AS res_model_id,
                id AS res_id,
                active AS is_active
            FROM res_partner
            WHERE id IN (SELECT id FROM indirect_references)
            UNION ALL
            SELECT
                %s AS res_model_id,
                id AS res_id,
                active AS is_active
            FROM res_users
            WHERE (
                (login ilike %s)
                OR
                (partner_id IN (
                    SELECT id
                    FROM res_partner
                    WHERE email ilike %s or name ilike %s)))
            -- Step 2: Special case for direct messages
            UNION ALL
            SELECT
                %s AS res_model_id,
                id AS res_id,
                True AS is_active
            FROM mail_message
            WHERE author_id IN (SELECT id FROM indirect_references)
        """,
            # Indirect references CTE
            email_normalized, name,
            # Search on res.partner
            self.env['ir.model.data']._xmlid_to_res_id('base.model_res_partner'),
            # Search on res.users
            self.env['ir.model.data']._xmlid_to_res_id('base.model_res_users'), email, email, name,
            # Direct messages
            self.env['ir.model.data']._xmlid_to_res_id('mail.model_mail_message'),
        )

        # Step 3: Retrieve info on other models
        blacklisted_models = self._get_query_models_blacklist()
        for model_name in self.env:
            if model_name in blacklisted_models:
                continue

            model = self.env[model_name]
            if model._transient or not model._auto:
                continue

            table_name = model._table

            conditions = []
            # 3.1 Search Basic Personal Data Records (aka email/name usage)
            for field_name in ['email_normalized', 'email', 'email_from', 'company_email']:
                if field_name in model and model._fields[field_name].store:
                    rec_name = model._rec_name or 'name'
                    is_normalized = field_name == 'email_normalized' or (model_name == 'mailing.trace' and field_name == 'email')

                    conditions.append(SQL(
                        "%s %s %s",
                        SQL.identifier(field_name),
                        SQL('=') if is_normalized else SQL('ilike'),  # Manage Foo Bar <foo@bar.com>
                        email_normalized if is_normalized else email
                    ))
                    if rec_name in model and model._fields[model._rec_name].store and model._fields[model._rec_name].type == 'char' and not model._fields[model._rec_name].translate:
                        conditions.append(SQL(
                            "%s ilike %s",
                            SQL.identifier(rec_name),
                            name,
                        ))
                    if is_normalized:
                        break

            # 3.2 Search Indirect Personal Data References (aka partner_id)
            conditions.extend(
                SQL(
                    "%s in (SELECT id FROM indirect_references)",
                    model._field_to_sql(table_name, field_name),
                )
                for field_name, field in model._fields.items()
                if field.comodel_name == 'res.partner'
                if field.store
                if field.type == 'many2one'
                if field.ondelete != 'cascade'
            )

            if conditions:
                query = SQL("""
                    %s
                    UNION ALL
                    SELECT
                        %s AS res_model_id,
                        id AS res_id,
                        %s AS is_active
                    FROM %s
                    WHERE %s
                """,
                    query,
                    self.env['ir.model'].search([('model', '=', model_name)]).id,
                    SQL.identifier('active') if 'active' in model else True,
                    SQL.identifier(table_name),
                    SQL(" OR ").join(conditions),
                )
        return query

    def action_lookup(self):
        self.ensure_one()
        query = self._get_query()
        self.env.flush_all()
        self.env.cr.execute(query)
        results = self.env.cr.dictfetchall()
        self.line_ids = [(5, 0, 0)] + [(0, 0, reference) for reference in results]
        return self.action_open_lines()

    def _post_log(self):
        self.ensure_one()
        if not self.log_id and self.execution_details:
            self.log_id = self.env['privacy.log'].create({
                'anonymized_name': self.name,
                'anonymized_email': self.email,
                'execution_details': self.execution_details,
                'records_description': self.records_description,
            })
        else:
            self.log_id.execution_details = self.execution_details
            self.log_id.records_description = self.records_description

    @api.depends('line_ids.execution_details')
    def _compute_execution_details(self):
        for wizard in self:
            wizard.execution_details = '\n'.join(line.execution_details for line in wizard.line_ids if line.execution_details)
            wizard._post_log()

    @api.depends('line_ids')
    def _compute_records_description(self):
        for wizard in self:
            if not wizard.line_ids:
                wizard.records_description = ''
                continue
            records_by_model = defaultdict(list)
            for line in wizard.line_ids:
                records_by_model[line.res_model_id].append(line.res_id)
            wizard.records_description = '\n'.join('{model_name} ({count}): {ids_str}'.format(
                model_name=(f'{model.name} - {model.model}' if self.env.user.has_group('base.group_no_one') else model.name),
                count=len(ids),
                ids_str=', '.join('#%s' % (rec_id) for rec_id in ids),
            ) for model, ids in records_by_model.items())

    def action_open_lines(self):
        self.ensure_one()
        action = self.env['ir.actions.act_window']._for_xml_id('privacy_lookup.action_privacy_lookup_wizard_line')
        action['domain'] = [('wizard_id', '=', self.id)]
        return action


class PrivacyLookupWizardLine(models.TransientModel):
    _name = 'privacy.lookup.wizard.line'
    _description = 'Privacy Lookup Wizard Line'
    _transient_max_count = 0
    _transient_max_hours = 24

    @api.model
    def _selection_target_model(self):
        return [(model.model, model.name) for model in self.env['ir.model'].sudo().search([])]

    wizard_id = fields.Many2one('privacy.lookup.wizard')
    res_id = fields.Integer(
        string="Resource ID",
        required=True)
    res_name = fields.Char(
        string='Resource name',
        compute='_compute_res_name',
        store=True)
    res_model_id = fields.Many2one(
        'ir.model',
        'Related Document Model',
        ondelete='cascade')
    res_model = fields.Char(
        string='Document Model',
        related='res_model_id.model',
        store=True,
        readonly=True)
    resource_ref = fields.Reference(
        string='Record',
        selection='_selection_target_model',
        compute='_compute_resource_ref',
        inverse='_set_resource_ref')
    has_active = fields.Boolean(compute='_compute_has_active', store=True)
    is_active = fields.Boolean()
    is_unlinked = fields.Boolean()
    execution_details = fields.Char(default='')

    @api.depends('res_model', 'res_id', 'is_unlinked')
    def _compute_resource_ref(self):
        for line in self:
            if line.res_model and line.res_model in self.env and not line.is_unlinked:
                # Exclude records that can't be read (eg: multi-company ir.rule)
                try:
                    self.env[line.res_model].browse(line.res_id).check_access('read')
                    line.resource_ref = '%s,%s' % (line.res_model, line.res_id or 0)
                except Exception:
                    line.resource_ref = None
            else:
                line.resource_ref = None

    def _set_resource_ref(self):
        for line in self:
            if line.resource_ref:
                line.res_id = line.resource_ref.id

    @api.depends('res_model_id')
    def _compute_has_active(self):
        for line in self:
            if not line.res_model_id:
                line.has_active = False
                continue
            line.has_active = 'active' in self.env[line.res_model]

    @api.depends('res_model', 'res_id')
    def _compute_res_name(self):
        for line in self:
            if not line.res_id or not line.res_model:
                continue
            record = self.env[line.res_model].sudo().browse(line.res_id)
            if not record.exists():
                continue
            name = record.display_name
            line.res_name = name if name else f'{line.res_model_id.name}/{line.res_id}'

    @api.onchange('is_active')
    def _onchange_is_active(self):
        for line in self:
            if not line.res_model_id or not line.res_id:
                continue
            action = _('Unarchived') if line.is_active else _('Archived')
            line.execution_details = '%s %s #%s' % (action, line.res_model_id.name, line.res_id)
            self.env[line.res_model].sudo().browse(line.res_id).write({'active': line.is_active})

    def action_unlink(self):
        self.ensure_one()
        if self.is_unlinked:
            raise UserError(_('The record is already unlinked.'))
        self.env[self.res_model].sudo().browse(self.res_id).unlink()
        self.execution_details = '%s %s #%s' % (_('Deleted'), self.res_model_id.name, self.res_id)
        self.is_unlinked = True

    def action_archive_all(self):
        for line in self:
            if not line.has_active or not line.is_active:
                continue
            line.is_active = False
            line._onchange_is_active()

    def action_unlink_all(self):
        for line in self:
            if line.is_unlinked:
                continue
            line.action_unlink()

    def action_open_record(self):
        self.ensure_one()
        return {
            'type': 'ir.actions.act_window',
            'view_mode': 'form',
            'res_id': self.res_id,
            'res_model': self.res_model,
        }