1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from itertools import groupby
from operator import itemgetter
from odoo import api, fields, models, _
from odoo.exceptions import UserError
from odoo.tools import pycompat
FIELD_STATES = [('clear', 'Clear'), ('anonymized', 'Anonymized'), ('not_existing', 'Not Existing'), ('new', 'New')]
ANONYMIZATION_HISTORY_STATE = [('started', 'Started'), ('done', 'Done'), ('in_exception', 'Exception occured')]
ANONYMIZATION_DIRECTION = [('clear -> anonymized', 'clear -> anonymized'), ('anonymized -> clear', 'anonymized -> clear')]
def group(lst, cols):
if isinstance(cols, pycompat.string_types):
cols = [cols]
return dict((k, [v for v in itr]) for k, itr in groupby(sorted(lst, key=itemgetter(*cols)), itemgetter(*cols)))
class IrModelFieldsAnonymization(models.Model):
_name = 'ir.model.fields.anonymization'
_rec_name = 'field_id'
model_name = fields.Char('Object Name', required=True)
model_id = fields.Many2one('ir.model', string='Object', ondelete='set null')
field_name = fields.Char(required=True)
field_id = fields.Many2one('ir.model.fields', string='Field', ondelete='set null')
state = fields.Selection(selection=FIELD_STATES, string='Status', required=True, readonly=True, default='clear')
_sql_constraints = [
('model_id_field_id_uniq', 'unique (model_name, field_name)', "You cannot have two fields with the same name on the same object!"),
]
@api.model
def _get_global_state(self):
field_ids = self.search([('state', '!=', 'not_existing')])
if not field_ids or len(field_ids) == len(field_ids.filtered(lambda field: field.state == "clear")):
state = 'clear' # all fields are clear
elif len(field_ids) == len(field_ids.filtered(lambda field: field.state == "anonymized")):
state = 'anonymized' # all fields are anonymized
else:
state = 'unstable' # fields are mixed: this should be fixed
return state
@api.model
def _check_write(self):
"""check that the field is created from the menu and not from an database update
otherwise the database update can crash:"""
if self.env.context.get('manual'):
global_state = self._get_global_state()
if global_state == 'anonymized':
raise UserError(_("The database is currently anonymized, you cannot create, modify or delete fields."))
elif global_state == 'unstable':
raise UserError(_("The database anonymization is currently in an unstable state. Some fields are anonymized,"
" while some fields are not anonymized. You should try to solve this problem before trying to create, write or delete fields."))
return True
@api.model
def _get_model_and_field_ids(self, vals):
if vals.get('field_name') and vals.get('model_name'):
field = self.env['ir.model.fields']._get(vals['model_name'], vals['field_name'])
return (field.model_id.id, field.id)
return (False, False)
@api.model
def create(self, vals):
# check field state: all should be clear before we can add a new field to anonymize:
self._check_write()
if vals.get('field_name') and vals.get('model_name'):
vals['model_id'], vals['field_id'] = self._get_model_and_field_ids(vals)
# check not existing fields:
vals['state'] = self._get_global_state() if vals.get('field_id') else 'not_existing'
return super(IrModelFieldsAnonymization, self).create(vals)
@api.multi
def write(self, vals):
# check field state: all should be clear before we can modify a field:
if not len(vals) == 1 and vals.get('state') == 'clear':
self._check_write()
if vals.get('field_name') and vals.get('model_name'):
vals['model_id'], vals['field_id'] = self._get_model_and_field_ids(vals)
# check not existing fields:
if 'field_id' in vals:
if not vals['field_id']:
vals['state'] = 'not_existing'
else:
global_state = self._get_global_state()
if global_state != 'unstable':
vals['state'] = global_state
return super(IrModelFieldsAnonymization, self).write(vals)
@api.multi
def unlink(self):
# check field state: all should be clear before we can unlink a field:
self._check_write()
return super(IrModelFieldsAnonymization, self).unlink()
@api.onchange('model_id')
def _onchange_model_id(self):
self.field_name = False
self.field_id = False
self.model_name = self.model_id.model
@api.onchange('model_name')
def _onchange_model_name(self):
self.field_name = False
self.field_id = False
self.model_id = self.env['ir.model']._get(self.model_name)
@api.onchange('field_name')
def _onchange_field_name(self):
if self.field_name and self.model_name:
self.field_id = self.env['ir.model.fields']._get(self.model_name, self.field_name)
else:
self.field_id = False
@api.onchange('field_id')
def _onchange_field_id(self):
self.field_name = self.field_id.name
class IrModelFieldsAnonymizationHistory(models.Model):
_name = 'ir.model.fields.anonymization.history'
_order = "date desc"
date = fields.Datetime(required=True, readonly=True)
field_ids = fields.Many2many(
'ir.model.fields.anonymization', 'anonymized_field_to_history_rel',
'field_id', 'history_id', string='Fields', readonly=True
)
state = fields.Selection(selection=ANONYMIZATION_HISTORY_STATE, string='Status', required=True, readonly=True)
direction = fields.Selection(selection=ANONYMIZATION_DIRECTION, required=True, readonly=True)
msg = fields.Text('Message', readonly=True)
filepath = fields.Char('File path', readonly=True)
class IrModelFieldsAnonymizationMigrationFix(models.Model):
_name = 'ir.model.fields.anonymization.migration.fix'
_order = "sequence"
target_version = fields.Char('Target Version')
model_name = fields.Char('Model')
field_name = fields.Char('Field')
query = fields.Text()
query_type = fields.Selection(selection=[('sql', 'sql'), ('python', 'python')], string='Query')
sequence = fields.Integer()
|