File: analytic_mixin.py

package info (click to toggle)
odoo 18.0.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 878,716 kB
  • sloc: javascript: 927,937; python: 685,670; xml: 388,524; sh: 1,033; sql: 415; makefile: 26
file content (175 lines) | stat: -rw-r--r-- 9,382 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import models, fields, api, _
from odoo.tools import SQL, Query, unique
from odoo.tools.float_utils import float_round, float_compare
from odoo.exceptions import UserError, ValidationError

class AnalyticMixin(models.AbstractModel):
    _name = 'analytic.mixin'
    _description = 'Analytic Mixin'

    analytic_distribution = fields.Json(
        'Analytic Distribution',
        compute="_compute_analytic_distribution", store=True, copy=True, readonly=False,
    )
    analytic_precision = fields.Integer(
        store=False,
        default=lambda self: self.env['decimal.precision'].precision_get("Percentage Analytic"),
    )
    distribution_analytic_account_ids = fields.Many2many(
        comodel_name='account.analytic.account',
        compute='_compute_distribution_analytic_account_ids',
    )

    def init(self):
        # Add a gin index for json search on the keys, on the models that actually have a table
        query = ''' SELECT table_name
                    FROM information_schema.tables
                    WHERE table_name=%s '''
        self.env.cr.execute(query, [self._table])
        if self.env.cr.dictfetchone() and self._fields['analytic_distribution'].store:
            query = fr"""
                CREATE INDEX IF NOT EXISTS {self._table}_analytic_distribution_accounts_gin_index
                                        ON {self._table} USING gin(regexp_split_to_array(jsonb_path_query_array(analytic_distribution, '$.keyvalue()."key"')::text, '\D+'));
            """
            self.env.cr.execute(query)
        super().init()

    def _compute_analytic_distribution(self):
        pass

    def _query_analytic_accounts(self, table=False):
        return SQL(
            r"""regexp_split_to_array(jsonb_path_query_array(%s, '$.keyvalue()."key"')::text, '\D+')""",
            self._field_to_sql(table or self._table, 'analytic_distribution'),
        )

    @api.depends('analytic_distribution')
    def _compute_distribution_analytic_account_ids(self):
        all_ids = {int(_id) for rec in self for key in (rec.analytic_distribution or {}) for _id in key.split(',')}
        existing_accounts_ids = set(self.env['account.analytic.account'].browse(all_ids).exists().ids)
        for rec in self:
            ids = list(unique(int(_id) for key in (rec.analytic_distribution or {}) for _id in key.split(',') if int(_id) in existing_accounts_ids))
            rec.distribution_analytic_account_ids = self.env['account.analytic.account'].browse(ids)

    def _condition_to_sql(self, alias: str, fname: str, operator: str, value, query: Query) -> SQL:
        # Don't use this override when account_report_analytic_groupby is truly in the context
        # Indeed, when account_report_analytic_groupby is in the context it means that `analytic_distribution`
        # doesn't have the same format and the table is a temporary one, see _prepare_lines_for_analytic_groupby
        if fname != 'analytic_distribution' or self.env.context.get('account_report_analytic_groupby'):
            return super()._condition_to_sql(alias, fname, operator, value, query)

        if operator not in ('=', '!=', 'ilike', 'not ilike', 'in', 'not in'):
            raise UserError(_('Operation not supported'))

        if operator in ('=', '!=') and isinstance(value, bool):
            return super()._condition_to_sql(alias, fname, operator, value, query)

        if isinstance(value, str) and operator in ('=', '!=', 'ilike', 'not ilike'):
            value = list(self.env['account.analytic.account']._search(
                [('display_name', '=' if operator in ('=', '!=') else 'ilike', value)]
            ))
            operator = 'in' if operator in ('=', 'ilike') else 'not in'

        # keys can be comma-separated ids, we will split those into an array and then make an array comparison with the list of ids to check
        analytic_accounts_query = self._query_analytic_accounts()
        value = [str(id_) for id_ in value if id_]  # list of ids -> list of string
        if operator == 'in':
            return SQL(
                "%s && %s",
                analytic_accounts_query,
                value,
            )
        if operator == 'not in':
            return SQL(
                "(NOT %s && %s OR %s IS NULL)",
                analytic_accounts_query,
                value,
                self._field_to_sql(alias, 'analytic_distribution', query),
            )
        raise UserError(_('Operation not supported'))

    def _read_group_groupby(self, groupby_spec: str, query: Query) -> SQL:
        """To group by `analytic_distribution`, we first need to separate the analytic_ids and associate them with the ids to be counted
        Do note that only '__count' can be passed in the `aggregates`"""
        if groupby_spec == 'analytic_distribution':
            query._tables = {
                'distribution': SQL(
                    r"""(SELECT DISTINCT %s, (regexp_matches(jsonb_object_keys(%s), '\d+', 'g'))[1]::int AS account_id FROM %s WHERE %s)""",
                    self._get_count_id(query),
                    self._field_to_sql(self._table, 'analytic_distribution', query),
                    query.from_clause,
                    query.where_clause,
                )
            }

            # After using the from and where clauses in the nested query, they are no longer needed in the main one
            query._joins = {}
            query._where_clauses = []
            return SQL("account_id")

        return super()._read_group_groupby(groupby_spec, query)

    def _read_group_select(self, aggregate_spec: str, query: Query) -> SQL:
        if query.table == 'distribution' and aggregate_spec != '__count':
            raise ValueError(f"analytic_distribution grouping does not accept {aggregate_spec} as aggregate.")
        return super()._read_group_select(aggregate_spec, query)

    def _get_count_id(self, query):
        ids = {
            'account_move_line': "move_id",
            'purchase_order_line': "order_id",
            'account_asset': "id",
            'hr_expense': "id",
        }
        if query.table not in ids:
            raise ValueError(f"{query.table} does not support analytic_distribution grouping.")
        return SQL(ids.get(query.table))

    def mapped(self, func):
        # Get the related analytic accounts as a recordset instead of the distribution
        if func == 'analytic_distribution' and self.env.context.get('distribution_ids'):
            return self.distribution_analytic_account_ids
        return super().mapped(func)

    def filtered_domain(self, domain):
        # Filter based on the accounts used (i.e. allowing a name_search) instead of the distribution
        # A domain on a binary field doesn't make sense anymore outside of set or not; and it is still doable.
        return super(AnalyticMixin, self.with_context(distribution_ids=True)).filtered_domain(domain)

    def write(self, vals):
        """ Format the analytic_distribution float value, so equality on analytic_distribution can be done """
        decimal_precision = self.env['decimal.precision'].precision_get('Percentage Analytic')
        vals = self._sanitize_values(vals, decimal_precision)
        return super().write(vals)

    @api.model_create_multi
    def create(self, vals_list):
        """ Format the analytic_distribution float value, so equality on analytic_distribution can be done """
        decimal_precision = self.env['decimal.precision'].precision_get('Percentage Analytic')
        vals_list = [self._sanitize_values(vals, decimal_precision) for vals in vals_list]
        return super().create(vals_list)

    def _validate_distribution(self, **kwargs):
        if self.env.context.get('validate_analytic', False):
            mandatory_plans_ids = [plan['id'] for plan in self.env['account.analytic.plan'].sudo().with_company(self.company_id).get_relevant_plans(**kwargs) if plan['applicability'] == 'mandatory']
            if not mandatory_plans_ids:
                return
            decimal_precision = self.env['decimal.precision'].precision_get('Percentage Analytic')
            distribution_by_root_plan = {}
            for analytic_account_ids, percentage in (self.analytic_distribution or {}).items():
                for analytic_account in self.env['account.analytic.account'].browse(map(int, analytic_account_ids.split(","))).exists():
                    root_plan = analytic_account.root_plan_id
                    distribution_by_root_plan[root_plan.id] = distribution_by_root_plan.get(root_plan.id, 0) + percentage

            for plan_id in mandatory_plans_ids:
                if float_compare(distribution_by_root_plan.get(plan_id, 0), 100, precision_digits=decimal_precision) != 0:
                    raise ValidationError(_("One or more lines require a 100% analytic distribution."))

    def _sanitize_values(self, vals, decimal_precision):
        """ Normalize the float of the distribution """
        if 'analytic_distribution' in vals:
            vals['analytic_distribution'] = vals.get('analytic_distribution') and {
                account_id: float_round(distribution, decimal_precision) for account_id, distribution in vals['analytic_distribution'].items()}
        return vals