# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import models, fields, api, _
from odoo.tools import SQL, Query, unique
from odoo.tools.float_utils import float_round, float_compare
from odoo.exceptions import UserError, ValidationError


class AnalyticMixin(models.AbstractModel):
    _name = 'analytic.mixin'
    _description = 'Analytic Mixin'

    analytic_distribution = fields.Json(
        'Analytic Distribution',
        compute="_compute_analytic_distribution", store=True, copy=True, readonly=False,
    )
    analytic_precision = fields.Integer(
        store=False,
        default=lambda self: self.env['decimal.precision'].precision_get("Percentage Analytic"),
    )
    distribution_analytic_account_ids = fields.Many2many(
        comodel_name='account.analytic.account',
        compute='_compute_distribution_analytic_account_ids',
    )
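
    # Illustrative shape of the stored analytic_distribution JSON (assumed example values):
    # keys are comma-separated analytic account ids, values are percentages, e.g.
    #   {"1": 100.0}  or  {"2,5": 60.0, "3": 40.0}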

    def init(self):
        # Add a GIN index for JSON search on the keys, on the models that actually have a table
        query = ''' SELECT table_name
                    FROM information_schema.tables
                    WHERE table_name=%s '''
        self.env.cr.execute(query, [self._table])
        if self.env.cr.dictfetchone() and self._fields['analytic_distribution'].store:
            query = fr"""
                CREATE INDEX IF NOT EXISTS {self._table}_analytic_distribution_accounts_gin_index
                                        ON {self._table} USING gin(regexp_split_to_array(jsonb_path_query_array(analytic_distribution, '$.keyvalue()."key"')::text, '\D+'));
            """
            self.env.cr.execute(query)
        super().init()
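
    # The expression indexed above splits the JSON keys on non-digit characters, yielding a text
    # array of the account ids they contain (e.g., assuming keys "2,5" and "3", the ids '2', '5'
    # and '3'); this is what lets the array-overlap operator (&&) used in _condition_to_sql below
    # be served by the GIN index.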

    def _compute_analytic_distribution(self):
        pass

    def _query_analytic_accounts(self, table=False):
        return SQL(
            r"""regexp_split_to_array(jsonb_path_query_array(%s, '$.keyvalue()."key"')::text, '\D+')""",
            self._field_to_sql(table or self._table, 'analytic_distribution'),
        )
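
    # Minimal sketch of what the helper renders to (assuming the mixin is used on
    # account.move.line and no alias is passed), roughly:
    #   regexp_split_to_array(
    #       jsonb_path_query_array("account_move_line"."analytic_distribution",
    #                              '$.keyvalue()."key"')::text, '\D+')
    # i.e. the same expression as the GIN index created in init(), so conditions built on it can
    # use that index.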

    @api.depends('analytic_distribution')
    def _compute_distribution_analytic_account_ids(self):
        all_ids = {int(_id) for rec in self for key in (rec.analytic_distribution or {}) for _id in key.split(',')}
        existing_accounts_ids = set(self.env['account.analytic.account'].browse(all_ids).exists().ids)
        for rec in self:
            ids = list(unique(int(_id) for key in (rec.analytic_distribution or {}) for _id in key.split(',') if int(_id) in existing_accounts_ids))
            rec.distribution_analytic_account_ids = self.env['account.analytic.account'].browse(ids)
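
    # For example (assumed values), a record whose analytic_distribution is
    # {"2,5": 60.0, "3": 40.0} ends up with distribution_analytic_account_ids set to accounts
    # 2, 5 and 3, skipping any id that no longer exists in account.analytic.account.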

    def _condition_to_sql(self, alias: str, fname: str, operator: str, value, query: Query) -> SQL:
        # Don't use this override when account_report_analytic_groupby is truthy in the context.
        # Indeed, when account_report_analytic_groupby is in the context it means that `analytic_distribution`
        # doesn't have the same format and the table is a temporary one, see _prepare_lines_for_analytic_groupby.
        if fname != 'analytic_distribution' or self.env.context.get('account_report_analytic_groupby'):
            return super()._condition_to_sql(alias, fname, operator, value, query)

        if operator not in ('=', '!=', 'ilike', 'not ilike', 'in', 'not in'):
            raise UserError(_('Operation not supported'))

        if operator in ('=', '!=') and isinstance(value, bool):
            return super()._condition_to_sql(alias, fname, operator, value, query)

        if isinstance(value, str) and operator in ('=', '!=', 'ilike', 'not ilike'):
            value = list(self.env['account.analytic.account']._search(
                [('display_name', '=' if operator in ('=', '!=') else 'ilike', value)]
            ))
            operator = 'in' if operator in ('=', 'ilike') else 'not in'

        # Keys can be comma-separated ids: split them into an array, then compare that array
        # with the list of ids to check.
        analytic_accounts_query = self._query_analytic_accounts(alias)
        value = [str(id_) for id_ in value if id_]  # list of ids -> list of strings
        if operator == 'in':
            return SQL(
                "%s && %s",
                analytic_accounts_query,
                value,
            )
        if operator == 'not in':
            return SQL(
                "(NOT %s && %s OR %s IS NULL)",
                analytic_accounts_query,
                value,
                self._field_to_sql(alias, 'analytic_distribution', query),
            )
        raise UserError(_('Operation not supported'))
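
    # Illustrative translation (assumed ids): the domain [('analytic_distribution', 'ilike', 'RD')]
    # first becomes a display_name search on account.analytic.account; the matching ids are then
    # checked with an array-overlap test, roughly:
    #   regexp_split_to_array(... analytic_distribution keys ...) && ARRAY['12', '13']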

    def _read_group_groupby(self, groupby_spec: str, query: Query) -> SQL:
        """ To group by `analytic_distribution`, we first need to separate the analytic account ids
        and associate them with the ids to be counted.
        Note that only '__count' can be passed in the `aggregates`.
        """
        if groupby_spec == 'analytic_distribution':
            query._tables = {
                'distribution': SQL(
                    r"""(SELECT DISTINCT %s, (regexp_matches(jsonb_object_keys(%s), '\d+', 'g'))[1]::int AS account_id FROM %s WHERE %s)""",
                    self._get_count_id(query),
                    self._field_to_sql(self._table, 'analytic_distribution', query),
                    query.from_clause,
                    query.where_clause,
                )
            }
            # After using the from and where clauses in the nested query, they are no longer needed in the main one
            query._joins = {}
            query._where_clauses = []
            return SQL("account_id")
        return super()._read_group_groupby(groupby_spec, query)
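
    # Sketch of the rewrite (assumed values): for account.move.line, a line with distribution
    # {"2,5": 60.0} contributes the derived rows (move_id, 2) and (move_id, 5); the main query
    # then groups on account_id, so '__count' counts distinct moves per analytic account rather
    # than per distribution key.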

    def _read_group_select(self, aggregate_spec: str, query: Query) -> SQL:
        if query.table == 'distribution' and aggregate_spec != '__count':
            raise ValueError(f"analytic_distribution grouping does not accept {aggregate_spec} as aggregate.")
        return super()._read_group_select(aggregate_spec, query)

    def _get_count_id(self, query):
        ids = {
            'account_move_line': "move_id",
            'purchase_order_line': "order_id",
            'account_asset': "id",
            'hr_expense': "id",
        }
        if query.table not in ids:
            raise ValueError(f"{query.table} does not support analytic_distribution grouping.")
        return SQL(ids.get(query.table))
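
    # The column returned by _get_count_id above is the one whose distinct values are counted
    # when grouping by analytic_distribution: journal items are counted per move, purchase lines
    # per order, assets and expenses per record.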

    def mapped(self, func):
        # Get the related analytic accounts as a recordset instead of the distribution
        if func == 'analytic_distribution' and self.env.context.get('distribution_ids'):
            return self.distribution_analytic_account_ids
        return super().mapped(func)

    def filtered_domain(self, domain):
        # Filter based on the accounts used (i.e. allowing a name_search) instead of the distribution.
        # Apart from "set / not set" checks, a domain on the raw field value doesn't make sense,
        # and those checks still work here.
        return super(AnalyticMixin, self.with_context(distribution_ids=True)).filtered_domain(domain)
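
    # Example of the override pair above (assumed records and names): calling
    #   records.filtered_domain([('analytic_distribution', 'ilike', 'Marketing')])
    # sets distribution_ids in the context, so the domain is evaluated against the related
    # distribution_analytic_account_ids (i.e. the account names) rather than the raw JSON.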

    def write(self, vals):
        """ Round the analytic_distribution percentages, so that equality checks on analytic_distribution can be done """
        decimal_precision = self.env['decimal.precision'].precision_get('Percentage Analytic')
        vals = self._sanitize_values(vals, decimal_precision)
        return super().write(vals)

    @api.model_create_multi
    def create(self, vals_list):
        """ Round the analytic_distribution percentages, so that equality checks on analytic_distribution can be done """
        decimal_precision = self.env['decimal.precision'].precision_get('Percentage Analytic')
        vals_list = [self._sanitize_values(vals, decimal_precision) for vals in vals_list]
        return super().create(vals_list)
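
    # Rounding example for _sanitize_values below (assuming a precision of 2 digits):
    #   {"1": 33.333333, "2": 66.666667} is stored as {"1": 33.33, "2": 66.67},
    # so two writes of "the same" distribution compare equal afterwards.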

    def _validate_distribution(self, **kwargs):
        if self.env.context.get('validate_analytic', False):
            mandatory_plans_ids = [
                plan['id']
                for plan in self.env['account.analytic.plan'].sudo().with_company(self.company_id).get_relevant_plans(**kwargs)
                if plan['applicability'] == 'mandatory'
            ]
            if not mandatory_plans_ids:
                return
            decimal_precision = self.env['decimal.precision'].precision_get('Percentage Analytic')
            distribution_by_root_plan = {}
            for analytic_account_ids, percentage in (self.analytic_distribution or {}).items():
                for analytic_account in self.env['account.analytic.account'].browse(map(int, analytic_account_ids.split(","))).exists():
                    root_plan = analytic_account.root_plan_id
                    distribution_by_root_plan[root_plan.id] = distribution_by_root_plan.get(root_plan.id, 0) + percentage

            for plan_id in mandatory_plans_ids:
                if float_compare(distribution_by_root_plan.get(plan_id, 0), 100, precision_digits=decimal_precision) != 0:
                    raise ValidationError(_("One or more lines require a 100% analytic distribution."))
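
    # Validation example (assumed plan/account ids): if plan 1 is mandatory and accounts 2 and 3
    # belong to it, {"2": 60.0, "3": 40.0} passes (60 + 40 == 100 on plan 1), while {"2": 60.0}
    # alone raises the ValidationError above.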

    def _sanitize_values(self, vals, decimal_precision):
        """ Round the distribution percentages to the given precision. """
        if 'analytic_distribution' in vals:
            vals['analytic_distribution'] = vals.get('analytic_distribution') and {
                account_id: float_round(distribution, decimal_precision)
                for account_id, distribution in vals['analytic_distribution'].items()
            }
        return vals