File: anonymization.py

package info (click to toggle)
oca-core 11.0.20180730-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 509,684 kB
  • sloc: xml: 258,806; python: 164,081; sql: 217; sh: 92; makefile: 16
file content (148 lines) | stat: -rw-r--r-- 6,492 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.

from itertools import groupby
from operator import itemgetter

from odoo import api, fields, models, _
from odoo.exceptions import UserError
from odoo.tools import pycompat

# Selection values for the ``state`` of an anonymization field record.
FIELD_STATES = [
    ('clear', 'Clear'),
    ('anonymized', 'Anonymized'),
    ('not_existing', 'Not Existing'),
    ('new', 'New'),
]
# Selection values for the ``state`` of an anonymization history record.
ANONYMIZATION_HISTORY_STATE = [
    ('started', 'Started'),
    ('done', 'Done'),
    ('in_exception', 'Exception occured'),
]
# Selection values for the ``direction`` of an anonymization history record.
ANONYMIZATION_DIRECTION = [
    ('clear -> anonymized', 'clear -> anonymized'),
    ('anonymized -> clear', 'anonymized -> clear'),
]


def group(lst, cols):
    """Bucket the items of ``lst`` by the value(s) found under ``cols``.

    :param lst: iterable of items supporting item lookup (``item[col]``)
    :param cols: a single key (given as a string) or a sequence of keys
    :return: dict mapping each distinct key value (a tuple of values when
             several keys are given) to the list of items sharing it
    """
    if isinstance(cols, pycompat.string_types):
        cols = [cols]
    key_of = itemgetter(*cols)
    buckets = {}
    # groupby only merges adjacent items, hence the sort on the same key.
    for key_value, members in groupby(sorted(lst, key=key_of), key_of):
        buckets[key_value] = list(members)
    return buckets


class IrModelFieldsAnonymization(models.Model):
    # Registry of the (model, field) pairs handled by the anonymization
    # feature, together with the current anonymization state of each pair.
    # (model_name, field_name) is unique, see _sql_constraints below.
    _name = 'ir.model.fields.anonymization'
    _rec_name = 'field_id'

    model_name = fields.Char('Object Name', required=True)
    model_id = fields.Many2one('ir.model', string='Object', ondelete='set null')
    field_name = fields.Char(required=True)
    field_id = fields.Many2one('ir.model.fields', string='Field', ondelete='set null')
    state = fields.Selection(selection=FIELD_STATES, string='Status', required=True, readonly=True, default='clear')

    _sql_constraints = [
        ('model_id_field_id_uniq', 'unique (model_name, field_name)', "You cannot have two fields with the same name on the same object!"),
    ]

    @api.model
    def _get_global_state(self):
        """Return the overall anonymization state of the database.

        Only records whose state is not ``'not_existing'`` are considered.

        :return: ``'clear'`` when there is no such record or all of them are
                 clear, ``'anonymized'`` when all of them are anonymized,
                 ``'unstable'`` when the states are mixed
        """
        field_ids = self.search([('state', '!=', 'not_existing')])
        if not field_ids or len(field_ids) == len(field_ids.filtered(lambda field: field.state == "clear")):
            state = 'clear'  # all fields are clear
        elif len(field_ids) == len(field_ids.filtered(lambda field: field.state == "anonymized")):
            state = 'anonymized'  # all fields are anonymized
        else:
            state = 'unstable'  # fields are mixed: this should be fixed
        return state

    @api.model
    def _check_write(self):
        """Check that the field is created from the menu and not from a database update,
           otherwise the database update can crash.

        The check only runs when the context carries a truthy ``manual`` key.

        :return: True when the modification is allowed
        :raise UserError: if the global state is ``'anonymized'`` or ``'unstable'``
        """
        if self.env.context.get('manual'):
            global_state = self._get_global_state()
            if global_state == 'anonymized':
                raise UserError(_("The database is currently anonymized, you cannot create, modify or delete fields."))
            elif global_state == 'unstable':
                raise UserError(_("The database anonymization is currently in an unstable state. Some fields are anonymized,"
                                " while some fields are not anonymized. You should try to solve this problem before trying to create, write or delete fields."))
        return True

    @api.model
    def _get_model_and_field_ids(self, vals):
        """Resolve the names found in ``vals`` into database ids.

        :param vals: dict of values; ``model_name`` and ``field_name`` are read
        :return: ``(model_id, field_id)`` of the matching ``ir.model.fields``
                 record, or ``(False, False)`` when either name is missing
        """
        if vals.get('field_name') and vals.get('model_name'):
            # presumably _get() returns an empty recordset for an unknown
            # field, which would yield falsy ids here -- TODO confirm
            field = self.env['ir.model.fields']._get(vals['model_name'], vals['field_name'])
            return (field.model_id.id, field.id)
        return (False, False)

    @api.model
    def create(self, vals):
        """Create an anonymization field record.

        ``model_id``/``field_id`` are derived from the given names, and
        ``state`` is forced to the global state, or to ``'not_existing'``
        when no ``field_id`` could be determined. ``vals`` is updated in place.

        :raise UserError: see :meth:`_check_write`
        """
        # check field state: all should be clear before we can add a new field to anonymize:
        self._check_write()
        if vals.get('field_name') and vals.get('model_name'):
            vals['model_id'], vals['field_id'] = self._get_model_and_field_ids(vals)
        # check not existing fields:
        # NOTE(review): outside of the 'manual' context the global state may be
        # 'unstable', which is not one of FIELD_STATES; write() guards against
        # that case but create() does not -- confirm whether this is intended.
        vals['state'] = self._get_global_state() if vals.get('field_id') else 'not_existing'
        return super(IrModelFieldsAnonymization, self).create(vals)

    @api.multi
    def write(self, vals):
        """Update anonymization field records.

        Every write is guarded by :meth:`_check_write`, except the one that
        only sets ``state`` back to ``'clear'``. ``vals`` is updated in place
        with the derived ``model_id``/``field_id``/``state`` values.

        :raise UserError: see :meth:`_check_write`
        """
        # check field state: all should be clear before we can modify a field.
        # The parentheses matter: without them ``not`` binds to the length
        # test only, and the guard is skipped for almost every modification.
        only_resets_state = len(vals) == 1 and vals.get('state') == 'clear'
        if not only_resets_state:
            self._check_write()
        if vals.get('field_name') and vals.get('model_name'):
            vals['model_id'], vals['field_id'] = self._get_model_and_field_ids(vals)
        # check not existing fields:
        if 'field_id' in vals:
            if not vals['field_id']:
                vals['state'] = 'not_existing'
            else:
                global_state = self._get_global_state()
                # 'unstable' is not a valid field state: keep the given one
                if global_state != 'unstable':
                    vals['state'] = global_state
        return super(IrModelFieldsAnonymization, self).write(vals)

    @api.multi
    def unlink(self):
        """Delete the records, after the :meth:`_check_write` guard.

        :raise UserError: see :meth:`_check_write`
        """
        # check field state: all should be clear before we can unlink a field:
        self._check_write()
        return super(IrModelFieldsAnonymization, self).unlink()

    @api.onchange('model_id')
    def _onchange_model_id(self):
        """Reset the field selection and align ``model_name`` with ``model_id``."""
        self.field_name = False
        self.field_id = False
        self.model_name = self.model_id.model

    @api.onchange('model_name')
    def _onchange_model_name(self):
        """Reset the field selection and align ``model_id`` with ``model_name``."""
        self.field_name = False
        self.field_id = False
        self.model_id = self.env['ir.model']._get(self.model_name)

    @api.onchange('field_name')
    def _onchange_field_name(self):
        """Align ``field_id`` with ``field_name``; clear it if a name is missing."""
        if self.field_name and self.model_name:
            self.field_id = self.env['ir.model.fields']._get(self.model_name, self.field_name)
        else:
            self.field_id = False

    @api.onchange('field_id')
    def _onchange_field_id(self):
        """Align ``field_name`` with the name of the selected ``field_id``."""
        self.field_name = self.field_id.name


class IrModelFieldsAnonymizationHistory(models.Model):
    # History entries of the anonymization feature, most recent first.
    # All the fields below are read-only.
    _name = 'ir.model.fields.anonymization.history'
    _order = "date desc"

    date = fields.Datetime(
        required=True,
        readonly=True,
    )
    # Anonymization field records linked to this history entry.
    field_ids = fields.Many2many(
        comodel_name='ir.model.fields.anonymization',
        relation='anonymized_field_to_history_rel',
        column1='field_id',
        column2='history_id',
        string='Fields',
        readonly=True,
    )
    state = fields.Selection(
        selection=ANONYMIZATION_HISTORY_STATE,
        string='Status',
        required=True,
        readonly=True,
    )
    direction = fields.Selection(
        selection=ANONYMIZATION_DIRECTION,
        required=True,
        readonly=True,
    )
    msg = fields.Text(
        string='Message',
        readonly=True,
    )
    filepath = fields.Char(
        string='File path',
        readonly=True,
    )


class IrModelFieldsAnonymizationMigrationFix(models.Model):
    # Queries attached to a (model, field) pair for a target version;
    # records are ordered by ``sequence``.
    _name = 'ir.model.fields.anonymization.migration.fix'
    _order = "sequence"

    target_version = fields.Char(string='Target Version')
    model_name = fields.Char(string='Model')
    field_name = fields.Char(string='Field')
    query = fields.Text()
    # Kind of content stored in ``query``: either 'sql' or 'python'.
    query_type = fields.Selection(
        selection=[
            ('sql', 'sql'),
            ('python', 'python'),
        ],
        string='Query',
    )
    sequence = fields.Integer()